Skip to main content

durable_streams_server/protocol/
error.rs

1use thiserror::Error;
2
3/// Single error type for all storage and protocol operations
4///
5/// Maps to HTTP status codes in handlers. Following the single error enum
6/// pattern to avoid error type proliferation.
7#[derive(Debug, Error)]
8pub enum Error {
9    /// Stream not found (404)
10    #[error("Stream not found: {0}")]
11    NotFound(String),
12
13    /// Stream already exists with different config (409)
14    #[error("Stream already exists with different configuration")]
15    ConfigMismatch,
16
17    /// Stream already exists (used for idempotent operations)
18    #[error("Stream already exists: {0}")]
19    AlreadyExists(String),
20
21    /// Invalid offset format (400)
22    #[error("Invalid offset format: {0}")]
23    InvalidOffset(String),
24
25    /// Invalid stream name (400)
26    #[error("Invalid stream name: {0}")]
27    InvalidStreamName(String),
28
29    /// Content type mismatch (409)
30    #[error("Content type mismatch: expected {expected}, got {actual}")]
31    ContentTypeMismatch { expected: String, actual: String },
32
33    /// Stream is closed (409)
34    #[error("Stream is closed")]
35    StreamClosed,
36
37    /// Producer sequence regression (409)
38    #[error("Producer sequence regression: expected > {expected}, got {actual}")]
39    SequenceRegression { expected: u64, actual: u64 },
40
41    /// Producer sequence gap (409)
42    #[error("Producer sequence gap: expected {expected}, got {actual}")]
43    SequenceGap { expected: u64, actual: u64 },
44
45    /// Producer epoch fenced (403)
46    #[error("Producer epoch fenced: current {current}, received {received}")]
47    EpochFenced { current: u64, received: u64 },
48
49    /// Invalid producer state (400)
50    #[error("Invalid producer state: {0}")]
51    InvalidProducerState(String),
52
53    /// Memory limit exceeded (413)
54    #[error("Memory limit exceeded")]
55    MemoryLimitExceeded,
56
57    /// Stream size limit exceeded (413)
58    #[error("Stream size limit exceeded")]
59    StreamSizeLimitExceeded,
60
61    /// Invalid TTL format (400)
62    #[error("Invalid TTL format: {0}")]
63    InvalidTtl(String),
64
65    /// Both TTL and Expires-At provided (400)
66    #[error("Cannot specify both TTL and Expires-At")]
67    ConflictingExpiration,
68
69    /// Stream has expired (404)
70    #[error("Stream has expired")]
71    StreamExpired,
72
73    /// Invalid JSON (400)
74    #[error("Invalid JSON: {0}")]
75    InvalidJson(String),
76
77    /// Empty request body when data expected (400)
78    #[error("Empty request body")]
79    EmptyBody,
80
81    /// Invalid header value (400)
82    #[error("Invalid header value for {header}: {reason}")]
83    InvalidHeader { header: String, reason: String },
84
85    /// Stream-Seq ordering violation (409)
86    #[error("Stream-Seq ordering violation: last={last}, received={received}")]
87    SeqOrderingViolation { last: String, received: String },
88
89    /// Storage backend I/O or serialization error (500)
90    #[error("Storage error: {0}")]
91    Storage(String),
92}
93
94impl Error {
95    /// Map error to HTTP status code
96    ///
97    /// This is the single place where errors are mapped to status codes.
98    /// Handlers should use this method to determine the response code.
99    #[must_use]
100    pub fn status_code(&self) -> u16 {
101        match self {
102            Self::NotFound(_) | Self::StreamExpired => 404,
103            Self::ConfigMismatch
104            | Self::ContentTypeMismatch { .. }
105            | Self::StreamClosed
106            | Self::SequenceRegression { .. }
107            | Self::SequenceGap { .. }
108            | Self::SeqOrderingViolation { .. } => 409,
109            Self::EpochFenced { .. } => 403,
110            Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => 413,
111            Self::AlreadyExists(_)
112            | Self::InvalidOffset(_)
113            | Self::InvalidStreamName(_)
114            | Self::InvalidProducerState(_)
115            | Self::InvalidTtl(_)
116            | Self::ConflictingExpiration
117            | Self::InvalidJson(_)
118            | Self::EmptyBody
119            | Self::InvalidHeader { .. } => 400,
120            Self::Storage(_) => 500,
121        }
122    }
123}
124
125/// Result type alias for storage and protocol operations
126pub type Result<T> = std::result::Result<T, Error>;
127
128/// Convert Error to HTTP response
129impl axum::response::IntoResponse for Error {
130    fn into_response(self) -> axum::response::Response {
131        let status = axum::http::StatusCode::from_u16(self.status_code())
132            .unwrap_or(axum::http::StatusCode::INTERNAL_SERVER_ERROR);
133
134        let body = self.to_string();
135
136        (status, body).into_response()
137    }
138}