Skip to main content

allsource_core/
error.rs

1use axum::{
2    http::StatusCode,
3    response::{IntoResponse, Response},
4};
5
6/// AllSource error types
7#[derive(Debug, thiserror::Error)]
8pub enum AllSourceError {
9    #[error("Event not found: {0}")]
10    EventNotFound(String),
11
12    #[error("Entity not found: {0}")]
13    EntityNotFound(String),
14
15    #[error("Tenant already exists: {0}")]
16    TenantAlreadyExists(String),
17
18    #[error("Tenant not found: {0}")]
19    TenantNotFound(String),
20
21    #[error("Invalid event: {0}")]
22    InvalidEvent(String),
23
24    #[error("Invalid query: {0}")]
25    InvalidQuery(String),
26
27    #[error("Invalid input: {0}")]
28    InvalidInput(String),
29
30    #[error("Storage error: {0}")]
31    StorageError(String),
32
33    #[error("Serialization error: {0}")]
34    SerializationError(#[from] serde_json::Error),
35
36    #[error("Arrow error: {0}")]
37    ArrowError(String),
38
39    #[error("Index error: {0}")]
40    IndexError(String),
41
42    #[error("Validation error: {0}")]
43    ValidationError(String),
44
45    #[error("Concurrency error: {0}")]
46    ConcurrencyError(String),
47
48    #[error("Queue full: {0}")]
49    QueueFull(String),
50
51    #[error("Internal error: {0}")]
52    InternalError(String),
53}
54
55impl AllSourceError {
56    /// Returns true for transient errors that may succeed on retry
57    /// (storage I/O, concurrency conflicts, queue pressure).
58    pub fn is_retryable(&self) -> bool {
59        matches!(
60            self,
61            AllSourceError::StorageError(_)
62                | AllSourceError::ConcurrencyError(_)
63                | AllSourceError::QueueFull(_)
64        )
65    }
66}
67
68// Alias for domain layer convenience
69pub use AllSourceError as Error;
70
71impl From<arrow::error::ArrowError> for AllSourceError {
72    fn from(err: arrow::error::ArrowError) -> Self {
73        AllSourceError::ArrowError(err.to_string())
74    }
75}
76
77impl From<parquet::errors::ParquetError> for AllSourceError {
78    fn from(err: parquet::errors::ParquetError) -> Self {
79        AllSourceError::StorageError(err.to_string())
80    }
81}
82
83impl From<crate::infrastructure::persistence::SimdJsonError> for AllSourceError {
84    fn from(err: crate::infrastructure::persistence::SimdJsonError) -> Self {
85        AllSourceError::SerializationError(serde_json::Error::io(std::io::Error::new(
86            std::io::ErrorKind::InvalidData,
87            err.to_string(),
88        )))
89    }
90}
91
92#[cfg(feature = "postgres")]
93impl From<sqlx::Error> for AllSourceError {
94    fn from(err: sqlx::Error) -> Self {
95        AllSourceError::StorageError(format!("Database error: {err}"))
96    }
97}
98
99/// Custom Result type for AllSource operations
100pub type Result<T> = std::result::Result<T, AllSourceError>;
101
102/// Implement IntoResponse for axum error handling
103impl IntoResponse for AllSourceError {
104    fn into_response(self) -> Response {
105        let (status, error_message) = match self {
106            AllSourceError::EventNotFound(_)
107            | AllSourceError::EntityNotFound(_)
108            | AllSourceError::TenantNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
109            AllSourceError::InvalidEvent(_)
110            | AllSourceError::InvalidQuery(_)
111            | AllSourceError::InvalidInput(_)
112            | AllSourceError::ValidationError(_) => (StatusCode::BAD_REQUEST, self.to_string()),
113            AllSourceError::TenantAlreadyExists(_) | AllSourceError::ConcurrencyError(_) => {
114                (StatusCode::CONFLICT, self.to_string())
115            }
116            AllSourceError::QueueFull(_) => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
117            AllSourceError::StorageError(_)
118            | AllSourceError::ArrowError(_)
119            | AllSourceError::IndexError(_)
120            | AllSourceError::InternalError(_) => {
121                (StatusCode::INTERNAL_SERVER_ERROR, self.to_string())
122            }
123            AllSourceError::SerializationError(_) => {
124                (StatusCode::UNPROCESSABLE_ENTITY, self.to_string())
125            }
126        };
127
128        let body = serde_json::json!({
129            "error": error_message,
130        });
131
132        (status, axum::Json(body)).into_response()
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139
140    #[test]
141    fn test_error_display() {
142        let err = AllSourceError::EventNotFound("event-123".to_string());
143        assert_eq!(err.to_string(), "Event not found: event-123");
144
145        let err = AllSourceError::EntityNotFound("entity-456".to_string());
146        assert_eq!(err.to_string(), "Entity not found: entity-456");
147
148        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
149        assert_eq!(err.to_string(), "Tenant already exists: tenant-1");
150
151        let err = AllSourceError::TenantNotFound("tenant-2".to_string());
152        assert_eq!(err.to_string(), "Tenant not found: tenant-2");
153    }
154
155    #[test]
156    fn test_error_variants() {
157        let errors: Vec<AllSourceError> = vec![
158            AllSourceError::InvalidEvent("bad event".to_string()),
159            AllSourceError::InvalidQuery("bad query".to_string()),
160            AllSourceError::InvalidInput("bad input".to_string()),
161            AllSourceError::StorageError("storage failed".to_string()),
162            AllSourceError::ArrowError("arrow failed".to_string()),
163            AllSourceError::IndexError("index failed".to_string()),
164            AllSourceError::ValidationError("validation failed".to_string()),
165            AllSourceError::ConcurrencyError("conflict".to_string()),
166            AllSourceError::QueueFull("queue full".to_string()),
167            AllSourceError::InternalError("internal error".to_string()),
168        ];
169
170        for err in errors {
171            let msg = err.to_string();
172            assert!(!msg.is_empty());
173        }
174    }
175
176    #[test]
177    fn test_into_response_not_found() {
178        let err = AllSourceError::EventNotFound("event-123".to_string());
179        let response = err.into_response();
180        assert_eq!(response.status(), StatusCode::NOT_FOUND);
181
182        let err = AllSourceError::EntityNotFound("entity-456".to_string());
183        let response = err.into_response();
184        assert_eq!(response.status(), StatusCode::NOT_FOUND);
185
186        let err = AllSourceError::TenantNotFound("tenant-1".to_string());
187        let response = err.into_response();
188        assert_eq!(response.status(), StatusCode::NOT_FOUND);
189    }
190
191    #[test]
192    fn test_into_response_bad_request() {
193        let err = AllSourceError::InvalidEvent("bad event".to_string());
194        let response = err.into_response();
195        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
196
197        let err = AllSourceError::InvalidQuery("bad query".to_string());
198        let response = err.into_response();
199        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
200
201        let err = AllSourceError::InvalidInput("bad input".to_string());
202        let response = err.into_response();
203        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
204
205        let err = AllSourceError::ValidationError("validation failed".to_string());
206        let response = err.into_response();
207        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
208    }
209
210    #[test]
211    fn test_into_response_conflict() {
212        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
213        let response = err.into_response();
214        assert_eq!(response.status(), StatusCode::CONFLICT);
215
216        let err = AllSourceError::ConcurrencyError("conflict".to_string());
217        let response = err.into_response();
218        assert_eq!(response.status(), StatusCode::CONFLICT);
219    }
220
221    #[test]
222    fn test_into_response_service_unavailable() {
223        let err = AllSourceError::QueueFull("queue is full".to_string());
224        let response = err.into_response();
225        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
226    }
227
228    #[test]
229    fn test_into_response_internal_error() {
230        let err = AllSourceError::StorageError("storage error".to_string());
231        let response = err.into_response();
232        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
233
234        let err = AllSourceError::ArrowError("arrow error".to_string());
235        let response = err.into_response();
236        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
237
238        let err = AllSourceError::IndexError("index error".to_string());
239        let response = err.into_response();
240        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
241
242        let err = AllSourceError::InternalError("internal error".to_string());
243        let response = err.into_response();
244        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
245    }
246
247    #[test]
248    fn test_from_arrow_error() {
249        let arrow_err = arrow::error::ArrowError::InvalidArgumentError("test".to_string());
250        let err: AllSourceError = arrow_err.into();
251        assert!(matches!(err, AllSourceError::ArrowError(_)));
252    }
253
254    #[test]
255    fn test_from_parquet_error() {
256        let parquet_err = parquet::errors::ParquetError::General("test".to_string());
257        let err: AllSourceError = parquet_err.into();
258        assert!(matches!(err, AllSourceError::StorageError(_)));
259    }
260
261    #[test]
262    fn test_error_debug() {
263        let err = AllSourceError::EventNotFound("test".to_string());
264        let debug_str = format!("{:?}", err);
265        assert!(debug_str.contains("EventNotFound"));
266    }
267
268    #[test]
269    fn test_is_retryable() {
270        // Retryable errors
271        assert!(AllSourceError::StorageError("io".to_string()).is_retryable());
272        assert!(AllSourceError::ConcurrencyError("conflict".to_string()).is_retryable());
273        assert!(AllSourceError::QueueFull("backpressure".to_string()).is_retryable());
274
275        // Non-retryable errors
276        assert!(!AllSourceError::EventNotFound("e1".to_string()).is_retryable());
277        assert!(!AllSourceError::InvalidEvent("bad".to_string()).is_retryable());
278        assert!(!AllSourceError::InvalidQuery("bad".to_string()).is_retryable());
279        assert!(!AllSourceError::ValidationError("bad".to_string()).is_retryable());
280        assert!(!AllSourceError::TenantNotFound("t1".to_string()).is_retryable());
281        assert!(!AllSourceError::TenantAlreadyExists("t1".to_string()).is_retryable());
282        assert!(!AllSourceError::InternalError("bug".to_string()).is_retryable());
283    }
284}