1use crate::protocol::problem::{ProblemDetails, ProblemResponse, ProblemTelemetry};
2use axum::http::{HeaderValue, StatusCode, header::RETRY_AFTER};
3use std::io;
4use thiserror::Error;
5
6pub const DEFAULT_STORAGE_RETRY_AFTER_SECS: u32 = 1;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum StorageFailureClass {
12 Unavailable,
13 InsufficientStorage,
14}
15
16impl StorageFailureClass {
17 #[must_use]
18 pub fn as_str(self) -> &'static str {
19 match self {
20 Self::Unavailable => "unavailable",
21 Self::InsufficientStorage => "insufficient_storage",
22 }
23 }
24}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct StorageFailure {
29 pub class: StorageFailureClass,
30 pub backend: &'static str,
31 pub operation: String,
32 pub detail: String,
33 pub retry_after_secs: Option<u32>,
34}
35
36impl std::fmt::Display for StorageFailure {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "{} {}: {}", self.backend, self.operation, self.detail)
39 }
40}
41
42#[derive(Debug, Error)]
47pub enum Error {
48 #[error("Stream not found: {0}")]
50 NotFound(String),
51
52 #[error("Stream already exists with different configuration")]
54 ConfigMismatch,
55
56 #[error("Invalid offset format: {0}")]
58 InvalidOffset(String),
59
60 #[error("Content type mismatch: expected {expected}, got {actual}")]
62 ContentTypeMismatch { expected: String, actual: String },
63
64 #[error("Stream is closed")]
66 StreamClosed,
67
68 #[error("Producer sequence gap: expected {expected}, got {actual}")]
70 SequenceGap { expected: u64, actual: u64 },
71
72 #[error("Producer epoch fenced: current {current}, received {received}")]
74 EpochFenced { current: u64, received: u64 },
75
76 #[error("Invalid producer state: {0}")]
78 InvalidProducerState(String),
79
80 #[error("Memory limit exceeded")]
82 MemoryLimitExceeded,
83
84 #[error("Stream size limit exceeded")]
86 StreamSizeLimitExceeded,
87
88 #[error("Invalid TTL format: {0}")]
90 InvalidTtl(String),
91
92 #[error("Cannot specify both TTL and Expires-At")]
94 ConflictingExpiration,
95
96 #[error("Stream has expired")]
98 StreamExpired,
99
100 #[error("Invalid JSON: {0}")]
102 InvalidJson(String),
103
104 #[error("Empty request body requires Stream-Closed: true")]
106 EmptyBody,
107
108 #[error("Empty JSON arrays are not permitted for append")]
110 EmptyArray,
111
112 #[error("Invalid header value for {header}: {reason}")]
114 InvalidHeader { header: String, reason: String },
115
116 #[error("Invalid stream name: {0}")]
118 InvalidStreamName(String),
119
120 #[error("Stream-Seq ordering violation: last={last}, received={received}")]
122 SeqOrderingViolation { last: String, received: String },
123
124 #[error("Storage temporarily unavailable: {0}")]
126 Unavailable(StorageFailure),
127
128 #[error("Storage capacity exhausted: {0}")]
130 InsufficientStorage(StorageFailure),
131
132 #[error("Stream is gone: {0}")]
134 StreamGone(String),
135
136 #[error("Stream path is reserved by a soft-deleted lineage: {0}")]
138 StreamPathBlocked(String),
139
140 #[error("Fork offset is beyond the source stream's tail")]
142 ForkOffsetBeyondTail,
143
144 #[error("Cannot fork from deleted stream: {0}")]
146 ForkFromTombstone(String),
147
148 #[error("Storage error: {0}")]
150 Storage(String),
151}
152
153impl Error {
154 #[must_use]
164 pub fn status_code(&self) -> StatusCode {
165 match self {
166 Self::NotFound(_) | Self::StreamExpired => StatusCode::NOT_FOUND,
167 Self::StreamGone(_) => StatusCode::GONE,
168 Self::StreamPathBlocked(_)
169 | Self::ForkFromTombstone(_)
170 | Self::ConfigMismatch
171 | Self::ContentTypeMismatch { .. }
172 | Self::StreamClosed
173 | Self::SequenceGap { .. }
174 | Self::SeqOrderingViolation { .. } => StatusCode::CONFLICT,
175 Self::EpochFenced { .. } => StatusCode::FORBIDDEN,
176 Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => {
177 StatusCode::PAYLOAD_TOO_LARGE
178 }
179 Self::ForkOffsetBeyondTail
180 | Self::InvalidOffset(_)
181 | Self::InvalidProducerState(_)
182 | Self::InvalidTtl(_)
183 | Self::ConflictingExpiration
184 | Self::InvalidJson(_)
185 | Self::InvalidHeader { .. }
186 | Self::InvalidStreamName(_)
187 | Self::EmptyBody
188 | Self::EmptyArray => StatusCode::BAD_REQUEST,
189 Self::Unavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
190 Self::InsufficientStorage(_) => {
191 StatusCode::from_u16(507).expect("507 is a valid status code")
192 }
193 Self::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
194 }
195 }
196
197 fn simple_problem(
202 &self,
203 type_uri: &'static str,
204 title: &'static str,
205 code: &'static str,
206 ) -> ProblemDetails {
207 ProblemDetails::new(type_uri, title, self.status_code(), code).with_detail(self.to_string())
208 }
209
210 fn conflict_problem(&self) -> ProblemDetails {
211 match self {
212 Self::ConfigMismatch => self.simple_problem(
213 "/errors/already-exists",
214 "Stream Already Exists",
215 "ALREADY_EXISTS",
216 ),
217 Self::ContentTypeMismatch { .. } => self.simple_problem(
218 "/errors/content-type-mismatch",
219 "Content Type Mismatch",
220 "CONTENT_TYPE_MISMATCH",
221 ),
222 Self::StreamClosed => {
223 self.simple_problem("/errors/stream-closed", "Stream Closed", "STREAM_CLOSED")
224 }
225 Self::SequenceGap { .. } | Self::SeqOrderingViolation { .. } => self.simple_problem(
226 "/errors/sequence-conflict",
227 "Sequence Conflict",
228 "SEQUENCE_CONFLICT",
229 ),
230 Self::ForkFromTombstone(_) => self.simple_problem(
231 "/errors/fork-from-tombstone",
232 "Fork From Deleted Stream",
233 "FORK_FROM_TOMBSTONE",
234 ),
235 Self::StreamPathBlocked(name) => ProblemDetails::new(
236 "/errors/path-blocked",
237 "Stream Path Blocked",
238 self.status_code(),
239 "PATH_BLOCKED",
240 )
241 .with_detail(format!(
242 "Stream path is reserved by a soft-deleted lineage: {name}"
243 )),
244 _ => unreachable!("conflict_problem called with non-conflict error"),
245 }
246 }
247
248 fn client_problem(&self) -> ProblemDetails {
249 match self {
250 Self::InvalidOffset(_) => {
251 self.simple_problem("/errors/invalid-offset", "Invalid Offset", "INVALID_OFFSET")
252 }
253 Self::InvalidStreamName(_) => self.simple_problem(
254 "/errors/invalid-stream-name",
255 "Invalid Stream Name",
256 "INVALID_STREAM_NAME",
257 ),
258 Self::InvalidJson(_) => {
259 self.simple_problem("/errors/invalid-json", "Invalid JSON", "INVALID_JSON")
260 }
261 Self::EmptyBody => {
262 self.simple_problem("/errors/empty-body", "Empty Body", "EMPTY_BODY")
263 }
264 Self::EmptyArray => {
265 self.simple_problem("/errors/empty-array", "Empty Array", "EMPTY_ARRAY")
266 }
267 Self::EpochFenced { .. } => self.simple_problem(
268 "/errors/producer-epoch-fenced",
269 "Producer Epoch Fenced",
270 "PRODUCER_EPOCH_FENCED",
271 ),
272 Self::ForkOffsetBeyondTail
273 | Self::InvalidProducerState(_)
274 | Self::InvalidTtl(_)
275 | Self::ConflictingExpiration
276 | Self::InvalidHeader { .. } => {
277 self.simple_problem("/errors/bad-request", "Bad Request", "BAD_REQUEST")
278 }
279 _ => unreachable!("client_problem called with unsupported error"),
280 }
281 }
282
283 fn storage_problem(&self) -> ProblemDetails {
284 match self {
285 Self::Unavailable(_) => ProblemDetails::new(
286 "/errors/unavailable",
287 "Service Unavailable",
288 self.status_code(),
289 "UNAVAILABLE",
290 )
291 .with_detail("The server is temporarily unable to complete the request."),
292 Self::InsufficientStorage(_) => ProblemDetails::new(
293 "/errors/insufficient-storage",
294 "Insufficient Storage",
295 self.status_code(),
296 "INSUFFICIENT_STORAGE",
297 )
298 .with_detail(
299 "The server does not have enough storage capacity to complete the request.",
300 ),
301 Self::Storage(_) => ProblemDetails::new(
302 "/errors/internal",
303 "Internal Server Error",
304 self.status_code(),
305 "INTERNAL_ERROR",
306 )
307 .with_detail("The server encountered an internal error."),
308 _ => unreachable!("storage_problem called with non-storage error"),
309 }
310 }
311
312 #[must_use]
313 fn problem_details(&self) -> ProblemDetails {
314 match self {
315 Self::NotFound(name) => ProblemDetails::new(
316 "/errors/not-found",
317 "Stream Not Found",
318 self.status_code(),
319 "NOT_FOUND",
320 )
321 .with_detail(format!("Stream not found: {name}")),
322 Self::ConfigMismatch
323 | Self::ContentTypeMismatch { .. }
324 | Self::StreamClosed
325 | Self::SequenceGap { .. }
326 | Self::SeqOrderingViolation { .. }
327 | Self::ForkFromTombstone(_)
328 | Self::StreamPathBlocked(_) => self.conflict_problem(),
329 Self::InvalidOffset(_)
330 | Self::EpochFenced { .. }
331 | Self::InvalidProducerState(_)
332 | Self::InvalidTtl(_)
333 | Self::ConflictingExpiration
334 | Self::InvalidJson(_)
335 | Self::EmptyBody
336 | Self::EmptyArray
337 | Self::InvalidHeader { .. }
338 | Self::InvalidStreamName(_)
339 | Self::ForkOffsetBeyondTail => self.client_problem(),
340 Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => self.simple_problem(
341 "/errors/payload-too-large",
342 "Payload Too Large",
343 "PAYLOAD_TOO_LARGE",
344 ),
345 Self::Unavailable(_) | Self::InsufficientStorage(_) | Self::Storage(_) => {
346 self.storage_problem()
347 }
348 Self::StreamExpired => {
349 self.simple_problem("/errors/not-found", "Stream Not Found", "NOT_FOUND")
350 }
351 Self::StreamGone(name) => {
352 ProblemDetails::new("/errors/gone", "Stream Gone", self.status_code(), "GONE")
353 .with_detail(format!("Stream is gone: {name}"))
354 }
355 }
356 }
357
358 #[must_use]
359 pub fn storage_unavailable(
360 backend: &'static str,
361 operation: impl Into<String>,
362 detail: impl Into<String>,
363 ) -> Self {
364 Self::Unavailable(StorageFailure {
365 class: StorageFailureClass::Unavailable,
366 backend,
367 operation: operation.into(),
368 detail: detail.into(),
369 retry_after_secs: Some(DEFAULT_STORAGE_RETRY_AFTER_SECS),
370 })
371 }
372
373 #[must_use]
374 pub fn storage_insufficient(
375 backend: &'static str,
376 operation: impl Into<String>,
377 detail: impl Into<String>,
378 ) -> Self {
379 Self::InsufficientStorage(StorageFailure {
380 class: StorageFailureClass::InsufficientStorage,
381 backend,
382 operation: operation.into(),
383 detail: detail.into(),
384 retry_after_secs: None,
385 })
386 }
387
388 #[must_use]
389 pub fn classify_io_failure(
390 backend: &'static str,
391 operation: impl Into<String>,
392 detail: impl Into<String>,
393 error: &io::Error,
394 ) -> Self {
395 match error.kind() {
396 io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
397 Self::storage_unavailable(backend, operation, detail)
398 }
399 io::ErrorKind::StorageFull
400 | io::ErrorKind::QuotaExceeded
401 | io::ErrorKind::FileTooLarge => Self::storage_insufficient(backend, operation, detail),
402 _ => Self::Storage(detail.into()),
403 }
404 }
405
406 #[must_use]
407 pub fn is_retryable_io_error(error: &io::Error) -> bool {
408 matches!(
409 error.kind(),
410 io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
411 )
412 }
413
414 #[must_use]
420 fn telemetry(&self) -> Option<ProblemTelemetry> {
421 match self {
422 Self::Unavailable(failure) | Self::InsufficientStorage(failure) => {
423 let mut t = ProblemTelemetry::from(&self.problem_details());
424 t.error_class = Some(failure.class.as_str().to_string());
425 t.storage_backend = Some(failure.backend.to_string());
426 t.storage_operation = Some(failure.operation.clone());
427 t.internal_detail = Some(failure.detail.clone());
428 if let Self::Unavailable(f) = self {
429 t.retry_after_secs = f.retry_after_secs;
430 }
431 Some(t)
432 }
433 Self::Storage(detail) => {
434 let mut t = ProblemTelemetry::from(&self.problem_details());
435 t.error_class = Some("internal".to_string());
436 t.internal_detail = Some(detail.clone());
437 Some(t)
438 }
439 _ => None,
440 }
441 }
442}
443
444pub type Result<T> = std::result::Result<T, Error>;
446
447impl From<Error> for ProblemResponse {
448 fn from(error: Error) -> Self {
449 let problem = error.problem_details();
450 let telemetry = error.telemetry();
451 let mut response = ProblemResponse::new(problem);
452
453 if let Some(retry_after_secs) = match &error {
454 Error::Unavailable(failure) => failure.retry_after_secs,
455 _ => None,
456 } {
457 response = response.with_header(
458 RETRY_AFTER,
459 HeaderValue::from_str(&retry_after_secs.to_string())
460 .expect("retry-after header value must be valid"),
461 );
462 }
463
464 if let Some(telemetry) = telemetry {
465 response = response.with_telemetry(telemetry);
466 }
467
468 response
469 }
470}
471
472impl axum::response::IntoResponse for Error {
474 fn into_response(self) -> axum::response::Response {
475 ProblemResponse::from(self).into_response()
476 }
477}
478
479#[cfg(test)]
480mod tests {
481 use super::Error;
482 use axum::http::HeaderValue;
483 use axum::response::IntoResponse;
484 use std::io;
485
486 #[test]
487 fn classify_io_failure_maps_transient_errors_to_503() {
488 let error = io::Error::new(io::ErrorKind::TimedOut, "backend timed out");
489 let response = Error::classify_io_failure(
490 "file",
491 "append stream log",
492 "failed to append stream log: backend timed out",
493 &error,
494 )
495 .into_response();
496
497 assert_eq!(
498 response.status(),
499 axum::http::StatusCode::SERVICE_UNAVAILABLE
500 );
501 assert_eq!(
502 response.headers().get("retry-after").unwrap(),
503 &HeaderValue::from_static("1")
504 );
505 }
506
507 #[test]
508 fn classify_io_failure_maps_capacity_errors_to_507() {
509 let error = io::Error::new(io::ErrorKind::StorageFull, "disk full");
510 let response = Error::classify_io_failure(
511 "file",
512 "sync stream log",
513 "failed to sync stream log: disk full",
514 &error,
515 )
516 .into_response();
517
518 assert_eq!(
519 response.status(),
520 axum::http::StatusCode::from_u16(507).unwrap()
521 );
522 assert!(response.headers().get("retry-after").is_none());
523 }
524}