1use crate::protocol::problem::{ProblemDetails, ProblemResponse, ProblemTelemetry};
2use axum::http::{HeaderValue, StatusCode, header::RETRY_AFTER};
3use std::io;
4use thiserror::Error;
5
/// Retry delay, in seconds, attached to failures built by [`Error::storage_unavailable`].
pub const DEFAULT_STORAGE_RETRY_AFTER_SECS: u32 = 1;
8
/// Coarse classification of a storage backend failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageFailureClass {
    /// Labelled `"unavailable"` by [`StorageFailureClass::as_str`].
    Unavailable,
    /// Labelled `"insufficient_storage"` by [`StorageFailureClass::as_str`].
    InsufficientStorage,
}

impl StorageFailureClass {
    /// Returns the fixed snake_case label for this failure class.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::InsufficientStorage => "insufficient_storage",
            Self::Unavailable => "unavailable",
        }
    }
}
25
/// Details of a storage backend failure, carried by the `Unavailable` and
/// `InsufficientStorage` variants of [`Error`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageFailure {
    /// Classification of the failure.
    pub class: StorageFailureClass,
    /// Static identifier of the storage backend that failed.
    pub backend: &'static str,
    /// Operation that was being performed; rendered in the `Display` output.
    pub operation: String,
    /// Failure detail; copied into telemetry as the internal detail.
    pub detail: String,
    /// Optional retry delay in seconds.
    pub retry_after_secs: Option<u32>,
}
35
36impl std::fmt::Display for StorageFailure {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "{} {}: {}", self.backend, self.operation, self.detail)
39 }
40}
41
/// Errors produced by stream operations.
///
/// Each variant maps to an HTTP status via `status_code` and to a problem
/// document via the `ProblemResponse` conversion.
#[derive(Debug, Error)]
pub enum Error {
    /// The named stream does not exist (HTTP 404).
    #[error("Stream not found: {0}")]
    NotFound(String),

    /// A stream already exists with a different configuration (HTTP 409).
    #[error("Stream already exists with different configuration")]
    ConfigMismatch,

    /// The supplied offset is malformed (HTTP 400).
    #[error("Invalid offset format: {0}")]
    InvalidOffset(String),

    /// The request content type differs from the expected one (HTTP 409).
    #[error("Content type mismatch: expected {expected}, got {actual}")]
    ContentTypeMismatch { expected: String, actual: String },

    /// The stream is closed (HTTP 409).
    #[error("Stream is closed")]
    StreamClosed,

    /// The producer sequence number did not match the expected value (HTTP 409).
    #[error("Producer sequence gap: expected {expected}, got {actual}")]
    SequenceGap { expected: u64, actual: u64 },

    /// The producer epoch was fenced (HTTP 403).
    #[error("Producer epoch fenced: current {current}, received {received}")]
    EpochFenced { current: u64, received: u64 },

    /// The producer state is invalid (HTTP 400).
    #[error("Invalid producer state: {0}")]
    InvalidProducerState(String),

    /// The memory limit was exceeded (HTTP 413).
    #[error("Memory limit exceeded")]
    MemoryLimitExceeded,

    /// The stream size limit was exceeded (HTTP 413).
    #[error("Stream size limit exceeded")]
    StreamSizeLimitExceeded,

    /// The supplied TTL is malformed (HTTP 400).
    #[error("Invalid TTL format: {0}")]
    InvalidTtl(String),

    /// Both TTL and Expires-At were specified (HTTP 400).
    #[error("Cannot specify both TTL and Expires-At")]
    ConflictingExpiration,

    /// The stream has expired; reported to clients as not found (HTTP 404).
    #[error("Stream has expired")]
    StreamExpired,

    /// The request body is not valid JSON (HTTP 400).
    #[error("Invalid JSON: {0}")]
    InvalidJson(String),

    /// An empty request body was sent without `Stream-Closed: true` (HTTP 400).
    #[error("Empty request body requires Stream-Closed: true")]
    EmptyBody,

