Skip to main content

durable_streams_server/protocol/
error.rs

1use crate::protocol::problem::{ProblemDetails, ProblemResponse, ProblemTelemetry};
2use axum::http::{HeaderValue, StatusCode, header::RETRY_AFTER};
3use std::io;
4use thiserror::Error;
5
/// Default `Retry-After` value, in seconds, for temporary backend unavailability.
///
/// [`Error::storage_unavailable`] stores this value in
/// [`StorageFailure::retry_after_secs`] for every error it builds.
pub const DEFAULT_STORAGE_RETRY_AFTER_SECS: u32 = 1;
8
/// Coarse category assigned to a failure that originated in the storage layer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StorageFailureClass {
    /// The backend could not serve the request right now.
    Unavailable,
    /// The backend has run out of capacity.
    InsufficientStorage,
}

impl StorageFailureClass {
    /// Return the stable snake_case label for this class.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            StorageFailureClass::InsufficientStorage => "insufficient_storage",
            StorageFailureClass::Unavailable => "unavailable",
        }
    }
}
25
26/// Internal metadata retained for storage-related failures.
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct StorageFailure {
29    pub class: StorageFailureClass,
30    pub backend: &'static str,
31    pub operation: String,
32    pub detail: String,
33    pub retry_after_secs: Option<u32>,
34}
35
36impl std::fmt::Display for StorageFailure {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{} {}: {}", self.backend, self.operation, self.detail)
39    }
40}
41
/// Single error type for all storage and protocol operations.
///
/// Every variant maps to exactly one HTTP status code through
/// [`Error::status_code`]; that code is noted in parentheses on each variant.
/// Following the single error enum pattern to avoid error type proliferation.
#[derive(Debug, Error)]
pub enum Error {
    /// Stream not found (404).
    ///
    /// The payload is interpolated into the message after "Stream not found: ".
    #[error("Stream not found: {0}")]
    NotFound(String),

    /// Stream already exists with different config (409).
    #[error("Stream already exists with different configuration")]
    ConfigMismatch,

    /// Invalid offset format (400).
    ///
    /// The payload is appended to the message.
    #[error("Invalid offset format: {0}")]
    InvalidOffset(String),

    /// Content type mismatch (409).
    ///
    /// Carries the expected and the actually received content type.
    #[error("Content type mismatch: expected {expected}, got {actual}")]
    ContentTypeMismatch { expected: String, actual: String },

    /// Stream is closed (409).
    #[error("Stream is closed")]
    StreamClosed,

    /// Producer sequence gap (409).
    ///
    /// Carries the expected and the actually received sequence number.
    #[error("Producer sequence gap: expected {expected}, got {actual}")]
    SequenceGap { expected: u64, actual: u64 },

    /// Producer epoch fenced (403).
    ///
    /// Carries the current epoch and the epoch that was received.
    #[error("Producer epoch fenced: current {current}, received {received}")]
    EpochFenced { current: u64, received: u64 },

    /// Invalid producer state (400).
    ///
    /// The payload is appended to the message.
    #[error("Invalid producer state: {0}")]
    InvalidProducerState(String),

    /// Memory limit exceeded (413).
    #[error("Memory limit exceeded")]
    MemoryLimitExceeded,

    /// Stream size limit exceeded (413).
    #[error("Stream size limit exceeded")]
    StreamSizeLimitExceeded,

    /// Invalid TTL format (400).
    ///
    /// The payload is appended to the message.
    #[error("Invalid TTL format: {0}")]
    InvalidTtl(String),

    /// Both TTL and Expires-At provided (400).
    #[error("Cannot specify both TTL and Expires-At")]
    ConflictingExpiration,

    /// Stream has expired (404).
    ///
    /// Reported to clients with the same problem type as [`Error::NotFound`].
    #[error("Stream has expired")]
    StreamExpired,

    /// Invalid JSON (400).
    ///
    /// The payload is appended to the message.
    #[error("Invalid JSON: {0}")]
    InvalidJson(String),

    /// Empty request body when data is required (400).
    #[error("Empty request body requires Stream-Closed: true")]
    EmptyBody,

    /// Empty JSON array append body (400).
    #[error("Empty JSON arrays are not permitted for append")]
    EmptyArray,

