Skip to main content

allsource_core/
error.rs

1/// AllSource error types
2#[derive(Debug, thiserror::Error)]
3pub enum AllSourceError {
4    #[error("Event not found: {0}")]
5    EventNotFound(String),
6
7    #[error("Entity not found: {0}")]
8    EntityNotFound(String),
9
10    #[error("Tenant already exists: {0}")]
11    TenantAlreadyExists(String),
12
13    #[error("Tenant not found: {0}")]
14    TenantNotFound(String),
15
16    #[error("Invalid event: {0}")]
17    InvalidEvent(String),
18
19    #[error("Invalid query: {0}")]
20    InvalidQuery(String),
21
22    #[error("Invalid input: {0}")]
23    InvalidInput(String),
24
25    #[error("Storage error: {0}")]
26    StorageError(String),
27
28    #[error("Serialization error: {0}")]
29    SerializationError(#[from] serde_json::Error),
30
31    #[error("Arrow error: {0}")]
32    ArrowError(String),
33
34    #[error("Index error: {0}")]
35    IndexError(String),
36
37    #[error("Validation error: {0}")]
38    ValidationError(String),
39
40    #[error("Concurrency error: {0}")]
41    ConcurrencyError(String),
42
43    #[error("Version conflict: expected {expected}, current {current}")]
44    VersionConflict { expected: u64, current: u64 },
45
46    #[error("Queue full: {0}")]
47    QueueFull(String),
48
49    #[error("Internal error: {0}")]
50    InternalError(String),
51}
52
53impl AllSourceError {
54    /// Returns true for transient errors that may succeed on retry
55    /// (storage I/O, concurrency conflicts, queue pressure).
56    pub fn is_retryable(&self) -> bool {
57        matches!(
58            self,
59            AllSourceError::StorageError(_)
60                | AllSourceError::ConcurrencyError(_)
61                | AllSourceError::VersionConflict { .. }
62                | AllSourceError::QueueFull(_)
63        )
64    }
65}
66
67// Alias for domain layer convenience
68pub use AllSourceError as Error;
69
70impl From<arrow::error::ArrowError> for AllSourceError {
71    fn from(err: arrow::error::ArrowError) -> Self {
72        AllSourceError::ArrowError(err.to_string())
73    }
74}
75
76impl From<parquet::errors::ParquetError> for AllSourceError {
77    fn from(err: parquet::errors::ParquetError) -> Self {
78        AllSourceError::StorageError(err.to_string())
79    }
80}
81
82impl From<crate::infrastructure::persistence::SimdJsonError> for AllSourceError {
83    fn from(err: crate::infrastructure::persistence::SimdJsonError) -> Self {
84        AllSourceError::SerializationError(serde_json::Error::io(std::io::Error::new(
85            std::io::ErrorKind::InvalidData,
86            err.to_string(),
87        )))
88    }
89}
90
91#[cfg(feature = "postgres")]
92impl From<sqlx::Error> for AllSourceError {
93    fn from(err: sqlx::Error) -> Self {
94        AllSourceError::StorageError(format!("Database error: {err}"))
95    }
96}
97
98/// Custom Result type for AllSource operations
99pub type Result<T> = std::result::Result<T, AllSourceError>;
100
101#[cfg(feature = "server")]
102mod axum_impl {
103    use super::AllSourceError;
104    use axum::{
105        http::StatusCode,
106        response::{IntoResponse, Response},
107    };
108
109    /// Implement IntoResponse for axum error handling
110    impl IntoResponse for AllSourceError {
111        fn into_response(self) -> Response {
112            let (status, error_message) = match self {
113                AllSourceError::EventNotFound(_)
114                | AllSourceError::EntityNotFound(_)
115                | AllSourceError::TenantNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
116                AllSourceError::InvalidEvent(_)
117                | AllSourceError::InvalidQuery(_)
118                | AllSourceError::InvalidInput(_)
119                | AllSourceError::ValidationError(_) => (StatusCode::BAD_REQUEST, self.to_string()),
120                AllSourceError::VersionConflict { expected, current } => {
121                    let body = serde_json::json!({
122                        "error": "version_conflict",
123                        "expected_version": expected,
124                        "current_version": current,
125                    });
126                    return (StatusCode::CONFLICT, axum::Json(body)).into_response();
127                }
128                AllSourceError::TenantAlreadyExists(_) | AllSourceError::ConcurrencyError(_) => {
129                    (StatusCode::CONFLICT, self.to_string())
130                }
131                AllSourceError::QueueFull(_) => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
132                AllSourceError::StorageError(_)
133                | AllSourceError::ArrowError(_)
134                | AllSourceError::IndexError(_)
135                | AllSourceError::InternalError(_) => {
136                    (StatusCode::INTERNAL_SERVER_ERROR, self.to_string())
137                }
138                AllSourceError::SerializationError(_) => {
139                    (StatusCode::UNPROCESSABLE_ENTITY, self.to_string())
140                }
141            };
142
143            let body = serde_json::json!({
144                "error": error_message,
145            });
146
147            (status, axum::Json(body)).into_response()
148        }
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use super::*;
155
156    #[cfg(feature = "server")]
157    use axum::{http::StatusCode, response::IntoResponse};
158
159    #[test]
160    fn test_error_display() {
161        let err = AllSourceError::EventNotFound("event-123".to_string());
162        assert_eq!(err.to_string(), "Event not found: event-123");
163
164        let err = AllSourceError::EntityNotFound("entity-456".to_string());
165        assert_eq!(err.to_string(), "Entity not found: entity-456");
166
167        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
168        assert_eq!(err.to_string(), "Tenant already exists: tenant-1");
169
170        let err = AllSourceError::TenantNotFound("tenant-2".to_string());
171        assert_eq!(err.to_string(), "Tenant not found: tenant-2");
172    }
173
174    #[test]
175    fn test_error_variants() {
176        let errors: Vec<AllSourceError> = vec![
177            AllSourceError::InvalidEvent("bad event".to_string()),
178            AllSourceError::InvalidQuery("bad query".to_string()),
179            AllSourceError::InvalidInput("bad input".to_string()),
180            AllSourceError::StorageError("storage failed".to_string()),
181            AllSourceError::ArrowError("arrow failed".to_string()),
182            AllSourceError::IndexError("index failed".to_string()),
183            AllSourceError::ValidationError("validation failed".to_string()),
184            AllSourceError::ConcurrencyError("conflict".to_string()),
185            AllSourceError::QueueFull("queue full".to_string()),
186            AllSourceError::InternalError("internal error".to_string()),
187        ];
188
189        for err in errors {
190            let msg = err.to_string();
191            assert!(!msg.is_empty());
192        }
193    }
194
195    #[cfg(feature = "server")]
196    #[test]
197    fn test_into_response_not_found() {
198        let err = AllSourceError::EventNotFound("event-123".to_string());
199        let response = err.into_response();
200        assert_eq!(response.status(), StatusCode::NOT_FOUND);
201
202        let err = AllSourceError::EntityNotFound("entity-456".to_string());
203        let response = err.into_response();
204        assert_eq!(response.status(), StatusCode::NOT_FOUND);
205
206        let err = AllSourceError::TenantNotFound("tenant-1".to_string());
207        let response = err.into_response();
208        assert_eq!(response.status(), StatusCode::NOT_FOUND);
209    }
210
211    #[cfg(feature = "server")]
212    #[test]
213    fn test_into_response_bad_request() {
214        let err = AllSourceError::InvalidEvent("bad event".to_string());
215        let response = err.into_response();
216        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
217
218        let err = AllSourceError::InvalidQuery("bad query".to_string());
219        let response = err.into_response();
220        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
221
222        let err = AllSourceError::InvalidInput("bad input".to_string());
223        let response = err.into_response();
224        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
225
226        let err = AllSourceError::ValidationError("validation failed".to_string());
227        let response = err.into_response();
228        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
229    }
230
231    #[cfg(feature = "server")]
232    #[test]
233    fn test_into_response_conflict() {
234        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
235        let response = err.into_response();
236        assert_eq!(response.status(), StatusCode::CONFLICT);
237
238        let err = AllSourceError::ConcurrencyError("conflict".to_string());
239        let response = err.into_response();
240        assert_eq!(response.status(), StatusCode::CONFLICT);
241    }
242
243    #[cfg(feature = "server")]
244    #[test]
245    fn test_into_response_service_unavailable() {
246        let err = AllSourceError::QueueFull("queue is full".to_string());
247        let response = err.into_response();
248        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
249    }
250
251    #[cfg(feature = "server")]
252    #[test]
253    fn test_into_response_internal_error() {
254        let err = AllSourceError::StorageError("storage error".to_string());
255        let response = err.into_response();
256        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
257
258        let err = AllSourceError::ArrowError("arrow error".to_string());
259        let response = err.into_response();
260        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
261
262        let err = AllSourceError::IndexError("index error".to_string());
263        let response = err.into_response();
264        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
265
266        let err = AllSourceError::InternalError("internal error".to_string());
267        let response = err.into_response();
268        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
269    }
270
271    #[test]
272    fn test_from_arrow_error() {
273        let arrow_err = arrow::error::ArrowError::InvalidArgumentError("test".to_string());
274        let err: AllSourceError = arrow_err.into();
275        assert!(matches!(err, AllSourceError::ArrowError(_)));
276    }
277
278    #[test]
279    fn test_from_parquet_error() {
280        let parquet_err = parquet::errors::ParquetError::General("test".to_string());
281        let err: AllSourceError = parquet_err.into();
282        assert!(matches!(err, AllSourceError::StorageError(_)));
283    }
284
285    #[test]
286    fn test_error_debug() {
287        let err = AllSourceError::EventNotFound("test".to_string());
288        let debug_str = format!("{err:?}");
289        assert!(debug_str.contains("EventNotFound"));
290    }
291
292    #[test]
293    fn test_is_retryable() {
294        // Retryable errors
295        assert!(AllSourceError::StorageError("io".to_string()).is_retryable());
296        assert!(AllSourceError::ConcurrencyError("conflict".to_string()).is_retryable());
297        assert!(AllSourceError::QueueFull("backpressure".to_string()).is_retryable());
298
299        // Non-retryable errors
300        assert!(!AllSourceError::EventNotFound("e1".to_string()).is_retryable());
301        assert!(!AllSourceError::InvalidEvent("bad".to_string()).is_retryable());
302        assert!(!AllSourceError::InvalidQuery("bad".to_string()).is_retryable());
303        assert!(!AllSourceError::ValidationError("bad".to_string()).is_retryable());
304        assert!(!AllSourceError::TenantNotFound("t1".to_string()).is_retryable());
305        assert!(!AllSourceError::TenantAlreadyExists("t1".to_string()).is_retryable());
306        assert!(!AllSourceError::InternalError("bug".to_string()).is_retryable());
307    }
308}