1use axum::{
2 http::StatusCode,
3 response::{IntoResponse, Response},
4};
5
/// Error type shared by this module's [`Result`] alias and HTTP layer.
///
/// Each variant wraps a single payload that is interpolated into the
/// `Display` message declared by its `#[error(...)]` attribute. The
/// `IntoResponse` implementation in this file maps every variant to an HTTP
/// status code, and `is_retryable` classifies which variants are worth
/// retrying.
#[derive(Debug, thiserror::Error)]
pub enum AllSourceError {
    /// An event could not be found. Rendered as HTTP 404.
    #[error("Event not found: {0}")]
    EventNotFound(String),

    /// An entity could not be found. Rendered as HTTP 404.
    #[error("Entity not found: {0}")]
    EntityNotFound(String),

    /// A tenant already exists. Rendered as HTTP 409.
    #[error("Tenant already exists: {0}")]
    TenantAlreadyExists(String),

    /// A tenant could not be found. Rendered as HTTP 404.
    #[error("Tenant not found: {0}")]
    TenantNotFound(String),

    /// An event was rejected as invalid. Rendered as HTTP 400.
    #[error("Invalid event: {0}")]
    InvalidEvent(String),

    /// A query was rejected as invalid. Rendered as HTTP 400.
    #[error("Invalid query: {0}")]
    InvalidQuery(String),

    /// Some other input was rejected as invalid. Rendered as HTTP 400.
    #[error("Invalid input: {0}")]
    InvalidInput(String),

    /// A storage-layer failure. Rendered as HTTP 500 and reported as
    /// retryable. Parquet errors (and `sqlx` errors when the `postgres`
    /// feature is enabled) are converted into this variant as text.
    #[error("Storage error: {0}")]
    StorageError(String),

