/// Error type shared by the conversions and the `Result` alias in this file.
///
/// The `Display` text of every variant comes from its `#[error(...)]`
/// attribute. Retry classification is provided by
/// [`AllSourceError::is_retryable`]; when the `server` feature is enabled each
/// variant is also mapped to an HTTP status by the `IntoResponse`
/// implementation in this file.
#[derive(Debug, thiserror::Error)]
pub enum AllSourceError {
    /// An event lookup failed. The payload is rendered verbatim after the
    /// prefix (presumably the event identifier — confirm against callers).
    #[error("Event not found: {0}")]
    EventNotFound(String),

    /// An entity lookup failed. The payload is rendered verbatim after the
    /// prefix (presumably the entity identifier — confirm against callers).
    #[error("Entity not found: {0}")]
    EntityNotFound(String),

    /// A tenant could not be created because it already exists. The payload
    /// is rendered verbatim after the prefix.
    #[error("Tenant already exists: {0}")]
    TenantAlreadyExists(String),

    /// A tenant lookup failed. The payload is rendered verbatim after the
    /// prefix.
    #[error("Tenant not found: {0}")]
    TenantNotFound(String),

    /// An event was rejected as invalid; the payload describes why.
    #[error("Invalid event: {0}")]
    InvalidEvent(String),

    /// A query was rejected as invalid; the payload describes why.
    #[error("Invalid query: {0}")]
    InvalidQuery(String),

    /// Some other input was rejected as invalid; the payload describes why.
    #[error("Invalid input: {0}")]
    InvalidInput(String),

    /// A storage-layer failure. Parquet errors are converted into this
    /// variant, as are `sqlx` errors when the `postgres` feature is enabled.
    /// Classified as retryable.
    #[error("Storage error: {0}")]
    StorageError(String),

    /// A JSON (de)serialization failure. `#[from]` generates the
    /// `From<serde_json::Error>` conversion and exposes the inner error as
    /// the `source()`. `SimdJsonError` values are also funnelled into this
    /// variant by a separate conversion in this file.
    #[error("Serialization error: {0}")]
    SerializationError(#[from] serde_json::Error),

    /// An Arrow failure. Only the rendered message of the original
    /// `arrow::error::ArrowError` is retained.
    #[error("Arrow error: {0}")]
    ArrowError(String),

    /// An index-related failure; the payload describes it.
    #[error("Index error: {0}")]
    IndexError(String),

    /// A validation failure; the payload describes it.
    #[error("Validation error: {0}")]
    ValidationError(String),

    /// A concurrency-related failure. Classified as retryable.
    #[error("Concurrency error: {0}")]
    ConcurrencyError(String),

    /// The version the caller expected differs from the current one.
    /// Classified as retryable; under the `server` feature it is reported
    /// with a structured JSON body carrying both versions.
    #[error("Version conflict: expected {expected}, current {current}")]
    VersionConflict { expected: u64, current: u64 },

    /// A queue could not accept more work. Classified as retryable.
    #[error("Queue full: {0}")]
    QueueFull(String),

    /// Any other internal failure. Not classified as retryable.
    #[error("Internal error: {0}")]
    InternalError(String),
}
52
53impl AllSourceError {
54 pub fn is_retryable(&self) -> bool {
57 matches!(
58 self,
59 AllSourceError::StorageError(_)
60 | AllSourceError::ConcurrencyError(_)
61 | AllSourceError::VersionConflict { .. }
62 | AllSourceError::QueueFull(_)
63 )
64 }
65}
66
/// Re-exports [`AllSourceError`] under the shorter name `Error`.
pub use AllSourceError as Error;
69
70impl From<arrow::error::ArrowError> for AllSourceError {
71 fn from(err: arrow::error::ArrowError) -> Self {
72 AllSourceError::ArrowError(err.to_string())
73 }
74}
75
76impl From<parquet::errors::ParquetError> for AllSourceError {
77 fn from(err: parquet::errors::ParquetError) -> Self {
78 AllSourceError::StorageError(err.to_string())
79 }
80}
81
82impl From<crate::infrastructure::persistence::SimdJsonError> for AllSourceError {
83 fn from(err: crate::infrastructure::persistence::SimdJsonError) -> Self {
84 AllSourceError::SerializationError(serde_json::Error::io(std::io::Error::new(
85 std::io::ErrorKind::InvalidData,
86 err.to_string(),
87 )))
88 }
89}
90
/// Converts a `sqlx` error into [`AllSourceError::StorageError`], prefixing
/// the rendered message with `Database error: `.
///
/// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
impl From<sqlx::Error> for AllSourceError {
    fn from(err: sqlx::Error) -> Self {
        let message = format!("Database error: {err}");
        Self::StorageError(message)
    }
}
97
/// Shorthand for `std::result::Result` with [`AllSourceError`] as the error type.
pub type Result<T> = std::result::Result<T, AllSourceError>;
100
/// HTTP integration for [`AllSourceError`]; only compiled with the `server`
/// feature.
#[cfg(feature = "server")]
mod axum_impl {
    use super::AllSourceError;
    use axum::{
        http::StatusCode,
        response::{IntoResponse, Response},
    };

