1#[derive(Debug, thiserror::Error)]
3pub enum AllSourceError {
4 #[error("Event not found: {0}")]
5 EventNotFound(String),
6
7 #[error("Entity not found: {0}")]
8 EntityNotFound(String),
9
10 #[error("Tenant already exists: {0}")]
11 TenantAlreadyExists(String),
12
13 #[error("Tenant not found: {0}")]
14 TenantNotFound(String),
15
16 #[error("Invalid event: {0}")]
17 InvalidEvent(String),
18
19 #[error("Invalid query: {0}")]
20 InvalidQuery(String),
21
22 #[error("Invalid input: {0}")]
23 InvalidInput(String),
24
25 #[error("Storage error: {0}")]
26 StorageError(String),
27
28 #[error("Serialization error: {0}")]
29 SerializationError(#[from] serde_json::Error),
30
31 #[error("Arrow error: {0}")]
32 ArrowError(String),
33
34 #[error("Index error: {0}")]
35 IndexError(String),
36
37 #[error("Validation error: {0}")]
38 ValidationError(String),
39
40 #[error("Concurrency error: {0}")]
41 ConcurrencyError(String),
42
43 #[error("Queue full: {0}")]
44 QueueFull(String),
45
46 #[error("Internal error: {0}")]
47 InternalError(String),
48}
49
50impl AllSourceError {
51 pub fn is_retryable(&self) -> bool {
54 matches!(
55 self,
56 AllSourceError::StorageError(_)
57 | AllSourceError::ConcurrencyError(_)
58 | AllSourceError::QueueFull(_)
59 )
60 }
61}
62
63pub use AllSourceError as Error;
65
66impl From<arrow::error::ArrowError> for AllSourceError {
67 fn from(err: arrow::error::ArrowError) -> Self {
68 AllSourceError::ArrowError(err.to_string())
69 }
70}
71
72impl From<parquet::errors::ParquetError> for AllSourceError {
73 fn from(err: parquet::errors::ParquetError) -> Self {
74 AllSourceError::StorageError(err.to_string())
75 }
76}
77
78impl From<crate::infrastructure::persistence::SimdJsonError> for AllSourceError {
79 fn from(err: crate::infrastructure::persistence::SimdJsonError) -> Self {
80 AllSourceError::SerializationError(serde_json::Error::io(std::io::Error::new(
81 std::io::ErrorKind::InvalidData,
82 err.to_string(),
83 )))
84 }
85}
86
87#[cfg(feature = "postgres")]
88impl From<sqlx::Error> for AllSourceError {
89 fn from(err: sqlx::Error) -> Self {
90 AllSourceError::StorageError(format!("Database error: {err}"))
91 }
92}
93
94pub type Result<T> = std::result::Result<T, AllSourceError>;
96
97#[cfg(feature = "server")]
98mod axum_impl {
99 use super::AllSourceError;
100 use axum::{
101 http::StatusCode,
102 response::{IntoResponse, Response},
103 };
104
105 impl IntoResponse for AllSourceError {
107 fn into_response(self) -> Response {
108 let (status, error_message) = match self {
109 AllSourceError::EventNotFound(_)
110 | AllSourceError::EntityNotFound(_)
111 | AllSourceError::TenantNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
112 AllSourceError::InvalidEvent(_)
113 | AllSourceError::InvalidQuery(_)
114 | AllSourceError::InvalidInput(_)
115 | AllSourceError::ValidationError(_) => (StatusCode::BAD_REQUEST, self.to_string()),
116 AllSourceError::TenantAlreadyExists(_) | AllSourceError::ConcurrencyError(_) => {
117 (StatusCode::CONFLICT, self.to_string())
118 }
119 AllSourceError::QueueFull(_) => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
120 AllSourceError::StorageError(_)
121 | AllSourceError::ArrowError(_)
122 | AllSourceError::IndexError(_)
123 | AllSourceError::InternalError(_) => {
124 (StatusCode::INTERNAL_SERVER_ERROR, self.to_string())
125 }
126 AllSourceError::SerializationError(_) => {
127 (StatusCode::UNPROCESSABLE_ENTITY, self.to_string())
128 }
129 };
130
131 let body = serde_json::json!({
132 "error": error_message,
133 });
134
135 (status, axum::Json(body)).into_response()
136 }
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143
144 #[cfg(feature = "server")]
145 use axum::{http::StatusCode, response::IntoResponse};
146
147 #[test]
148 fn test_error_display() {
149 let err = AllSourceError::EventNotFound("event-123".to_string());
150 assert_eq!(err.to_string(), "Event not found: event-123");
151
152 let err = AllSourceError::EntityNotFound("entity-456".to_string());
153 assert_eq!(err.to_string(), "Entity not found: entity-456");
154
155 let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
156 assert_eq!(err.to_string(), "Tenant already exists: tenant-1");
157
158 let err = AllSourceError::TenantNotFound("tenant-2".to_string());
159 assert_eq!(err.to_string(), "Tenant not found: tenant-2");
160 }
161
162 #[test]
163 fn test_error_variants() {
164 let errors: Vec<AllSourceError> = vec![
165 AllSourceError::InvalidEvent("bad event".to_string()),
166 AllSourceError::InvalidQuery("bad query".to_string()),
167 AllSourceError::InvalidInput("bad input".to_string()),
168 AllSourceError::StorageError("storage failed".to_string()),
169 AllSourceError::ArrowError("arrow failed".to_string()),
170 AllSourceError::IndexError("index failed".to_string()),
171 AllSourceError::ValidationError("validation failed".to_string()),
172 AllSourceError::ConcurrencyError("conflict".to_string()),
173 AllSourceError::QueueFull("queue full".to_string()),
174 AllSourceError::InternalError("internal error".to_string()),
175 ];
176
177 for err in errors {
178 let msg = err.to_string();
179 assert!(!msg.is_empty());
180 }
181 }
182
183 #[cfg(feature = "server")]
184 #[test]
185 fn test_into_response_not_found() {
186 let err = AllSourceError::EventNotFound("event-123".to_string());
187 let response = err.into_response();
188 assert_eq!(response.status(), StatusCode::NOT_FOUND);
189
190 let err = AllSourceError::EntityNotFound("entity-456".to_string());
191 let response = err.into_response();
192 assert_eq!(response.status(), StatusCode::NOT_FOUND);
193
194 let err = AllSourceError::TenantNotFound("tenant-1".to_string());
195 let response = err.into_response();
196 assert_eq!(response.status(), StatusCode::NOT_FOUND);
197 }
198
199 #[cfg(feature = "server")]
200 #[test]
201 fn test_into_response_bad_request() {
202 let err = AllSourceError::InvalidEvent("bad event".to_string());
203 let response = err.into_response();
204 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
205
206 let err = AllSourceError::InvalidQuery("bad query".to_string());
207 let response = err.into_response();
208 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
209
210 let err = AllSourceError::InvalidInput("bad input".to_string());
211 let response = err.into_response();
212 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
213
214 let err = AllSourceError::ValidationError("validation failed".to_string());
215 let response = err.into_response();
216 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
217 }
218
219 #[cfg(feature = "server")]
220 #[test]
221 fn test_into_response_conflict() {
222 let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
223 let response = err.into_response();
224 assert_eq!(response.status(), StatusCode::CONFLICT);
225
226 let err = AllSourceError::ConcurrencyError("conflict".to_string());
227 let response = err.into_response();
228 assert_eq!(response.status(), StatusCode::CONFLICT);
229 }
230
231 #[cfg(feature = "server")]
232 #[test]
233 fn test_into_response_service_unavailable() {
234 let err = AllSourceError::QueueFull("queue is full".to_string());
235 let response = err.into_response();
236 assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
237 }
238
239 #[cfg(feature = "server")]
240 #[test]
241 fn test_into_response_internal_error() {
242 let err = AllSourceError::StorageError("storage error".to_string());
243 let response = err.into_response();
244 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
245
246 let err = AllSourceError::ArrowError("arrow error".to_string());
247 let response = err.into_response();
248 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
249
250 let err = AllSourceError::IndexError("index error".to_string());
251 let response = err.into_response();
252 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
253
254 let err = AllSourceError::InternalError("internal error".to_string());
255 let response = err.into_response();
256 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
257 }
258
259 #[test]
260 fn test_from_arrow_error() {
261 let arrow_err = arrow::error::ArrowError::InvalidArgumentError("test".to_string());
262 let err: AllSourceError = arrow_err.into();
263 assert!(matches!(err, AllSourceError::ArrowError(_)));
264 }
265
266 #[test]
267 fn test_from_parquet_error() {
268 let parquet_err = parquet::errors::ParquetError::General("test".to_string());
269 let err: AllSourceError = parquet_err.into();
270 assert!(matches!(err, AllSourceError::StorageError(_)));
271 }
272
273 #[test]
274 fn test_error_debug() {
275 let err = AllSourceError::EventNotFound("test".to_string());
276 let debug_str = format!("{:?}", err);
277 assert!(debug_str.contains("EventNotFound"));
278 }
279
280 #[test]
281 fn test_is_retryable() {
282 assert!(AllSourceError::StorageError("io".to_string()).is_retryable());
284 assert!(AllSourceError::ConcurrencyError("conflict".to_string()).is_retryable());
285 assert!(AllSourceError::QueueFull("backpressure".to_string()).is_retryable());
286
287 assert!(!AllSourceError::EventNotFound("e1".to_string()).is_retryable());
289 assert!(!AllSourceError::InvalidEvent("bad".to_string()).is_retryable());
290 assert!(!AllSourceError::InvalidQuery("bad".to_string()).is_retryable());
291 assert!(!AllSourceError::ValidationError("bad".to_string()).is_retryable());
292 assert!(!AllSourceError::TenantNotFound("t1".to_string()).is_retryable());
293 assert!(!AllSourceError::TenantAlreadyExists("t1".to_string()).is_retryable());
294 assert!(!AllSourceError::InternalError("bug".to_string()).is_retryable());
295 }
296}