Skip to main content

durable_streams_server/protocol/
error.rs

1use crate::protocol::problem::{ProblemDetails, ProblemResponse, ProblemTelemetry};
2use axum::http::{HeaderValue, StatusCode, header::RETRY_AFTER};
3use std::io;
4use thiserror::Error;
5
/// Default `Retry-After` value, in seconds, for temporary backend unavailability.
///
/// [`Error::storage_unavailable`] stores this value in
/// [`StorageFailure::retry_after_secs`].
pub const DEFAULT_STORAGE_RETRY_AFTER_SECS: u32 = 1;
8
/// Internal classification for storage-originated failures.
///
/// The class travels on [`StorageFailure::class`] and is rendered as text by
/// [`StorageFailureClass::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageFailureClass {
    /// Class assigned by [`Error::storage_unavailable`].
    Unavailable,
    /// Class assigned by [`Error::storage_insufficient`].
    InsufficientStorage,
}
15
16impl StorageFailureClass {
17    #[must_use]
18    pub fn as_str(self) -> &'static str {
19        match self {
20            Self::Unavailable => "unavailable",
21            Self::InsufficientStorage => "insufficient_storage",
22        }
23    }
24}
25
/// Internal metadata retained for storage-related failures.
///
/// Carried by [`Error::Unavailable`] and [`Error::InsufficientStorage`].
/// Its `Display` form is `<backend> <operation>: <detail>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageFailure {
    /// Which kind of storage failure this is.
    pub class: StorageFailureClass,
    /// Name of the storage backend that failed (the tests in this file use `"file"`).
    pub backend: &'static str,
    /// Description of the operation that failed (e.g. `"append stream log"`).
    pub operation: String,
    /// Free-form failure detail supplied by the caller.
    pub detail: String,
    /// Seconds to advertise in a `Retry-After` header, if any. The header is
    /// only emitted for [`Error::Unavailable`].
    pub retry_after_secs: Option<u32>,
}
35
36impl std::fmt::Display for StorageFailure {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{} {}: {}", self.backend, self.operation, self.detail)
39    }
40}
41
/// Single error type for all storage and protocol operations
///
/// Each variant maps to exactly one HTTP status code through
/// [`Error::status_code`]; the code is noted in parentheses on every variant.
/// Following the single error enum pattern to avoid error type proliferation.
#[derive(Debug, Error)]
pub enum Error {
    /// Stream not found (404)
    ///
    /// The payload is interpolated into the error message.
    #[error("Stream not found: {0}")]
    NotFound(String),

    /// Stream already exists with different config (409)
    #[error("Stream already exists with different configuration")]
    ConfigMismatch,

    /// Invalid offset format (400)
    #[error("Invalid offset format: {0}")]
    InvalidOffset(String),

    /// Content type mismatch (409)
    #[error("Content type mismatch: expected {expected}, got {actual}")]
    ContentTypeMismatch { expected: String, actual: String },

    /// Stream is closed (409)
    #[error("Stream is closed")]
    StreamClosed,

    /// Producer sequence gap (409)
    #[error("Producer sequence gap: expected {expected}, got {actual}")]
    SequenceGap { expected: u64, actual: u64 },

    /// Producer epoch fenced (403)
    #[error("Producer epoch fenced: current {current}, received {received}")]
    EpochFenced { current: u64, received: u64 },

    /// Invalid producer state (400)
    #[error("Invalid producer state: {0}")]
    InvalidProducerState(String),

    /// Memory limit exceeded (413)
    #[error("Memory limit exceeded")]
    MemoryLimitExceeded,

    /// Stream size limit exceeded (413)
    #[error("Stream size limit exceeded")]
    StreamSizeLimitExceeded,

    /// Invalid TTL format (400)
    #[error("Invalid TTL format: {0}")]
    InvalidTtl(String),

    /// Both TTL and Expires-At provided (400)
    #[error("Cannot specify both TTL and Expires-At")]
    ConflictingExpiration,