    /// An empty JSON array was submitted for append (HTTP 400).
    #[error("Empty JSON arrays are not permitted for append")]
    EmptyArray,

    /// A request header carried an invalid value (HTTP 400).
    #[error("Invalid header value for {header}: {reason}")]
    InvalidHeader { header: String, reason: String },

    /// The stream name is invalid (HTTP 400).
    #[error("Invalid stream name: {0}")]
    InvalidStreamName(String),

    /// The `Stream-Seq` value violated ordering (HTTP 409).
    #[error("Stream-Seq ordering violation: last={last}, received={received}")]
    SeqOrderingViolation { last: String, received: String },

    /// The storage backend is temporarily unavailable (HTTP 503).
    #[error("Storage temporarily unavailable: {0}")]
    Unavailable(StorageFailure),

    /// The storage backend has run out of capacity (HTTP 507).
    #[error("Storage capacity exhausted: {0}")]
    InsufficientStorage(StorageFailure),

    /// Any other storage failure (HTTP 500).
    #[error("Storage error: {0}")]
    Storage(String),
}
136
137impl Error {
138 #[must_use]
143 pub fn status_code(&self) -> StatusCode {
144 match self {
145 Self::NotFound(_) | Self::StreamExpired => StatusCode::NOT_FOUND,
146 Self::ConfigMismatch
147 | Self::ContentTypeMismatch { .. }
148 | Self::StreamClosed
149 | Self::SequenceGap { .. }
150 | Self::SeqOrderingViolation { .. } => StatusCode::CONFLICT,
151 Self::EpochFenced { .. } => StatusCode::FORBIDDEN,
152 Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => {
153 StatusCode::PAYLOAD_TOO_LARGE
154 }
155 Self::InvalidOffset(_)
156 | Self::InvalidProducerState(_)
157 | Self::InvalidTtl(_)
158 | Self::ConflictingExpiration
159 | Self::InvalidJson(_)
160 | Self::InvalidHeader { .. }
161 | Self::InvalidStreamName(_)
162 | Self::EmptyBody
163 | Self::EmptyArray => StatusCode::BAD_REQUEST,
164 Self::Unavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
165 Self::InsufficientStorage(_) => {
166 StatusCode::from_u16(507).expect("507 is a valid status code")
167 }
168 Self::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
169 }
170 }
171
172 #[must_use]
173 fn problem_details(&self) -> ProblemDetails {
174 match self {
175 Self::NotFound(name) => ProblemDetails::new(
176 "/errors/not-found",
177 "Stream Not Found",
178 self.status_code(),
179 "NOT_FOUND",
180 )
181 .with_detail(format!("Stream not found: {name}")),
182 Self::ConfigMismatch => ProblemDetails::new(
183 "/errors/already-exists",
184 "Stream Already Exists",
185 self.status_code(),
186 "ALREADY_EXISTS",
187 )
188 .with_detail(self.to_string()),
189 Self::InvalidOffset(_) => ProblemDetails::new(
190 "/errors/invalid-offset",
191 "Invalid Offset",
192 self.status_code(),
193 "INVALID_OFFSET",
194 )
195 .with_detail(self.to_string()),
196 Self::ContentTypeMismatch { .. } => ProblemDetails::new(
197 "/errors/content-type-mismatch",
198 "Content Type Mismatch",
199 self.status_code(),
200 "CONTENT_TYPE_MISMATCH",
201 )
202 .with_detail(self.to_string()),
203 Self::StreamClosed => ProblemDetails::new(
204 "/errors/stream-closed",
205 "Stream Closed",
206 self.status_code(),
207 "STREAM_CLOSED",
208 )
209 .with_detail(self.to_string()),
210 Self::SequenceGap { .. } | Self::SeqOrderingViolation { .. } => ProblemDetails::new(
211 "/errors/sequence-conflict",
212 "Sequence Conflict",
213 self.status_code(),
214 "SEQUENCE_CONFLICT",
215 )
216 .with_detail(self.to_string()),
217 Self::EpochFenced { .. } => ProblemDetails::new(
218 "/errors/producer-epoch-fenced",
219 "Producer Epoch Fenced",
220 self.status_code(),
221 "PRODUCER_EPOCH_FENCED",
222 )
223 .with_detail(self.to_string()),
224 Self::InvalidProducerState(_)
225 | Self::InvalidTtl(_)
226 | Self::ConflictingExpiration
227 | Self::InvalidHeader { .. } => ProblemDetails::new(
228 "/errors/bad-request",
229 "Bad Request",
230 self.status_code(),
231 "BAD_REQUEST",
232 )
233 .with_detail(self.to_string()),
234 Self::InvalidStreamName(_) => ProblemDetails::new(
235 "/errors/invalid-stream-name",
236 "Invalid Stream Name",
237 self.status_code(),
238 "INVALID_STREAM_NAME",
239 )
240 .with_detail(self.to_string()),
241 Self::InvalidJson(_) => ProblemDetails::new(
242 "/errors/invalid-json",
243 "Invalid JSON",
244 self.status_code(),
245 "INVALID_JSON",
246 )
247 .with_detail(self.to_string()),
248 Self::EmptyBody => ProblemDetails::new(
249 "/errors/empty-body",
250 "Empty Body",
251 self.status_code(),
252 "EMPTY_BODY",
253 )
254 .with_detail(self.to_string()),
255 Self::EmptyArray => ProblemDetails::new(
256 "/errors/empty-array",
257 "Empty Array",
258 self.status_code(),
259 "EMPTY_ARRAY",
260 )
261 .with_detail(self.to_string()),
262 Self::MemoryLimitExceeded | Self::StreamSizeLimitExceeded => ProblemDetails::new(
263 "/errors/payload-too-large",
264 "Payload Too Large",
265 self.status_code(),
266 "PAYLOAD_TOO_LARGE",
267 )
268 .with_detail(self.to_string()),
269 Self::Unavailable(_) => ProblemDetails::new(
270 "/errors/unavailable",
271 "Service Unavailable",
272 self.status_code(),
273 "UNAVAILABLE",
274 )
275 .with_detail("The server is temporarily unable to complete the request."),
276 Self::InsufficientStorage(_) => ProblemDetails::new(
277 "/errors/insufficient-storage",
278 "Insufficient Storage",
279 self.status_code(),
280 "INSUFFICIENT_STORAGE",
281 )
282 .with_detail(
283 "The server does not have enough storage capacity to complete the request.",
284 ),
285 Self::StreamExpired => ProblemDetails::new(
286 "/errors/not-found",
287 "Stream Not Found",
288 self.status_code(),
289 "NOT_FOUND",
290 )
291 .with_detail(self.to_string()),
292 Self::Storage(_) => ProblemDetails::new(
293 "/errors/internal",
294 "Internal Server Error",
295 self.status_code(),
296 "INTERNAL_ERROR",
297 )
298 .with_detail("The server encountered an internal error."),
299 }
300 }
301
302 #[must_use]
303 pub fn storage_unavailable(
304 backend: &'static str,
305 operation: impl Into<String>,
306 detail: impl Into<String>,
307 ) -> Self {
308 Self::Unavailable(StorageFailure {
309 class: StorageFailureClass::Unavailable,
310 backend,
311 operation: operation.into(),
312 detail: detail.into(),
313 retry_after_secs: Some(DEFAULT_STORAGE_RETRY_AFTER_SECS),
314 })
315 }
316
317 #[must_use]
318 pub fn storage_insufficient(
319 backend: &'static str,
320 operation: impl Into<String>,
321 detail: impl Into<String>,
322 ) -> Self {
323 Self::InsufficientStorage(StorageFailure {
324 class: StorageFailureClass::InsufficientStorage,
325 backend,
326 operation: operation.into(),
327 detail: detail.into(),
328 retry_after_secs: None,
329 })
330 }
331
332 #[must_use]
333 pub fn classify_io_failure(
334 backend: &'static str,
335 operation: impl Into<String>,
336 detail: impl Into<String>,
337 error: &io::Error,
338 ) -> Self {
339 match error.kind() {
340 io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
341 Self::storage_unavailable(backend, operation, detail)
342 }
343 io::ErrorKind::StorageFull
344 | io::ErrorKind::QuotaExceeded
345 | io::ErrorKind::FileTooLarge => Self::storage_insufficient(backend, operation, detail),
346 _ => Self::Storage(detail.into()),
347 }
348 }
349
350 #[must_use]
351 pub fn is_retryable_io_error(error: &io::Error) -> bool {
352 matches!(
353 error.kind(),
354 io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
355 )
356 }
357
358 #[must_use]
364 fn telemetry(&self) -> Option<ProblemTelemetry> {
365 match self {
366 Self::Unavailable(failure) | Self::InsufficientStorage(failure) => {
367 let mut t = ProblemTelemetry::from(&self.problem_details());
368 t.error_class = Some(failure.class.as_str().to_string());
369 t.storage_backend = Some(failure.backend.to_string());
370 t.storage_operation = Some(failure.operation.clone());
371 t.internal_detail = Some(failure.detail.clone());
372 if let Self::Unavailable(f) = self {
373 t.retry_after_secs = f.retry_after_secs;
374 }
375 Some(t)
376 }
377 Self::Storage(detail) => {
378 let mut t = ProblemTelemetry::from(&self.problem_details());
379 t.error_class = Some("internal".to_string());
380 t.internal_detail = Some(detail.clone());
381 Some(t)
382 }
383 _ => None,
384 }
385 }
386}
387
/// Convenience alias for results whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
390
391impl From<Error> for ProblemResponse {
392 fn from(error: Error) -> Self {
393 let problem = error.problem_details();
394 let telemetry = error.telemetry();
395 let mut response = ProblemResponse::new(problem);
396
397 if let Some(retry_after_secs) = match &error {
398 Error::Unavailable(failure) => failure.retry_after_secs,
399 _ => None,
400 } {
401 response = response.with_header(
402 RETRY_AFTER,
403 HeaderValue::from_str(&retry_after_secs.to_string())
404 .expect("retry-after header value must be valid"),
405 );
406 }
407
408 if let Some(telemetry) = telemetry {
409 response = response.with_telemetry(telemetry);
410 }
411
412 response
413 }
414}
415
416impl axum::response::IntoResponse for Error {
418 fn into_response(self) -> axum::response::Response {
419 ProblemResponse::from(self).into_response()
420 }
421}
422
#[cfg(test)]
mod tests {
    use super::Error;
    use axum::http::StatusCode;
    use axum::response::{IntoResponse, Response};
    use std::io;

    /// Classifies a synthetic `"file"` backend I/O error and renders it as an
    /// HTTP response.
    fn respond(kind: io::ErrorKind, operation: &str, cause: &'static str) -> Response {
        let io_error = io::Error::new(kind, cause);
        let detail = format!("failed to {operation}: {cause}");
        Error::classify_io_failure("file", operation, detail, &io_error).into_response()
    }

    #[test]
    fn classify_io_failure_maps_transient_errors_to_503() {
        let response = respond(
            io::ErrorKind::TimedOut,
            "append stream log",
            "backend timed out",
        );

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);

        let retry_after = response
            .headers()
            .get("retry-after")
            .and_then(|value| value.to_str().ok());
        assert_eq!(retry_after, Some("1"));
    }

    #[test]
    fn classify_io_failure_maps_capacity_errors_to_507() {
        let response = respond(io::ErrorKind::StorageFull, "sync stream log", "disk full");

        assert_eq!(response.status().as_u16(), 507);
        assert!(!response.headers().contains_key("retry-after"));
    }
}
468}