Skip to main content

allsource_core/
error.rs

1/// AllSource error types
2#[derive(Debug, thiserror::Error)]
3pub enum AllSourceError {
4    #[error("Event not found: {0}")]
5    EventNotFound(String),
6
7    #[error("Entity not found: {0}")]
8    EntityNotFound(String),
9
10    #[error("Tenant already exists: {0}")]
11    TenantAlreadyExists(String),
12
13    #[error("Tenant not found: {0}")]
14    TenantNotFound(String),
15
16    #[error("Invalid event: {0}")]
17    InvalidEvent(String),
18
19    #[error("Invalid query: {0}")]
20    InvalidQuery(String),
21
22    #[error("Invalid input: {0}")]
23    InvalidInput(String),
24
25    #[error("Storage error: {0}")]
26    StorageError(String),
27
28    #[error("Serialization error: {0}")]
29    SerializationError(#[from] serde_json::Error),
30
31    #[error("Arrow error: {0}")]
32    ArrowError(String),
33
34    #[error("Index error: {0}")]
35    IndexError(String),
36
37    #[error("Validation error: {0}")]
38    ValidationError(String),
39
40    #[error("Concurrency error: {0}")]
41    ConcurrencyError(String),
42
43    #[error("Queue full: {0}")]
44    QueueFull(String),
45
46    #[error("Internal error: {0}")]
47    InternalError(String),
48}
49
50impl AllSourceError {
51    /// Returns true for transient errors that may succeed on retry
52    /// (storage I/O, concurrency conflicts, queue pressure).
53    pub fn is_retryable(&self) -> bool {
54        matches!(
55            self,
56            AllSourceError::StorageError(_)
57                | AllSourceError::ConcurrencyError(_)
58                | AllSourceError::QueueFull(_)
59        )
60    }
61}
62
63// Alias for domain layer convenience
64pub use AllSourceError as Error;
65
66impl From<arrow::error::ArrowError> for AllSourceError {
67    fn from(err: arrow::error::ArrowError) -> Self {
68        AllSourceError::ArrowError(err.to_string())
69    }
70}
71
72impl From<parquet::errors::ParquetError> for AllSourceError {
73    fn from(err: parquet::errors::ParquetError) -> Self {
74        AllSourceError::StorageError(err.to_string())
75    }
76}
77
78impl From<crate::infrastructure::persistence::SimdJsonError> for AllSourceError {
79    fn from(err: crate::infrastructure::persistence::SimdJsonError) -> Self {
80        AllSourceError::SerializationError(serde_json::Error::io(std::io::Error::new(
81            std::io::ErrorKind::InvalidData,
82            err.to_string(),
83        )))
84    }
85}
86
/// Converts a `sqlx` error into [`AllSourceError::StorageError`], prefixing
/// the rendered message with `Database error: `. Only compiled with the
/// `postgres` feature.
#[cfg(feature = "postgres")]
impl From<sqlx::Error> for AllSourceError {
    fn from(err: sqlx::Error) -> Self {
        let message = format!("Database error: {err}");
        Self::StorageError(message)
    }
}
93
94/// Custom Result type for AllSource operations
95pub type Result<T> = std::result::Result<T, AllSourceError>;
96
#[cfg(feature = "server")]
mod axum_impl {
    use super::AllSourceError;
    use axum::{
        http::StatusCode,
        response::{IntoResponse, Response},
    };

    /// Lets handlers return `AllSourceError` directly: the error becomes a
    /// JSON response of the form `{"error": "<Display text>"}` with a status
    /// code chosen per variant.
    impl IntoResponse for AllSourceError {
        fn into_response(self) -> Response {
            // Match on a borrow so `self` is still available for rendering below.
            let status = match &self {
                // Lookups that found nothing.
                AllSourceError::EventNotFound(_)
                | AllSourceError::EntityNotFound(_)
                | AllSourceError::TenantNotFound(_) => StatusCode::NOT_FOUND,

                // Rejected caller input.
                AllSourceError::InvalidEvent(_)
                | AllSourceError::InvalidQuery(_)
                | AllSourceError::InvalidInput(_)
                | AllSourceError::ValidationError(_) => StatusCode::BAD_REQUEST,

                // Conflicts with existing state.
                AllSourceError::TenantAlreadyExists(_) | AllSourceError::ConcurrencyError(_) => {
                    StatusCode::CONFLICT
                }

                // Back-pressure.
                AllSourceError::QueueFull(_) => StatusCode::SERVICE_UNAVAILABLE,

                // Server-side failures.
                AllSourceError::StorageError(_)
                | AllSourceError::ArrowError(_)
                | AllSourceError::IndexError(_)
                | AllSourceError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,

                // Payload could not be (de)serialized.
                AllSourceError::SerializationError(_) => StatusCode::UNPROCESSABLE_ENTITY,
            };

            let payload = serde_json::json!({
                "error": self.to_string(),
            });

            (status, axum::Json(payload)).into_response()
        }
    }
}
139
/// Unit tests for `AllSourceError`: Display text, `From` conversions,
/// retry classification and (with the `server` feature) HTTP status mapping.
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "server")]
    use axum::{http::StatusCode, response::IntoResponse};

    /// Produces a real `serde_json::Error` by parsing malformed JSON.
    fn sample_serde_error() -> serde_json::Error {
        serde_json::from_str::<serde_json::Value>("{not json")
            .expect_err("malformed JSON must fail to parse")
    }

    #[test]
    fn test_error_display() {
        let err = AllSourceError::EventNotFound("event-123".to_string());
        assert_eq!(err.to_string(), "Event not found: event-123");

        let err = AllSourceError::EntityNotFound("entity-456".to_string());
        assert_eq!(err.to_string(), "Entity not found: entity-456");

        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
        assert_eq!(err.to_string(), "Tenant already exists: tenant-1");

        let err = AllSourceError::TenantNotFound("tenant-2".to_string());
        assert_eq!(err.to_string(), "Tenant not found: tenant-2");
    }

    #[test]
    fn test_error_variants() {
        // Pin the exact rendered text rather than only checking non-emptiness,
        // so an accidental edit to an `#[error(...)]` prefix is caught.
        let cases: Vec<(AllSourceError, &str)> = vec![
            (
                AllSourceError::InvalidEvent("bad event".to_string()),
                "Invalid event: bad event",
            ),
            (
                AllSourceError::InvalidQuery("bad query".to_string()),
                "Invalid query: bad query",
            ),
            (
                AllSourceError::InvalidInput("bad input".to_string()),
                "Invalid input: bad input",
            ),
            (
                AllSourceError::StorageError("storage failed".to_string()),
                "Storage error: storage failed",
            ),
            (
                AllSourceError::ArrowError("arrow failed".to_string()),
                "Arrow error: arrow failed",
            ),
            (
                AllSourceError::IndexError("index failed".to_string()),
                "Index error: index failed",
            ),
            (
                AllSourceError::ValidationError("validation failed".to_string()),
                "Validation error: validation failed",
            ),
            (
                AllSourceError::ConcurrencyError("conflict".to_string()),
                "Concurrency error: conflict",
            ),
            (
                AllSourceError::QueueFull("queue full".to_string()),
                "Queue full: queue full",
            ),
            (
                AllSourceError::InternalError("internal error".to_string()),
                "Internal error: internal error",
            ),
        ];

        for (err, expected) in cases {
            assert_eq!(err.to_string(), expected);
        }
    }

    #[test]
    fn test_serialization_error_display_and_source() {
        let source = sample_serde_error();
        let expected = format!("Serialization error: {source}");

        let err = AllSourceError::from(source);
        assert!(matches!(err, AllSourceError::SerializationError(_)));
        assert_eq!(err.to_string(), expected);
        // `#[from]` also marks the field as the error's source.
        assert!(std::error::Error::source(&err).is_some());
    }

    #[test]
    fn test_error_alias() {
        // `Error` is a re-export of `AllSourceError`, so the two are interchangeable.
        let err: Error = AllSourceError::InternalError("boom".to_string());
        assert!(matches!(err, Error::InternalError(_)));
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_not_found() {
        let err = AllSourceError::EventNotFound("event-123".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);

        let err = AllSourceError::EntityNotFound("entity-456".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);

        let err = AllSourceError::TenantNotFound("tenant-1".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_bad_request() {
        let err = AllSourceError::InvalidEvent("bad event".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let err = AllSourceError::InvalidQuery("bad query".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let err = AllSourceError::InvalidInput("bad input".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let err = AllSourceError::ValidationError("validation failed".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_conflict() {
        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::CONFLICT);

        let err = AllSourceError::ConcurrencyError("conflict".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::CONFLICT);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_service_unavailable() {
        let err = AllSourceError::QueueFull("queue is full".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_internal_error() {
        let err = AllSourceError::StorageError("storage error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

        let err = AllSourceError::ArrowError("arrow error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

        let err = AllSourceError::IndexError("index error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

        let err = AllSourceError::InternalError("internal error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_unprocessable_entity() {
        let err = AllSourceError::from(sample_serde_error());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);
    }

    #[test]
    fn test_from_arrow_error() {
        let arrow_err = arrow::error::ArrowError::InvalidArgumentError("test".to_string());
        let err: AllSourceError = arrow_err.into();
        assert!(matches!(err, AllSourceError::ArrowError(_)));
        // The original message must survive the conversion.
        assert!(err.to_string().contains("test"));
    }

    #[test]
    fn test_from_parquet_error() {
        let parquet_err = parquet::errors::ParquetError::General("test".to_string());
        let err: AllSourceError = parquet_err.into();
        assert!(matches!(err, AllSourceError::StorageError(_)));
        // The original message must survive the conversion.
        assert!(err.to_string().contains("test"));
    }

    #[test]
    fn test_error_debug() {
        let err = AllSourceError::EventNotFound("test".to_string());
        let debug_str = format!("{:?}", err);
        assert!(debug_str.contains("EventNotFound"));
    }

    #[test]
    fn test_is_retryable() {
        // Retryable errors
        assert!(AllSourceError::StorageError("io".to_string()).is_retryable());
        assert!(AllSourceError::ConcurrencyError("conflict".to_string()).is_retryable());
        assert!(AllSourceError::QueueFull("backpressure".to_string()).is_retryable());

        // Non-retryable errors (every remaining variant)
        assert!(!AllSourceError::EventNotFound("e1".to_string()).is_retryable());
        assert!(!AllSourceError::EntityNotFound("n1".to_string()).is_retryable());
        assert!(!AllSourceError::InvalidEvent("bad".to_string()).is_retryable());
        assert!(!AllSourceError::InvalidQuery("bad".to_string()).is_retryable());
        assert!(!AllSourceError::InvalidInput("bad".to_string()).is_retryable());
        assert!(!AllSourceError::ValidationError("bad".to_string()).is_retryable());
        assert!(!AllSourceError::TenantNotFound("t1".to_string()).is_retryable());
        assert!(!AllSourceError::TenantAlreadyExists("t1".to_string()).is_retryable());
        assert!(!AllSourceError::ArrowError("arrow".to_string()).is_retryable());
        assert!(!AllSourceError::IndexError("index".to_string()).is_retryable());
        assert!(!AllSourceError::InternalError("bug".to_string()).is_retryable());
        assert!(!AllSourceError::from(sample_serde_error()).is_retryable());
    }
}