    /// Picks the HTTP status code reported for `err`.
    ///
    /// The match is exhaustive on purpose: a new variant must be given a
    /// status explicitly.
    fn status_for(err: &AllSourceError) -> StatusCode {
        match err {
            AllSourceError::EventNotFound(_)
            | AllSourceError::EntityNotFound(_)
            | AllSourceError::TenantNotFound(_) => StatusCode::NOT_FOUND,

            AllSourceError::InvalidEvent(_)
            | AllSourceError::InvalidQuery(_)
            | AllSourceError::InvalidInput(_)
            | AllSourceError::ValidationError(_) => StatusCode::BAD_REQUEST,

            AllSourceError::VersionConflict { .. }
            | AllSourceError::TenantAlreadyExists(_)
            | AllSourceError::ConcurrencyError(_) => StatusCode::CONFLICT,

            AllSourceError::QueueFull(_) => StatusCode::SERVICE_UNAVAILABLE,

            AllSourceError::StorageError(_)
            | AllSourceError::ArrowError(_)
            | AllSourceError::IndexError(_)
            | AllSourceError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,

            AllSourceError::SerializationError(_) => StatusCode::UNPROCESSABLE_ENTITY,
        }
    }

    impl IntoResponse for AllSourceError {
        /// Renders the error as a JSON response.
        ///
        /// `VersionConflict` produces a structured body with the fixed
        /// `"version_conflict"` code plus both version numbers; every other
        /// variant produces `{"error": <Display text>}`.
        fn into_response(self) -> Response {
            let status = status_for(&self);

            // NOTE(review): for the 5xx variants the Display text (including
            // any underlying storage/database message) is sent to the client
            // verbatim — confirm that exposing it is intended.
            let payload = match &self {
                AllSourceError::VersionConflict { expected, current } => serde_json::json!({
                    "error": "version_conflict",
                    "expected_version": expected,
                    "current_version": current,
                }),
                other => serde_json::json!({
                    "error": other.to_string(),
                }),
            };

            (status, axum::Json(payload)).into_response()
        }
    }
}
151
// Unit tests for display text, conversions, retry classification and (with
// the `server` feature) the HTTP status mapping of `AllSourceError`.
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "server")]
    use axum::{http::StatusCode, response::IntoResponse};

    #[test]
    fn test_error_display() {
        let err = AllSourceError::EventNotFound("event-123".to_string());
        assert_eq!(err.to_string(), "Event not found: event-123");

        let err = AllSourceError::EntityNotFound("entity-456".to_string());
        assert_eq!(err.to_string(), "Entity not found: entity-456");

        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
        assert_eq!(err.to_string(), "Tenant already exists: tenant-1");

        let err = AllSourceError::TenantNotFound("tenant-2".to_string());
        assert_eq!(err.to_string(), "Tenant not found: tenant-2");

        // The only struct-like variant: both named fields must be interpolated.
        let err = AllSourceError::VersionConflict {
            expected: 3,
            current: 5,
        };
        assert_eq!(err.to_string(), "Version conflict: expected 3, current 5");
    }

    #[test]
    fn test_error_variants() {
        let errors: Vec<AllSourceError> = vec![
            AllSourceError::InvalidEvent("bad event".to_string()),
            AllSourceError::InvalidQuery("bad query".to_string()),
            AllSourceError::InvalidInput("bad input".to_string()),
            AllSourceError::StorageError("storage failed".to_string()),
            AllSourceError::ArrowError("arrow failed".to_string()),
            AllSourceError::IndexError("index failed".to_string()),
            AllSourceError::ValidationError("validation failed".to_string()),
            AllSourceError::ConcurrencyError("conflict".to_string()),
            AllSourceError::VersionConflict {
                expected: 1,
                current: 2,
            },
            AllSourceError::QueueFull("queue full".to_string()),
            AllSourceError::InternalError("internal error".to_string()),
        ];

        for err in errors {
            let msg = err.to_string();
            assert!(!msg.is_empty());
        }
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_not_found() {
        let err = AllSourceError::EventNotFound("event-123".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);

        let err = AllSourceError::EntityNotFound("entity-456".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);

        let err = AllSourceError::TenantNotFound("tenant-1".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_bad_request() {
        let err = AllSourceError::InvalidEvent("bad event".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let err = AllSourceError::InvalidQuery("bad query".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let err = AllSourceError::InvalidInput("bad input".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);

        let err = AllSourceError::ValidationError("validation failed".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_conflict() {
        let err = AllSourceError::TenantAlreadyExists("tenant-1".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::CONFLICT);

        let err = AllSourceError::ConcurrencyError("conflict".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::CONFLICT);

        // VersionConflict takes its own code path (structured body), so its
        // status is checked separately from the other 409 variants.
        let err = AllSourceError::VersionConflict {
            expected: 1,
            current: 2,
        };
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::CONFLICT);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_service_unavailable() {
        let err = AllSourceError::QueueFull("queue is full".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[cfg(feature = "server")]
    #[test]
    fn test_into_response_internal_error() {
        let err = AllSourceError::StorageError("storage error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

        let err = AllSourceError::ArrowError("arrow error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

        let err = AllSourceError::IndexError("index error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

        let err = AllSourceError::InternalError("internal error".to_string());
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
    }

    #[test]
    fn test_from_arrow_error() {
        let arrow_err = arrow::error::ArrowError::InvalidArgumentError("test".to_string());
        let err: AllSourceError = arrow_err.into();
        assert!(matches!(err, AllSourceError::ArrowError(_)));
    }

    #[test]
    fn test_from_parquet_error() {
        let parquet_err = parquet::errors::ParquetError::General("test".to_string());
        let err: AllSourceError = parquet_err.into();
        assert!(matches!(err, AllSourceError::StorageError(_)));
    }

    #[test]
    fn test_error_debug() {
        let err = AllSourceError::EventNotFound("test".to_string());
        let debug_str = format!("{err:?}");
        assert!(debug_str.contains("EventNotFound"));
    }

    #[test]
    fn test_is_retryable() {
        // Retryable variants.
        assert!(AllSourceError::StorageError("io".to_string()).is_retryable());
        assert!(AllSourceError::ConcurrencyError("conflict".to_string()).is_retryable());
        assert!(AllSourceError::VersionConflict {
            expected: 1,
            current: 2,
        }
        .is_retryable());
        assert!(AllSourceError::QueueFull("backpressure".to_string()).is_retryable());

        // Non-retryable variants.
        assert!(!AllSourceError::EventNotFound("e1".to_string()).is_retryable());
        assert!(!AllSourceError::EntityNotFound("e2".to_string()).is_retryable());
        assert!(!AllSourceError::InvalidEvent("bad".to_string()).is_retryable());
        assert!(!AllSourceError::InvalidQuery("bad".to_string()).is_retryable());
        assert!(!AllSourceError::InvalidInput("bad".to_string()).is_retryable());
        assert!(!AllSourceError::ValidationError("bad".to_string()).is_retryable());
        assert!(!AllSourceError::TenantNotFound("t1".to_string()).is_retryable());
        assert!(!AllSourceError::TenantAlreadyExists("t1".to_string()).is_retryable());
        assert!(!AllSourceError::ArrowError("arrow".to_string()).is_retryable());
        assert!(!AllSourceError::IndexError("index".to_string()).is_retryable());
        assert!(!AllSourceError::InternalError("bug".to_string()).is_retryable());
    }
}