1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8 #[error("Validation error: {0}")]
10 Validation(String),
11
12 #[error("Function execution error: {context}")]
14 FunctionExecution {
15 context: String,
16 #[source]
17 #[serde(skip)]
18 source: Option<Box<DataflowError>>,
19 },
20
21 #[error("Workflow error: {0}")]
23 Workflow(String),
24
25 #[error("Task error: {0}")]
27 Task(String),
28
29 #[error("Function not found: {0}")]
31 FunctionNotFound(String),
32
33 #[error("Deserialization error: {0}")]
35 Deserialization(String),
36
37 #[error("IO error: {0}")]
39 Io(String),
40
41 #[error("Logic evaluation error: {0}")]
43 LogicEvaluation(String),
44
45 #[error("HTTP error: {status} - {message}")]
47 Http { status: u16, message: String },
48
49 #[error("Timeout error: {0}")]
51 Timeout(String),
52
53 #[error("Unknown error: {0}")]
55 Unknown(String),
56}
57
58impl DataflowError {
59 pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
61 DataflowError::FunctionExecution {
62 context: context.into(),
63 source: source.map(Box::new),
64 }
65 }
66
67 pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
69 DataflowError::Http {
70 status,
71 message: message.into(),
72 }
73 }
74
75 pub fn from_io(err: std::io::Error) -> Self {
77 DataflowError::Io(err.to_string())
78 }
79
80 pub fn from_serde(err: serde_json::Error) -> Self {
82 DataflowError::Deserialization(err.to_string())
83 }
84
85 pub fn retryable(&self) -> bool {
91 match self {
92 DataflowError::Http { status, .. } => {
94 *status >= 500 || *status == 429 || *status == 408 || *status == 0
96 }
98 DataflowError::Timeout(_) => true,
99 DataflowError::Io(_) => true,
100 DataflowError::FunctionExecution { source, .. } => {
101 source.as_ref().map(|e| e.retryable()).unwrap_or(false)
103 }
104
105 DataflowError::Validation(_) => false,
107 DataflowError::LogicEvaluation(_) => false,
108 DataflowError::Deserialization(_) => false,
109 DataflowError::Workflow(_) => false,
110 DataflowError::Task(_) => false,
111 DataflowError::FunctionNotFound(_) => false,
112 DataflowError::Unknown(_) => false,
113 }
114 }
115}
116
117pub type Result<T> = std::result::Result<T, DataflowError>;
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct ErrorInfo {
123 pub code: String,
125
126 pub message: String,
128
129 pub path: Option<String>,
131
132 #[serde(skip_serializing_if = "Option::is_none")]
134 pub workflow_id: Option<String>,
135
136 #[serde(skip_serializing_if = "Option::is_none")]
138 pub task_id: Option<String>,
139
140 #[serde(skip_serializing_if = "Option::is_none")]
142 pub timestamp: Option<String>,
143
144 #[serde(skip_serializing_if = "Option::is_none")]
146 pub retry_attempted: Option<bool>,
147
148 #[serde(skip_serializing_if = "Option::is_none")]
150 pub retry_count: Option<u32>,
151}
152
153impl ErrorInfo {
154 pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
156 Self {
157 code: match &error {
158 DataflowError::Validation(_) => "VALIDATION_ERROR".to_string(),
159 DataflowError::Workflow(_) => "WORKFLOW_ERROR".to_string(),
160 DataflowError::Task(_) => "TASK_ERROR".to_string(),
161 DataflowError::FunctionNotFound(_) => "FUNCTION_NOT_FOUND".to_string(),
162 DataflowError::FunctionExecution { .. } => "FUNCTION_ERROR".to_string(),
163 DataflowError::LogicEvaluation(_) => "LOGIC_ERROR".to_string(),
164 DataflowError::Http { .. } => "HTTP_ERROR".to_string(),
165 DataflowError::Timeout(_) => "TIMEOUT_ERROR".to_string(),
166 DataflowError::Io(_) => "IO_ERROR".to_string(),
167 DataflowError::Deserialization(_) => "DESERIALIZATION_ERROR".to_string(),
168 DataflowError::Unknown(_) => "UNKNOWN_ERROR".to_string(),
169 },
170 message: error.to_string(),
171 path: None,
172 workflow_id,
173 task_id,
174 timestamp: Some(Utc::now().to_rfc3339()),
175 retry_attempted: Some(false),
176 retry_count: Some(0),
177 }
178 }
179
180 pub fn simple(code: String, message: String, path: Option<String>) -> Self {
182 Self {
183 code,
184 message,
185 path,
186 workflow_id: None,
187 task_id: None,
188 timestamp: Some(Utc::now().to_rfc3339()),
189 retry_attempted: None,
190 retry_count: None,
191 }
192 }
193
194 pub fn simple_ref(code: &str, message: &str, path: Option<&str>) -> Self {
196 Self {
197 code: code.to_string(),
198 message: message.to_string(),
199 path: path.map(|s| s.to_string()),
200 workflow_id: None,
201 task_id: None,
202 timestamp: Some(Utc::now().to_rfc3339()),
203 retry_attempted: None,
204 retry_count: None,
205 }
206 }
207
208 pub fn with_retry(mut self) -> Self {
210 self.retry_attempted = Some(true);
211 self.retry_count = Some(self.retry_count.unwrap_or(0) + 1);
212 self
213 }
214
215 pub fn builder(code: impl Into<String>, message: impl Into<String>) -> ErrorInfoBuilder {
217 ErrorInfoBuilder::new(code, message)
218 }
219}
220
221pub struct ErrorInfoBuilder {
223 code: String,
224 message: String,
225 path: Option<String>,
226 workflow_id: Option<String>,
227 task_id: Option<String>,
228 timestamp: Option<String>,
229 retry_attempted: Option<bool>,
230 retry_count: Option<u32>,
231}
232
233impl ErrorInfoBuilder {
234 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
236 Self {
237 code: code.into(),
238 message: message.into(),
239 path: None,
240 workflow_id: None,
241 task_id: None,
242 timestamp: Some(Utc::now().to_rfc3339()),
243 retry_attempted: None,
244 retry_count: None,
245 }
246 }
247
248 pub fn path(mut self, path: impl Into<String>) -> Self {
250 self.path = Some(path.into());
251 self
252 }
253
254 pub fn workflow_id(mut self, id: impl Into<String>) -> Self {
256 self.workflow_id = Some(id.into());
257 self
258 }
259
260 pub fn task_id(mut self, id: impl Into<String>) -> Self {
262 self.task_id = Some(id.into());
263 self
264 }
265
266 pub fn timestamp(mut self, timestamp: impl Into<String>) -> Self {
268 self.timestamp = Some(timestamp.into());
269 self
270 }
271
272 pub fn retry_attempted(mut self, attempted: bool) -> Self {
274 self.retry_attempted = Some(attempted);
275 self
276 }
277
278 pub fn retry_count(mut self, count: u32) -> Self {
280 self.retry_count = Some(count);
281 self
282 }
283
284 pub fn build(self) -> ErrorInfo {
286 ErrorInfo {
287 code: self.code,
288 message: self.message,
289 path: self.path,
290 workflow_id: self.workflow_id,
291 task_id: self.task_id,
292 timestamp: self.timestamp,
293 retry_attempted: self.retry_attempted,
294 retry_count: self.retry_count,
295 }
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302
303 #[test]
304 fn test_retryable_errors() {
305 assert!(
307 DataflowError::Http {
308 status: 500,
309 message: "Internal Server Error".to_string()
310 }
311 .retryable()
312 );
313 assert!(
314 DataflowError::Http {
315 status: 502,
316 message: "Bad Gateway".to_string()
317 }
318 .retryable()
319 );
320 assert!(
321 DataflowError::Http {
322 status: 503,
323 message: "Service Unavailable".to_string()
324 }
325 .retryable()
326 );
327 assert!(
328 DataflowError::Http {
329 status: 429,
330 message: "Too Many Requests".to_string()
331 }
332 .retryable()
333 );
334 assert!(
335 DataflowError::Http {
336 status: 408,
337 message: "Request Timeout".to_string()
338 }
339 .retryable()
340 );
341 assert!(
342 DataflowError::Http {
343 status: 0,
344 message: "Connection Error".to_string()
345 }
346 .retryable()
347 );
348 assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
349 assert!(DataflowError::Io("Network error".to_string()).retryable());
350 }
351
352 #[test]
353 fn test_non_retryable_errors() {
354 assert!(
356 !DataflowError::Http {
357 status: 400,
358 message: "Bad Request".to_string()
359 }
360 .retryable()
361 );
362 assert!(
363 !DataflowError::Http {
364 status: 401,
365 message: "Unauthorized".to_string()
366 }
367 .retryable()
368 );
369 assert!(
370 !DataflowError::Http {
371 status: 403,
372 message: "Forbidden".to_string()
373 }
374 .retryable()
375 );
376 assert!(
377 !DataflowError::Http {
378 status: 404,
379 message: "Not Found".to_string()
380 }
381 .retryable()
382 );
383 assert!(!DataflowError::Validation("Invalid input".to_string()).retryable());
384 assert!(!DataflowError::LogicEvaluation("Invalid logic".to_string()).retryable());
385 assert!(!DataflowError::Deserialization("Invalid JSON".to_string()).retryable());
386 assert!(!DataflowError::Workflow("Invalid workflow".to_string()).retryable());
387 assert!(!DataflowError::Unknown("Unknown error".to_string()).retryable());
388 }
389
390 #[test]
391 fn test_function_execution_error_retryability() {
392 let retryable_source = DataflowError::Http {
394 status: 500,
395 message: "Server Error".to_string(),
396 };
397 let non_retryable_source = DataflowError::Validation("Invalid data".to_string());
398
399 let retryable_func_error =
400 DataflowError::function_execution("HTTP call failed", Some(retryable_source));
401 let non_retryable_func_error =
402 DataflowError::function_execution("Validation failed", Some(non_retryable_source));
403 let no_source_func_error = DataflowError::function_execution("Unknown failure", None);
404
405 assert!(retryable_func_error.retryable());
406 assert!(!non_retryable_func_error.retryable());
407 assert!(!no_source_func_error.retryable());
408 }
409
410 #[test]
411 fn test_error_info_builder() {
412 let error = ErrorInfo::builder("TEST_ERROR", "Test message").build();
414 assert_eq!(error.code, "TEST_ERROR");
415 assert_eq!(error.message, "Test message");
416 assert!(error.timestamp.is_some());
417 assert!(error.path.is_none());
418
419 let error = ErrorInfo::builder("VALIDATION_ERROR", "Field validation failed")
421 .path("data.email")
422 .workflow_id("workflow_1")
423 .task_id("validate_email")
424 .retry_attempted(true)
425 .retry_count(2)
426 .build();
427
428 assert_eq!(error.code, "VALIDATION_ERROR");
429 assert_eq!(error.message, "Field validation failed");
430 assert_eq!(error.path, Some("data.email".to_string()));
431 assert_eq!(error.workflow_id, Some("workflow_1".to_string()));
432 assert_eq!(error.task_id, Some("validate_email".to_string()));
433 assert_eq!(error.retry_attempted, Some(true));
434 assert_eq!(error.retry_count, Some(2));
435 }
436}