1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8 #[error("Validation error: {0}")]
10 Validation(String),
11
12 #[error("Function execution error: {context}")]
14 FunctionExecution {
15 context: String,
16 #[source]
17 #[serde(skip)]
18 source: Option<Box<DataflowError>>,
19 },
20
21 #[error("Workflow error: {0}")]
23 Workflow(String),
24
25 #[error("Task error: {0}")]
27 Task(String),
28
29 #[error("Function not found: {0}")]
31 FunctionNotFound(String),
32
33 #[error("Deserialization error: {0}")]
35 Deserialization(String),
36
37 #[error("IO error: {0}")]
39 Io(String),
40
41 #[error("Logic evaluation error: {0}")]
43 LogicEvaluation(String),
44
45 #[error("HTTP error: {status} - {message}")]
47 Http { status: u16, message: String },
48
49 #[error("Timeout error: {0}")]
51 Timeout(String),
52
53 #[error("Unknown error: {0}")]
55 Unknown(String),
56}
57
58impl DataflowError {
59 pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
61 DataflowError::FunctionExecution {
62 context: context.into(),
63 source: source.map(Box::new),
64 }
65 }
66
67 pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
69 DataflowError::Http {
70 status,
71 message: message.into(),
72 }
73 }
74
75 pub fn from_io(err: std::io::Error) -> Self {
77 DataflowError::Io(err.to_string())
78 }
79
80 pub fn from_serde(err: serde_json::Error) -> Self {
82 DataflowError::Deserialization(err.to_string())
83 }
84
85 pub fn retryable(&self) -> bool {
91 match self {
92 DataflowError::Http { status, .. } => {
94 *status >= 500 || *status == 429 || *status == 408 || *status == 0
96 }
98 DataflowError::Timeout(_) => true,
99 DataflowError::Io(_) => true,
100 DataflowError::FunctionExecution { source, .. } => {
101 source.as_ref().map(|e| e.retryable()).unwrap_or(false)
103 }
104
105 DataflowError::Validation(_) => false,
107 DataflowError::LogicEvaluation(_) => false,
108 DataflowError::Deserialization(_) => false,
109 DataflowError::Workflow(_) => false,
110 DataflowError::Task(_) => false,
111 DataflowError::FunctionNotFound(_) => false,
112 DataflowError::Unknown(_) => false,
113 }
114 }
115}
116
117pub type Result<T> = std::result::Result<T, DataflowError>;
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct ErrorInfo {
123 pub code: String,
125
126 pub message: String,
128
129 pub path: Option<String>,
131
132 #[serde(skip_serializing_if = "Option::is_none")]
134 pub workflow_id: Option<String>,
135
136 #[serde(skip_serializing_if = "Option::is_none")]
138 pub task_id: Option<String>,
139
140 #[serde(skip_serializing_if = "Option::is_none")]
142 pub timestamp: Option<String>,
143
144 #[serde(skip_serializing_if = "Option::is_none")]
146 pub retry_attempted: Option<bool>,
147
148 #[serde(skip_serializing_if = "Option::is_none")]
150 pub retry_count: Option<u32>,
151}
152
153impl ErrorInfo {
154 pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
156 Self {
157 code: match &error {
158 DataflowError::Validation(_) => "VALIDATION_ERROR".to_string(),
159 DataflowError::Workflow(_) => "WORKFLOW_ERROR".to_string(),
160 DataflowError::Task(_) => "TASK_ERROR".to_string(),
161 DataflowError::FunctionNotFound(_) => "FUNCTION_NOT_FOUND".to_string(),
162 DataflowError::FunctionExecution { .. } => "FUNCTION_ERROR".to_string(),
163 DataflowError::LogicEvaluation(_) => "LOGIC_ERROR".to_string(),
164 DataflowError::Http { .. } => "HTTP_ERROR".to_string(),
165 DataflowError::Timeout(_) => "TIMEOUT_ERROR".to_string(),
166 DataflowError::Io(_) => "IO_ERROR".to_string(),
167 DataflowError::Deserialization(_) => "DESERIALIZATION_ERROR".to_string(),
168 DataflowError::Unknown(_) => "UNKNOWN_ERROR".to_string(),
169 },
170 message: error.to_string(),
171 path: None,
172 workflow_id,
173 task_id,
174 timestamp: Some(Utc::now().to_rfc3339()),
175 retry_attempted: Some(false),
176 retry_count: Some(0),
177 }
178 }
179
180 pub fn simple(code: String, message: String, path: Option<String>) -> Self {
182 Self {
183 code,
184 message,
185 path,
186 workflow_id: None,
187 task_id: None,
188 timestamp: Some(Utc::now().to_rfc3339()),
189 retry_attempted: None,
190 retry_count: None,
191 }
192 }
193
194 pub fn simple_ref(code: &str, message: &str, path: Option<&str>) -> Self {
196 Self {
197 code: code.to_string(),
198 message: message.to_string(),
199 path: path.map(|s| s.to_string()),
200 workflow_id: None,
201 task_id: None,
202 timestamp: Some(Utc::now().to_rfc3339()),
203 retry_attempted: None,
204 retry_count: None,
205 }
206 }
207
208 pub fn with_retry(mut self) -> Self {
210 self.retry_attempted = Some(true);
211 self.retry_count = Some(self.retry_count.unwrap_or(0) + 1);
212 self
213 }
214
215 pub fn builder(code: impl Into<String>, message: impl Into<String>) -> ErrorInfoBuilder {
217 ErrorInfoBuilder::new(code, message)
218 }
219}
220
221#[must_use = "ErrorInfoBuilder must be `.build()` to produce an ErrorInfo"]
223pub struct ErrorInfoBuilder {
224 code: String,
225 message: String,
226 path: Option<String>,
227 workflow_id: Option<String>,
228 task_id: Option<String>,
229 timestamp: Option<String>,
230 retry_attempted: Option<bool>,
231 retry_count: Option<u32>,
232}
233
234impl ErrorInfoBuilder {
235 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
237 Self {
238 code: code.into(),
239 message: message.into(),
240 path: None,
241 workflow_id: None,
242 task_id: None,
243 timestamp: Some(Utc::now().to_rfc3339()),
244 retry_attempted: None,
245 retry_count: None,
246 }
247 }
248
249 pub fn path(mut self, path: impl Into<String>) -> Self {
251 self.path = Some(path.into());
252 self
253 }
254
255 pub fn workflow_id(mut self, id: impl Into<String>) -> Self {
257 self.workflow_id = Some(id.into());
258 self
259 }
260
261 pub fn task_id(mut self, id: impl Into<String>) -> Self {
263 self.task_id = Some(id.into());
264 self
265 }
266
267 pub fn timestamp(mut self, timestamp: impl Into<String>) -> Self {
269 self.timestamp = Some(timestamp.into());
270 self
271 }
272
273 pub fn retry_attempted(mut self, attempted: bool) -> Self {
275 self.retry_attempted = Some(attempted);
276 self
277 }
278
279 pub fn retry_count(mut self, count: u32) -> Self {
281 self.retry_count = Some(count);
282 self
283 }
284
285 pub fn build(self) -> ErrorInfo {
287 ErrorInfo {
288 code: self.code,
289 message: self.message,
290 path: self.path,
291 workflow_id: self.workflow_id,
292 task_id: self.task_id,
293 timestamp: self.timestamp,
294 retry_attempted: self.retry_attempted,
295 retry_count: self.retry_count,
296 }
297 }
298}
299
300#[cfg(test)]
301mod tests {
302 use super::*;
303
304 #[test]
305 fn test_retryable_errors() {
306 assert!(
308 DataflowError::Http {
309 status: 500,
310 message: "Internal Server Error".to_string()
311 }
312 .retryable()
313 );
314 assert!(
315 DataflowError::Http {
316 status: 502,
317 message: "Bad Gateway".to_string()
318 }
319 .retryable()
320 );
321 assert!(
322 DataflowError::Http {
323 status: 503,
324 message: "Service Unavailable".to_string()
325 }
326 .retryable()
327 );
328 assert!(
329 DataflowError::Http {
330 status: 429,
331 message: "Too Many Requests".to_string()
332 }
333 .retryable()
334 );
335 assert!(
336 DataflowError::Http {
337 status: 408,
338 message: "Request Timeout".to_string()
339 }
340 .retryable()
341 );
342 assert!(
343 DataflowError::Http {
344 status: 0,
345 message: "Connection Error".to_string()
346 }
347 .retryable()
348 );
349 assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
350 assert!(DataflowError::Io("Network error".to_string()).retryable());
351 }
352
353 #[test]
354 fn test_non_retryable_errors() {
355 assert!(
357 !DataflowError::Http {
358 status: 400,
359 message: "Bad Request".to_string()
360 }
361 .retryable()
362 );
363 assert!(
364 !DataflowError::Http {
365 status: 401,
366 message: "Unauthorized".to_string()
367 }
368 .retryable()
369 );
370 assert!(
371 !DataflowError::Http {
372 status: 403,
373 message: "Forbidden".to_string()
374 }
375 .retryable()
376 );
377 assert!(
378 !DataflowError::Http {
379 status: 404,
380 message: "Not Found".to_string()
381 }
382 .retryable()
383 );
384 assert!(!DataflowError::Validation("Invalid input".to_string()).retryable());
385 assert!(!DataflowError::LogicEvaluation("Invalid logic".to_string()).retryable());
386 assert!(!DataflowError::Deserialization("Invalid JSON".to_string()).retryable());
387 assert!(!DataflowError::Workflow("Invalid workflow".to_string()).retryable());
388 assert!(!DataflowError::Unknown("Unknown error".to_string()).retryable());
389 }
390
391 #[test]
392 fn test_function_execution_error_retryability() {
393 let retryable_source = DataflowError::Http {
395 status: 500,
396 message: "Server Error".to_string(),
397 };
398 let non_retryable_source = DataflowError::Validation("Invalid data".to_string());
399
400 let retryable_func_error =
401 DataflowError::function_execution("HTTP call failed", Some(retryable_source));
402 let non_retryable_func_error =
403 DataflowError::function_execution("Validation failed", Some(non_retryable_source));
404 let no_source_func_error = DataflowError::function_execution("Unknown failure", None);
405
406 assert!(retryable_func_error.retryable());
407 assert!(!non_retryable_func_error.retryable());
408 assert!(!no_source_func_error.retryable());
409 }
410
411 #[test]
412 fn test_error_info_builder() {
413 let error = ErrorInfo::builder("TEST_ERROR", "Test message").build();
415 assert_eq!(error.code, "TEST_ERROR");
416 assert_eq!(error.message, "Test message");
417 assert!(error.timestamp.is_some());
418 assert!(error.path.is_none());
419
420 let error = ErrorInfo::builder("VALIDATION_ERROR", "Field validation failed")
422 .path("data.email")
423 .workflow_id("workflow_1")
424 .task_id("validate_email")
425 .retry_attempted(true)
426 .retry_count(2)
427 .build();
428
429 assert_eq!(error.code, "VALIDATION_ERROR");
430 assert_eq!(error.message, "Field validation failed");
431 assert_eq!(error.path, Some("data.email".to_string()));
432 assert_eq!(error.workflow_id, Some("workflow_1".to_string()));
433 assert_eq!(error.task_id, Some("validate_email".to_string()));
434 assert_eq!(error.retry_attempted, Some(true));
435 assert_eq!(error.retry_count, Some(2));
436 }
437
438 #[test]
439 fn test_error_info_new_from_dataflow_error() {
440 let test_cases = vec![
442 (
443 DataflowError::Validation("test".to_string()),
444 "VALIDATION_ERROR",
445 ),
446 (
447 DataflowError::Workflow("test".to_string()),
448 "WORKFLOW_ERROR",
449 ),
450 (DataflowError::Task("test".to_string()), "TASK_ERROR"),
451 (
452 DataflowError::FunctionNotFound("test".to_string()),
453 "FUNCTION_NOT_FOUND",
454 ),
455 (
456 DataflowError::function_execution("test", None),
457 "FUNCTION_ERROR",
458 ),
459 (
460 DataflowError::LogicEvaluation("test".to_string()),
461 "LOGIC_ERROR",
462 ),
463 (DataflowError::http(404, "Not Found"), "HTTP_ERROR"),
464 (DataflowError::Timeout("test".to_string()), "TIMEOUT_ERROR"),
465 (DataflowError::Io("test".to_string()), "IO_ERROR"),
466 (
467 DataflowError::Deserialization("test".to_string()),
468 "DESERIALIZATION_ERROR",
469 ),
470 (DataflowError::Unknown("test".to_string()), "UNKNOWN_ERROR"),
471 ];
472
473 for (error, expected_code) in test_cases {
474 let info = ErrorInfo::new(
475 Some("workflow_1".to_string()),
476 Some("task_1".to_string()),
477 error,
478 );
479 assert_eq!(info.code, expected_code);
480 assert_eq!(info.workflow_id, Some("workflow_1".to_string()));
481 assert_eq!(info.task_id, Some("task_1".to_string()));
482 assert!(info.timestamp.is_some());
483 assert_eq!(info.retry_attempted, Some(false));
484 assert_eq!(info.retry_count, Some(0));
485 }
486 }
487
488 #[test]
489 fn test_error_info_simple_constructors() {
490 let error = ErrorInfo::simple(
492 "CUSTOM_ERROR".to_string(),
493 "Custom message".to_string(),
494 Some("data.field".to_string()),
495 );
496 assert_eq!(error.code, "CUSTOM_ERROR");
497 assert_eq!(error.message, "Custom message");
498 assert_eq!(error.path, Some("data.field".to_string()));
499 assert!(error.workflow_id.is_none());
500 assert!(error.task_id.is_none());
501 assert!(error.timestamp.is_some());
502
503 let error = ErrorInfo::simple_ref("REF_ERROR", "Ref message", Some("data.path"));
505 assert_eq!(error.code, "REF_ERROR");
506 assert_eq!(error.message, "Ref message");
507 assert_eq!(error.path, Some("data.path".to_string()));
508
509 let error = ErrorInfo::simple_ref("NO_PATH", "No path message", None);
511 assert!(error.path.is_none());
512 }
513
514 #[test]
515 fn test_error_info_with_retry() {
516 let error = ErrorInfo::simple_ref("TEST", "Test", None);
517 assert!(error.retry_attempted.is_none());
518 assert!(error.retry_count.is_none());
519
520 let error = error.with_retry();
521 assert_eq!(error.retry_attempted, Some(true));
522 assert_eq!(error.retry_count, Some(1));
523
524 let error = error.with_retry();
525 assert_eq!(error.retry_attempted, Some(true));
526 assert_eq!(error.retry_count, Some(2));
527 }
528
529 #[test]
530 fn test_error_display_messages() {
531 assert_eq!(
533 DataflowError::Validation("test".to_string()).to_string(),
534 "Validation error: test"
535 );
536 assert_eq!(
537 DataflowError::Workflow("test".to_string()).to_string(),
538 "Workflow error: test"
539 );
540 assert_eq!(
541 DataflowError::Task("test".to_string()).to_string(),
542 "Task error: test"
543 );
544 assert_eq!(
545 DataflowError::FunctionNotFound("test".to_string()).to_string(),
546 "Function not found: test"
547 );
548 assert_eq!(
549 DataflowError::http(404, "Not Found").to_string(),
550 "HTTP error: 404 - Not Found"
551 );
552 assert_eq!(
553 DataflowError::Timeout("test".to_string()).to_string(),
554 "Timeout error: test"
555 );
556 }
557
558 #[test]
559 fn test_error_conversions() {
560 let json_str = "invalid json";
562 let serde_result: std::result::Result<serde_json::Value, _> =
563 serde_json::from_str(json_str);
564 if let Err(e) = serde_result {
565 let dataflow_err = DataflowError::from_serde(e);
566 assert!(matches!(dataflow_err, DataflowError::Deserialization(_)));
567 }
568 }
569}