    /// Stream has expired (404)
    #[error("Stream has expired")]
    StreamExpired,

    /// Invalid JSON (400)
    #[error("Invalid JSON: {0}")]
    InvalidJson(String),

    /// Empty request body when data is required (400)
    #[error("Empty request body requires Stream-Closed: true")]
    EmptyBody,

    /// Empty JSON array append body (400)
    #[error("Empty JSON arrays are not permitted for append")]
    EmptyArray,

    /// Invalid header value (400)
    #[error("Invalid header value for {header}: {reason}")]
    InvalidHeader { header: String, reason: String },

    /// Invalid stream name (400)
    #[error("Invalid stream name: {0}")]
    InvalidStreamName(String),

    /// Stream-Seq ordering violation (409)
    #[error("Stream-Seq ordering violation: last={last}, received={received}")]
    SeqOrderingViolation { last: String, received: String },

    /// Storage backend is temporarily unavailable (503)
    ///
    /// Built by [`Error::storage_unavailable`].
    #[error("Storage temporarily unavailable: {0}")]
    Unavailable(StorageFailure),

    /// Storage backend has insufficient capacity (507)
    ///
    /// Built by [`Error::storage_insufficient`].
    #[error("Storage capacity exhausted: {0}")]
    InsufficientStorage(StorageFailure),

    /// Stream has been deleted (tombstoned) and is gone (410)
    #[error("Stream is gone: {0}")]
    StreamGone(String),

    /// Stream path is blocked because a soft-deleted lineage still owns it (409)
    #[error("Stream path is reserved by a soft-deleted lineage: {0}")]
    StreamPathBlocked(String),

    /// Fork offset exceeds the source stream's tail (400)
    #[error("Fork offset is beyond the source stream's tail")]
    ForkOffsetBeyondTail,

    /// Cannot fork from a tombstoned stream (409)
    #[error("Cannot fork from deleted stream: {0}")]
    ForkFromTombstone(String),