    /// A JSON (de)serialization failure. Rendered as HTTP 422.
    ///
    /// `#[from]` provides `From<serde_json::Error>`, so `?` converts
    /// `serde_json` errors automatically; `SimdJsonError` values are also
    /// wrapped into this variant by a separate `From` impl in this file.
    #[error("Serialization error: {0}")]
    SerializationError(#[from] serde_json::Error),

    /// An Arrow failure, stored as the display text of the original
    /// `arrow::error::ArrowError`. Rendered as HTTP 500.
    #[error("Arrow error: {0}")]
    ArrowError(String),

    /// An index failure. Rendered as HTTP 500.
    #[error("Index error: {0}")]
    IndexError(String),

    /// A validation failure. Rendered as HTTP 400.
    #[error("Validation error: {0}")]
    ValidationError(String),

    /// A concurrency conflict. Rendered as HTTP 409 and reported as
    /// retryable.
    #[error("Concurrency error: {0}")]
    ConcurrencyError(String),

    /// A queue is full. Rendered as HTTP 503 and reported as retryable.
    #[error("Queue full: {0}")]
    QueueFull(String),

    /// Any other internal failure. Rendered as HTTP 500 and not reported as
    /// retryable.
    #[error("Internal error: {0}")]
    InternalError(String),
}
54
55impl AllSourceError {
56 pub fn is_retryable(&self) -> bool {
59 matches!(
60 self,
61 AllSourceError::StorageError(_)
62 | AllSourceError::ConcurrencyError(_)
63 | AllSourceError::QueueFull(_)
64 )
65 }
66}
67
/// Re-exports [`AllSourceError`] under the shorter name `Error`.
pub use AllSourceError as Error;
70
71impl From<arrow::error::ArrowError> for AllSourceError {
72 fn from(err: arrow::error::ArrowError) -> Self {
73 AllSourceError::ArrowError(err.to_string())
74 }
75}
76
77impl From<parquet::errors::ParquetError> for AllSourceError {
78 fn from(err: parquet::errors::ParquetError) -> Self {
79 AllSourceError::StorageError(err.to_string())
80 }
81}
82
83impl From<crate::infrastructure::persistence::SimdJsonError> for AllSourceError {
84 fn from(err: crate::infrastructure::persistence::SimdJsonError) -> Self {
85 AllSourceError::SerializationError(serde_json::Error::io(std::io::Error::new(
86 std::io::ErrorKind::InvalidData,
87 err.to_string(),
88 )))
89 }
90}
91
/// Converts a `sqlx` error into [`AllSourceError::StorageError`].
///
/// Only compiled when the `postgres` feature is enabled. The stored message
/// is the original error's display text prefixed with `Database error: `.
#[cfg(feature = "postgres")]
impl From<sqlx::Error> for AllSourceError {
    fn from(err: sqlx::Error) -> Self {
        let message = format!("Database error: {err}");
        Self::StorageError(message)
    }
}
98
/// `Result` alias whose error type is fixed to [`AllSourceError`].
pub type Result<T> = std::result::Result<T, AllSourceError>;
101
102impl IntoResponse for AllSourceError {
104 fn into_response(self) -> Response {
105 let (status, error_message) = match self {
106 AllSourceError::EventNotFound(_)
107 | AllSourceError::EntityNotFound(_)
108 | AllSourceError::TenantNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
109 AllSourceError::InvalidEvent(_)
110 | AllSourceError::InvalidQuery(_)
111 | AllSourceError::InvalidInput(_)
112 | AllSourceError::ValidationError(_) => (StatusCode::BAD_REQUEST, self.to_string()),
113 AllSourceError::TenantAlreadyExists(_) | AllSourceError::ConcurrencyError(_) => {
114 (StatusCode::CONFLICT, self.to_string())
115 }
116 AllSourceError::QueueFull(_) => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
117 AllSourceError::StorageError(_)
118 | AllSourceError::ArrowError(_)
119 | AllSourceError::IndexError(_)
120 | AllSourceError::InternalError(_) => {
121 (StatusCode::INTERNAL_SERVER_ERROR, self.to_string())
122 }
123 AllSourceError::SerializationError(_) => {
124 (StatusCode::UNPROCESSABLE_ENTITY, self.to_string())
125 }
126 };
127
128 let body = serde_json::json!({
129 "error": error_message,
130 });
131
132 (status, axum::Json(body)).into_response()
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the owned `String` payload carried by most variants.
    fn s(text: &str) -> String {
        text.to_string()
    }

    /// Converts the error into a response and returns its status code.
    fn status_of(err: AllSourceError) -> StatusCode {
        err.into_response().status()
    }

    /// Display output matches the `#[error(...)]` templates.
    #[test]
    fn test_error_display() {
        let cases = [
            (
                AllSourceError::EventNotFound(s("event-123")),
                "Event not found: event-123",
            ),
            (
                AllSourceError::EntityNotFound(s("entity-456")),
                "Entity not found: entity-456",
            ),
            (
                AllSourceError::TenantAlreadyExists(s("tenant-1")),
                "Tenant already exists: tenant-1",
            ),
            (
                AllSourceError::TenantNotFound(s("tenant-2")),
                "Tenant not found: tenant-2",
            ),
        ];

        for (err, expected) in cases {
            assert_eq!(err.to_string(), expected);
        }
    }

    /// Every string-carrying variant renders a non-empty message.
    #[test]
    fn test_error_variants() {
        let errors = [
            AllSourceError::InvalidEvent(s("bad event")),
            AllSourceError::InvalidQuery(s("bad query")),
            AllSourceError::InvalidInput(s("bad input")),
            AllSourceError::StorageError(s("storage failed")),
            AllSourceError::ArrowError(s("arrow failed")),
            AllSourceError::IndexError(s("index failed")),
            AllSourceError::ValidationError(s("validation failed")),
            AllSourceError::ConcurrencyError(s("conflict")),
            AllSourceError::QueueFull(s("queue full")),
            AllSourceError::InternalError(s("internal error")),
        ];

        for err in &errors {
            assert!(!err.to_string().is_empty());
        }
    }

    #[test]
    fn test_into_response_not_found() {
        let not_found = [
            AllSourceError::EventNotFound(s("event-123")),
            AllSourceError::EntityNotFound(s("entity-456")),
            AllSourceError::TenantNotFound(s("tenant-1")),
        ];

        for err in not_found {
            assert_eq!(status_of(err), StatusCode::NOT_FOUND);
        }
    }

    #[test]
    fn test_into_response_bad_request() {
        let bad_request = [
            AllSourceError::InvalidEvent(s("bad event")),
            AllSourceError::InvalidQuery(s("bad query")),
            AllSourceError::InvalidInput(s("bad input")),
            AllSourceError::ValidationError(s("validation failed")),
        ];

        for err in bad_request {
            assert_eq!(status_of(err), StatusCode::BAD_REQUEST);
        }
    }

    #[test]
    fn test_into_response_conflict() {
        let conflict = [
            AllSourceError::TenantAlreadyExists(s("tenant-1")),
            AllSourceError::ConcurrencyError(s("conflict")),
        ];

        for err in conflict {
            assert_eq!(status_of(err), StatusCode::CONFLICT);
        }
    }

    #[test]
    fn test_into_response_service_unavailable() {
        let err = AllSourceError::QueueFull(s("queue is full"));
        assert_eq!(status_of(err), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[test]
    fn test_into_response_internal_error() {
        let internal = [
            AllSourceError::StorageError(s("storage error")),
            AllSourceError::ArrowError(s("arrow error")),
            AllSourceError::IndexError(s("index error")),
            AllSourceError::InternalError(s("internal error")),
        ];

        for err in internal {
            assert_eq!(status_of(err), StatusCode::INTERNAL_SERVER_ERROR);
        }
    }

    #[test]
    fn test_from_arrow_error() {
        let source = arrow::error::ArrowError::InvalidArgumentError(s("test"));
        let converted = AllSourceError::from(source);
        assert!(matches!(converted, AllSourceError::ArrowError(_)));
    }

    #[test]
    fn test_from_parquet_error() {
        let source = parquet::errors::ParquetError::General(s("test"));
        let converted = AllSourceError::from(source);
        assert!(matches!(converted, AllSourceError::StorageError(_)));
    }

    #[test]
    fn test_error_debug() {
        let err = AllSourceError::EventNotFound(s("test"));
        let rendered = format!("{err:?}");
        assert!(rendered.contains("EventNotFound"));
    }

    #[test]
    fn test_is_retryable() {
        let retryable = [
            AllSourceError::StorageError(s("io")),
            AllSourceError::ConcurrencyError(s("conflict")),
            AllSourceError::QueueFull(s("backpressure")),
        ];
        for err in &retryable {
            assert!(err.is_retryable());
        }

        let permanent = [
            AllSourceError::EventNotFound(s("e1")),
            AllSourceError::InvalidEvent(s("bad")),
            AllSourceError::InvalidQuery(s("bad")),
            AllSourceError::ValidationError(s("bad")),
            AllSourceError::TenantNotFound(s("t1")),
            AllSourceError::TenantAlreadyExists(s("t1")),
            AllSourceError::InternalError(s("bug")),
        ];
        for err in &permanent {
            assert!(!err.is_retryable());
        }
    }
}