    /// Invalid header value (400).
    ///
    /// Carries the header name and the reason the value was rejected.
    #[error("Invalid header value for {header}: {reason}")]
    InvalidHeader { header: String, reason: String },

    /// Invalid stream name (400).
    ///
    /// The payload is appended to the message.
    #[error("Invalid stream name: {0}")]
    InvalidStreamName(String),

    /// Stream-Seq ordering violation (409).
    ///
    /// Carries the last accepted value and the value that was received.
    #[error("Stream-Seq ordering violation: last={last}, received={received}")]
    SeqOrderingViolation { last: String, received: String },

    /// Storage backend is temporarily unavailable (503).
    ///
    /// The [`StorageFailure`] payload is not exposed in the client-facing
    /// problem detail; it is only copied into telemetry.
    #[error("Storage temporarily unavailable: {0}")]
    Unavailable(StorageFailure),

    /// Storage backend has insufficient capacity (507).
    ///
    /// The [`StorageFailure`] payload is not exposed in the client-facing
    /// problem detail; it is only copied into telemetry.
    #[error("Storage capacity exhausted: {0}")]
    InsufficientStorage(StorageFailure),

    /// Storage backend I/O or serialization error (500).
    ///
    /// The payload is not exposed in the client-facing problem detail; it is
    /// only copied into telemetry.
    #[error("Storage error: {0}")]
    Storage(String),
}
136
137impl Error {
138    /// Map error to HTTP status code
139    ///
140    /// This is the single place where errors are mapped to status codes.
141    /// Handlers should use this method to determine the response code.
142    #[must_use]
143    pub fn status_code(&self) -> StatusCode {
144        match self {
145            Self::NotFound(_) | Self::StreamExpired => StatusCode::NOT_FOUND,
146            Self::ConfigMismatch
147            | Self::ContentTypeMismatch { .. }
148            | Self::StreamClosed
149            | Self::SequenceGap { .. }
150            | Self::SeqOrderingViolation { .. } => StatusCode::CONFLICT,
151            Self::EpochFenced { .. } => StatusCode::FORBIDDEN,
152            Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => {
153                StatusCode::PAYLOAD_TOO_LARGE
154            }
155            Self::InvalidOffset(_)
156            | Self::InvalidProducerState(_)
157            | Self::InvalidTtl(_)
158            | Self::ConflictingExpiration
159            | Self::InvalidJson(_)
160            | Self::InvalidHeader { .. }
161            | Self::InvalidStreamName(_)
162            | Self::EmptyBody
163            | Self::EmptyArray => StatusCode::BAD_REQUEST,
164            Self::Unavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
165            Self::InsufficientStorage(_) => {
166                StatusCode::from_u16(507).expect("507 is a valid status code")
167            }
168            Self::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
169        }
170    }
171
172    #[must_use]
173    fn problem_details(&self) -> ProblemDetails {
174        match self {
175            Self::NotFound(name) => ProblemDetails::new(
176                "/errors/not-found",
177                "Stream Not Found",
178                self.status_code(),
179                "NOT_FOUND",
180            )
181            .with_detail(format!("Stream not found: {name}")),
182            Self::ConfigMismatch => ProblemDetails::new(
183                "/errors/already-exists",
184                "Stream Already Exists",
185                self.status_code(),
186                "ALREADY_EXISTS",
187            )
188            .with_detail(self.to_string()),
189            Self::InvalidOffset(_) => ProblemDetails::new(
190                "/errors/invalid-offset",
191                "Invalid Offset",
192                self.status_code(),
193                "INVALID_OFFSET",
194            )
195            .with_detail(self.to_string()),
196            Self::ContentTypeMismatch { .. } => ProblemDetails::new(
197                "/errors/content-type-mismatch",
198                "Content Type Mismatch",
199                self.status_code(),
200                "CONTENT_TYPE_MISMATCH",
201            )
202            .with_detail(self.to_string()),
203            Self::StreamClosed => ProblemDetails::new(
204                "/errors/stream-closed",
205                "Stream Closed",
206                self.status_code(),
207                "STREAM_CLOSED",
208            )
209            .with_detail(self.to_string()),
210            Self::SequenceGap { .. } | Self::SeqOrderingViolation { .. } => ProblemDetails::new(
211                "/errors/sequence-conflict",
212                "Sequence Conflict",
213                self.status_code(),
214                "SEQUENCE_CONFLICT",
215            )
216            .with_detail(self.to_string()),
217            Self::EpochFenced { .. } => ProblemDetails::new(
218                "/errors/producer-epoch-fenced",
219                "Producer Epoch Fenced",
220                self.status_code(),
221                "PRODUCER_EPOCH_FENCED",
222            )
223            .with_detail(self.to_string()),
224            Self::InvalidProducerState(_)
225            | Self::InvalidTtl(_)
226            | Self::ConflictingExpiration
227            | Self::InvalidHeader { .. } => ProblemDetails::new(
228                "/errors/bad-request",
229                "Bad Request",
230                self.status_code(),
231                "BAD_REQUEST",
232            )
233            .with_detail(self.to_string()),
234            Self::InvalidStreamName(_) => ProblemDetails::new(
235                "/errors/invalid-stream-name",
236                "Invalid Stream Name",
237                self.status_code(),
238                "INVALID_STREAM_NAME",
239            )
240            .with_detail(self.to_string()),
241            Self::InvalidJson(_) => ProblemDetails::new(
242                "/errors/invalid-json",
243                "Invalid JSON",
244                self.status_code(),
245                "INVALID_JSON",
246            )
247            .with_detail(self.to_string()),
248            Self::EmptyBody => ProblemDetails::new(
249                "/errors/empty-body",
250                "Empty Body",
251                self.status_code(),
252                "EMPTY_BODY",
253            )
254            .with_detail(self.to_string()),
255            Self::EmptyArray => ProblemDetails::new(
256                "/errors/empty-array",
257                "Empty Array",
258                self.status_code(),
259                "EMPTY_ARRAY",
260            )
261            .with_detail(self.to_string()),
262            Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => ProblemDetails::new(
263                "/errors/payload-too-large",
264                "Payload Too Large",
265                self.status_code(),
266                "PAYLOAD_TOO_LARGE",
267            )
268            .with_detail(self.to_string()),
269            Self::Unavailable(_) => ProblemDetails::new(
270                "/errors/unavailable",
271                "Service Unavailable",
272                self.status_code(),
273                "UNAVAILABLE",
274            )
275            .with_detail("The server is temporarily unable to complete the request."),
276            Self::InsufficientStorage(_) => ProblemDetails::new(
277                "/errors/insufficient-storage",
278                "Insufficient Storage",
279                self.status_code(),
280                "INSUFFICIENT_STORAGE",
281            )
282            .with_detail(
283                "The server does not have enough storage capacity to complete the request.",
284            ),
285            Self::StreamExpired => ProblemDetails::new(
286                "/errors/not-found",
287                "Stream Not Found",
288                self.status_code(),
289                "NOT_FOUND",
290            )
291            .with_detail(self.to_string()),
292            Self::Storage(_) => ProblemDetails::new(
293                "/errors/internal",
294                "Internal Server Error",
295                self.status_code(),
296                "INTERNAL_ERROR",
297            )
298            .with_detail("The server encountered an internal error."),
299        }
300    }
301
302    #[must_use]
303    pub fn storage_unavailable(
304        backend: &'static str,
305        operation: impl Into<String>,
306        detail: impl Into<String>,
307    ) -> Self {
308        Self::Unavailable(StorageFailure {
309            class: StorageFailureClass::Unavailable,
310            backend,
311            operation: operation.into(),
312            detail: detail.into(),
313            retry_after_secs: Some(DEFAULT_STORAGE_RETRY_AFTER_SECS),
314        })
315    }
316
317    #[must_use]
318    pub fn storage_insufficient(
319        backend: &'static str,
320        operation: impl Into<String>,
321        detail: impl Into<String>,
322    ) -> Self {
323        Self::InsufficientStorage(StorageFailure {
324            class: StorageFailureClass::InsufficientStorage,
325            backend,
326            operation: operation.into(),
327            detail: detail.into(),
328            retry_after_secs: None,
329        })
330    }
331
332    #[must_use]
333    pub fn classify_io_failure(
334        backend: &'static str,
335        operation: impl Into<String>,
336        detail: impl Into<String>,
337        error: &io::Error,
338    ) -> Self {
339        match error.kind() {
340            io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
341                Self::storage_unavailable(backend, operation, detail)
342            }
343            io::ErrorKind::StorageFull
344            | io::ErrorKind::QuotaExceeded
345            | io::ErrorKind::FileTooLarge => Self::storage_insufficient(backend, operation, detail),
346            _ => Self::Storage(detail.into()),
347        }
348    }
349
350    #[must_use]
351    pub fn is_retryable_io_error(error: &io::Error) -> bool {
352        matches!(
353            error.kind(),
354            io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
355        )
356    }
357
358    /// Build telemetry metadata for storage-related errors.
359    ///
360    /// Derives the base fields from [`Self::problem_details`] so that the
361    /// type URI, code, title, and detail stay in sync automatically, then
362    /// overlays storage-specific context that is only emitted to logs.
363    #[must_use]
364    fn telemetry(&self) -> Option<ProblemTelemetry> {
365        match self {
366            Self::Unavailable(failure) | Self::InsufficientStorage(failure) => {
367                let mut t = ProblemTelemetry::from(&self.problem_details());
368                t.error_class = Some(failure.class.as_str().to_string());
369                t.storage_backend = Some(failure.backend.to_string());
370                t.storage_operation = Some(failure.operation.clone());
371                t.internal_detail = Some(failure.detail.clone());
372                if let Self::Unavailable(f) = self {
373                    t.retry_after_secs = f.retry_after_secs;
374                }
375                Some(t)
376            }
377            Self::Storage(detail) => {
378                let mut t = ProblemTelemetry::from(&self.problem_details());
379                t.error_class = Some("internal".to_string());
380                t.internal_detail = Some(detail.clone());
381                Some(t)
382            }
383            _ => None,
384        }
385    }
386}
387
/// Result type alias for storage and protocol operations.
///
/// Shorthand for `std::result::Result<T, Error>` using this module's
/// [`Error`] type.
pub type Result<T> = std::result::Result<T, Error>;
390
391impl From<Error> for ProblemResponse {
392    fn from(error: Error) -> Self {
393        let problem = error.problem_details();
394        let telemetry = error.telemetry();
395        let mut response = ProblemResponse::new(problem);
396
397        if let Some(retry_after_secs) = match &error {
398            Error::Unavailable(failure) => failure.retry_after_secs,
399            _ => None,
400        } {
401            response = response.with_header(
402                RETRY_AFTER,
403                HeaderValue::from_str(&retry_after_secs.to_string())
404                    .expect("retry-after header value must be valid"),
405            );
406        }
407
408        if let Some(telemetry) = telemetry {
409            response = response.with_telemetry(telemetry);
410        }
411
412        response
413    }
414}
415
416/// Convert Error to HTTP response
417impl axum::response::IntoResponse for Error {
418    fn into_response(self) -> axum::response::Response {
419        ProblemResponse::from(self).into_response()
420    }
421}
422
#[cfg(test)]
mod tests {
    use super::Error;
    use axum::http::{HeaderValue, StatusCode};
    use axum::response::IntoResponse;
    use std::io;

    /// A timed-out backend call must surface as 503 with `Retry-After: 1`.
    #[test]
    fn classify_io_failure_maps_transient_errors_to_503() {
        let timed_out = io::Error::new(io::ErrorKind::TimedOut, "backend timed out");
        let classified = Error::classify_io_failure(
            "file",
            "append stream log",
            "failed to append stream log: backend timed out",
            &timed_out,
        );
        let response = classified.into_response();

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(
            response.headers().get("retry-after"),
            Some(&HeaderValue::from_static("1"))
        );
    }

    /// A full disk must surface as 507 without any retry hint.
    #[test]
    fn classify_io_failure_maps_capacity_errors_to_507() {
        let disk_full = io::Error::new(io::ErrorKind::StorageFull, "disk full");
        let classified = Error::classify_io_failure(
            "file",
            "sync stream log",
            "failed to sync stream log: disk full",
            &disk_full,
        );
        let response = classified.into_response();

        assert_eq!(response.status(), StatusCode::INSUFFICIENT_STORAGE);
        assert!(!response.headers().contains_key("retry-after"));
    }
}
468}