    /// Storage backend I/O or serialization error (500)
    ///
    /// The payload is free-form detail text; it is also the fallback produced
    /// by [`Error::classify_io_failure`] for unclassified I/O error kinds.
    #[error("Storage error: {0}")]
    Storage(String),
}
152
153impl Error {
154    /// Map error to HTTP status code
155    ///
156    /// This is the single place where errors are mapped to status codes.
157    /// Handlers should use this method to determine the response code.
158    ///
159    /// # Panics
160    ///
161    /// Panics if HTTP status code 507 cannot be constructed, which should
162    /// never happen since 507 is a valid IANA-registered status code.
163    #[must_use]
164    pub fn status_code(&self) -> StatusCode {
165        match self {
166            Self::NotFound(_) | Self::StreamExpired => StatusCode::NOT_FOUND,
167            Self::StreamGone(_) => StatusCode::GONE,
168            Self::StreamPathBlocked(_)
169            | Self::ForkFromTombstone(_)
170            | Self::ConfigMismatch
171            | Self::ContentTypeMismatch { .. }
172            | Self::StreamClosed
173            | Self::SequenceGap { .. }
174            | Self::SeqOrderingViolation { .. } => StatusCode::CONFLICT,
175            Self::EpochFenced { .. } => StatusCode::FORBIDDEN,
176            Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => {
177                StatusCode::PAYLOAD_TOO_LARGE
178            }
179            Self::ForkOffsetBeyondTail
180            | Self::InvalidOffset(_)
181            | Self::InvalidProducerState(_)
182            | Self::InvalidTtl(_)
183            | Self::ConflictingExpiration
184            | Self::InvalidJson(_)
185            | Self::InvalidHeader { .. }
186            | Self::InvalidStreamName(_)
187            | Self::EmptyBody
188            | Self::EmptyArray => StatusCode::BAD_REQUEST,
189            Self::Unavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
190            Self::InsufficientStorage(_) => {
191                StatusCode::from_u16(507).expect("507 is a valid status code")
192            }
193            Self::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
194        }
195    }
196
197    /// Build a [`ProblemDetails`] using `self.to_string()` as the detail message.
198    ///
199    /// This covers the common case where the `Display` impl already provides a
200    /// sufficiently descriptive detail string.
201    fn simple_problem(
202        &self,
203        type_uri: &'static str,
204        title: &'static str,
205        code: &'static str,
206    ) -> ProblemDetails {
207        ProblemDetails::new(type_uri, title, self.status_code(), code).with_detail(self.to_string())
208    }
209
210    fn conflict_problem(&self) -> ProblemDetails {
211        match self {
212            Self::ConfigMismatch => self.simple_problem(
213                "/errors/already-exists",
214                "Stream Already Exists",
215                "ALREADY_EXISTS",
216            ),
217            Self::ContentTypeMismatch { .. } => self.simple_problem(
218                "/errors/content-type-mismatch",
219                "Content Type Mismatch",
220                "CONTENT_TYPE_MISMATCH",
221            ),
222            Self::StreamClosed => {
223                self.simple_problem("/errors/stream-closed", "Stream Closed", "STREAM_CLOSED")
224            }
225            Self::SequenceGap { .. } | Self::SeqOrderingViolation { .. } => self.simple_problem(
226                "/errors/sequence-conflict",
227                "Sequence Conflict",
228                "SEQUENCE_CONFLICT",
229            ),
230            Self::ForkFromTombstone(_) => self.simple_problem(
231                "/errors/fork-from-tombstone",
232                "Fork From Deleted Stream",
233                "FORK_FROM_TOMBSTONE",
234            ),
235            Self::StreamPathBlocked(name) => ProblemDetails::new(
236                "/errors/path-blocked",
237                "Stream Path Blocked",
238                self.status_code(),
239                "PATH_BLOCKED",
240            )
241            .with_detail(format!(
242                "Stream path is reserved by a soft-deleted lineage: {name}"
243            )),
244            _ => unreachable!("conflict_problem called with non-conflict error"),
245        }
246    }
247
248    fn client_problem(&self) -> ProblemDetails {
249        match self {
250            Self::InvalidOffset(_) => {
251                self.simple_problem("/errors/invalid-offset", "Invalid Offset", "INVALID_OFFSET")
252            }
253            Self::InvalidStreamName(_) => self.simple_problem(
254                "/errors/invalid-stream-name",
255                "Invalid Stream Name",
256                "INVALID_STREAM_NAME",
257            ),
258            Self::InvalidJson(_) => {
259                self.simple_problem("/errors/invalid-json", "Invalid JSON", "INVALID_JSON")
260            }
261            Self::EmptyBody => {
262                self.simple_problem("/errors/empty-body", "Empty Body", "EMPTY_BODY")
263            }
264            Self::EmptyArray => {
265                self.simple_problem("/errors/empty-array", "Empty Array", "EMPTY_ARRAY")
266            }
267            Self::EpochFenced { .. } => self.simple_problem(
268                "/errors/producer-epoch-fenced",
269                "Producer Epoch Fenced",
270                "PRODUCER_EPOCH_FENCED",
271            ),
272            Self::ForkOffsetBeyondTail
273            | Self::InvalidProducerState(_)
274            | Self::InvalidTtl(_)
275            | Self::ConflictingExpiration
276            | Self::InvalidHeader { .. } => {
277                self.simple_problem("/errors/bad-request", "Bad Request", "BAD_REQUEST")
278            }
279            _ => unreachable!("client_problem called with unsupported error"),
280        }
281    }
282
283    fn storage_problem(&self) -> ProblemDetails {
284        match self {
285            Self::Unavailable(_) => ProblemDetails::new(
286                "/errors/unavailable",
287                "Service Unavailable",
288                self.status_code(),
289                "UNAVAILABLE",
290            )
291            .with_detail("The server is temporarily unable to complete the request."),
292            Self::InsufficientStorage(_) => ProblemDetails::new(
293                "/errors/insufficient-storage",
294                "Insufficient Storage",
295                self.status_code(),
296                "INSUFFICIENT_STORAGE",
297            )
298            .with_detail(
299                "The server does not have enough storage capacity to complete the request.",
300            ),
301            Self::Storage(_) => ProblemDetails::new(
302                "/errors/internal",
303                "Internal Server Error",
304                self.status_code(),
305                "INTERNAL_ERROR",
306            )
307            .with_detail("The server encountered an internal error."),
308            _ => unreachable!("storage_problem called with non-storage error"),
309        }
310    }
311
312    #[must_use]
313    fn problem_details(&self) -> ProblemDetails {
314        match self {
315            Self::NotFound(name) => ProblemDetails::new(
316                "/errors/not-found",
317                "Stream Not Found",
318                self.status_code(),
319                "NOT_FOUND",
320            )
321            .with_detail(format!("Stream not found: {name}")),
322            Self::ConfigMismatch
323            | Self::ContentTypeMismatch { .. }
324            | Self::StreamClosed
325            | Self::SequenceGap { .. }
326            | Self::SeqOrderingViolation { .. }
327            | Self::ForkFromTombstone(_)
328            | Self::StreamPathBlocked(_) => self.conflict_problem(),
329            Self::InvalidOffset(_)
330            | Self::EpochFenced { .. }
331            | Self::InvalidProducerState(_)
332            | Self::InvalidTtl(_)
333            | Self::ConflictingExpiration
334            | Self::InvalidJson(_)
335            | Self::EmptyBody
336            | Self::EmptyArray
337            | Self::InvalidHeader { .. }
338            | Self::InvalidStreamName(_)
339            | Self::ForkOffsetBeyondTail => self.client_problem(),
340            Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => self.simple_problem(
341                "/errors/payload-too-large",
342                "Payload Too Large",
343                "PAYLOAD_TOO_LARGE",
344            ),
345            Self::Unavailable(_) | Self::InsufficientStorage(_) | Self::Storage(_) => {
346                self.storage_problem()
347            }
348            Self::StreamExpired => {
349                self.simple_problem("/errors/not-found", "Stream Not Found", "NOT_FOUND")
350            }
351            Self::StreamGone(name) => {
352                ProblemDetails::new("/errors/gone", "Stream Gone", self.status_code(), "GONE")
353                    .with_detail(format!("Stream is gone: {name}"))
354            }
355        }
356    }
357
358    #[must_use]
359    pub fn storage_unavailable(
360        backend: &'static str,
361        operation: impl Into<String>,
362        detail: impl Into<String>,
363    ) -> Self {
364        Self::Unavailable(StorageFailure {
365            class: StorageFailureClass::Unavailable,
366            backend,
367            operation: operation.into(),
368            detail: detail.into(),
369            retry_after_secs: Some(DEFAULT_STORAGE_RETRY_AFTER_SECS),
370        })
371    }
372
373    #[must_use]
374    pub fn storage_insufficient(
375        backend: &'static str,
376        operation: impl Into<String>,
377        detail: impl Into<String>,
378    ) -> Self {
379        Self::InsufficientStorage(StorageFailure {
380            class: StorageFailureClass::InsufficientStorage,
381            backend,
382            operation: operation.into(),
383            detail: detail.into(),
384            retry_after_secs: None,
385        })
386    }
387
388    #[must_use]
389    pub fn classify_io_failure(
390        backend: &'static str,
391        operation: impl Into<String>,
392        detail: impl Into<String>,
393        error: &io::Error,
394    ) -> Self {
395        match error.kind() {
396            io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
397                Self::storage_unavailable(backend, operation, detail)
398            }
399            io::ErrorKind::StorageFull
400            | io::ErrorKind::QuotaExceeded
401            | io::ErrorKind::FileTooLarge => Self::storage_insufficient(backend, operation, detail),
402            _ => Self::Storage(detail.into()),
403        }
404    }
405
406    #[must_use]
407    pub fn is_retryable_io_error(error: &io::Error) -> bool {
408        matches!(
409            error.kind(),
410            io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
411        )
412    }
413
414    /// Build telemetry metadata for storage-related errors.
415    ///
416    /// Derives the base fields from [`Self::problem_details`] so that the
417    /// type URI, code, title, and detail stay in sync automatically, then
418    /// overlays storage-specific context that is only emitted to logs.
419    #[must_use]
420    fn telemetry(&self) -> Option<ProblemTelemetry> {
421        match self {
422            Self::Unavailable(failure) | Self::InsufficientStorage(failure) => {
423                let mut t = ProblemTelemetry::from(&self.problem_details());
424                t.error_class = Some(failure.class.as_str().to_string());
425                t.storage_backend = Some(failure.backend.to_string());
426                t.storage_operation = Some(failure.operation.clone());
427                t.internal_detail = Some(failure.detail.clone());
428                if let Self::Unavailable(f) = self {
429                    t.retry_after_secs = f.retry_after_secs;
430                }
431                Some(t)
432            }
433            Self::Storage(detail) => {
434                let mut t = ProblemTelemetry::from(&self.problem_details());
435                t.error_class = Some("internal".to_string());
436                t.internal_detail = Some(detail.clone());
437                Some(t)
438            }
439            _ => None,
440        }
441    }
442}
443
/// Result type alias for storage and protocol operations
///
/// Shorthand for `std::result::Result<T, Error>` using this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
446
447impl From<Error> for ProblemResponse {
448    fn from(error: Error) -> Self {
449        let problem = error.problem_details();
450        let telemetry = error.telemetry();
451        let mut response = ProblemResponse::new(problem);
452
453        if let Some(retry_after_secs) = match &error {
454            Error::Unavailable(failure) => failure.retry_after_secs,
455            _ => None,
456        } {
457            response = response.with_header(
458                RETRY_AFTER,
459                HeaderValue::from_str(&retry_after_secs.to_string())
460                    .expect("retry-after header value must be valid"),
461            );
462        }
463
464        if let Some(telemetry) = telemetry {
465            response = response.with_telemetry(telemetry);
466        }
467
468        response
469    }
470}
471
472/// Convert Error to HTTP response
473impl axum::response::IntoResponse for Error {
474    fn into_response(self) -> axum::response::Response {
475        ProblemResponse::from(self).into_response()
476    }
477}
478
#[cfg(test)]
mod tests {
    use super::Error;
    use axum::http::header::RETRY_AFTER;
    use axum::http::{HeaderValue, StatusCode};
    use axum::response::IntoResponse;
    use std::io;

    /// A transient I/O kind must surface as 503 with the default retry hint.
    #[test]
    fn classify_io_failure_maps_transient_errors_to_503() {
        let io_error = io::Error::new(io::ErrorKind::TimedOut, "backend timed out");
        let classified = Error::classify_io_failure(
            "file",
            "append stream log",
            "failed to append stream log: backend timed out",
            &io_error,
        );
        let response = classified.into_response();

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(
            response.headers().get(RETRY_AFTER),
            Some(&HeaderValue::from_static("1"))
        );
    }

    /// A capacity I/O kind must surface as 507 without a retry hint.
    #[test]
    fn classify_io_failure_maps_capacity_errors_to_507() {
        let io_error = io::Error::new(io::ErrorKind::StorageFull, "disk full");
        let classified = Error::classify_io_failure(
            "file",
            "sync stream log",
            "failed to sync stream log: disk full",
            &io_error,
        );
        let response = classified.into_response();

        assert_eq!(response.status(), StatusCode::INSUFFICIENT_STORAGE);
        assert!(!response.headers().contains_key(RETRY_AFTER));
    }
}