//! Error types for the dataflow engine.
//!
//! File: `dataflow_rs/engine/error.rs`

1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5/// Main error type for the dataflow engine
6#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8    /// Validation errors occurring during rule evaluation
9    #[error("Validation error: {0}")]
10    Validation(String),
11
12    /// Errors during function execution
13    #[error("Function execution error: {context}")]
14    FunctionExecution {
15        context: String,
16        #[source]
17        #[serde(skip)]
18        source: Option<Box<DataflowError>>,
19    },
20
21    /// Workflow-related errors
22    #[error("Workflow error: {0}")]
23    Workflow(String),
24
25    /// Task-related errors
26    #[error("Task error: {0}")]
27    Task(String),
28
29    /// Function not found errors
30    #[error("Function not found: {0}")]
31    FunctionNotFound(String),
32
33    /// JSON serialization/deserialization errors
34    #[error("Deserialization error: {0}")]
35    Deserialization(String),
36
37    /// I/O errors (file reading, etc.)
38    #[error("IO error: {0}")]
39    Io(String),
40
41    /// JSONLogic/DataLogic evaluation errors
42    #[error("Logic evaluation error: {0}")]
43    LogicEvaluation(String),
44
45    /// HTTP request errors
46    #[error("HTTP error: {status} - {message}")]
47    Http { status: u16, message: String },
48
49    /// Timeout errors
50    #[error("Timeout error: {0}")]
51    Timeout(String),
52
53    /// Any other errors
54    #[error("Unknown error: {0}")]
55    Unknown(String),
56}
57
58impl DataflowError {
59    /// Creates a new function execution error with context
60    pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
61        DataflowError::FunctionExecution {
62            context: context.into(),
63            source: source.map(Box::new),
64        }
65    }
66
67    /// Creates a new HTTP error
68    pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
69        DataflowError::Http {
70            status,
71            message: message.into(),
72        }
73    }
74
75    /// Convert from std::io::Error
76    pub fn from_io(err: std::io::Error) -> Self {
77        DataflowError::Io(err.to_string())
78    }
79
80    /// Convert from serde_json::Error
81    pub fn from_serde(err: serde_json::Error) -> Self {
82        DataflowError::Deserialization(err.to_string())
83    }
84
85    /// Determines if this error is retryable (worth retrying)
86    ///
87    /// Retryable errors are typically transient infrastructure failures that might succeed on retry.
88    /// Non-retryable errors are typically data validation, logic, or configuration errors that
89    /// will consistently fail on retry.
90    pub fn retryable(&self) -> bool {
91        match self {
92            // Retryable errors - infrastructure/transient failures
93            DataflowError::Http { status, .. } => {
94                // Retry on server errors (5xx) and specific client errors that might be transient
95                *status >= 500 || *status == 429 || *status == 408 || *status == 0
96                // 0 means connection error
97            }
98            DataflowError::Timeout(_) => true,
99            DataflowError::Io(_) => true,
100            DataflowError::FunctionExecution { source, .. } => {
101                // Inherit retryability from the source error if present
102                source.as_ref().map(|e| e.retryable()).unwrap_or(false)
103            }
104
105            // Non-retryable errors - data/logic/configuration issues
106            DataflowError::Validation(_) => false,
107            DataflowError::LogicEvaluation(_) => false,
108            DataflowError::Deserialization(_) => false,
109            DataflowError::Workflow(_) => false,
110            DataflowError::Task(_) => false,
111            DataflowError::FunctionNotFound(_) => false,
112            DataflowError::Unknown(_) => false,
113        }
114    }
115}
116
117/// Type alias for Result with DataflowError
118pub type Result<T> = std::result::Result<T, DataflowError>;
119
120/// Structured error information for error tracking in messages
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct ErrorInfo {
123    /// Error code (e.g., "WORKFLOW_ERROR", "TASK_ERROR", "VALIDATION_ERROR")
124    pub code: String,
125
126    /// Human-readable error message
127    pub message: String,
128
129    /// Optional path to the error location (e.g., "workflow.id", "task.id", "data.field")
130    pub path: Option<String>,
131
132    /// ID of the workflow where the error occurred (if available)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub workflow_id: Option<String>,
135
136    /// ID of the task where the error occurred (if available)
137    #[serde(skip_serializing_if = "Option::is_none")]
138    pub task_id: Option<String>,
139
140    /// Timestamp when the error occurred
141    #[serde(skip_serializing_if = "Option::is_none")]
142    pub timestamp: Option<String>,
143
144    /// Whether a retry was attempted
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub retry_attempted: Option<bool>,
147
148    /// Number of retries attempted
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub retry_count: Option<u32>,
151}
152
153impl ErrorInfo {
154    /// Create a new error info entry with all fields
155    pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
156        Self {
157            code: match &error {
158                DataflowError::Validation(_) => "VALIDATION_ERROR".to_string(),
159                DataflowError::Workflow(_) => "WORKFLOW_ERROR".to_string(),
160                DataflowError::Task(_) => "TASK_ERROR".to_string(),
161                DataflowError::FunctionNotFound(_) => "FUNCTION_NOT_FOUND".to_string(),
162                DataflowError::FunctionExecution { .. } => "FUNCTION_ERROR".to_string(),
163                DataflowError::LogicEvaluation(_) => "LOGIC_ERROR".to_string(),
164                DataflowError::Http { .. } => "HTTP_ERROR".to_string(),
165                DataflowError::Timeout(_) => "TIMEOUT_ERROR".to_string(),
166                DataflowError::Io(_) => "IO_ERROR".to_string(),
167                DataflowError::Deserialization(_) => "DESERIALIZATION_ERROR".to_string(),
168                DataflowError::Unknown(_) => "UNKNOWN_ERROR".to_string(),
169            },
170            message: error.to_string(),
171            path: None,
172            workflow_id,
173            task_id,
174            timestamp: Some(Utc::now().to_rfc3339()),
175            retry_attempted: Some(false),
176            retry_count: Some(0),
177        }
178    }
179
180    /// Create a simple error info with just code, message, and optional path
181    pub fn simple(code: String, message: String, path: Option<String>) -> Self {
182        Self {
183            code,
184            message,
185            path,
186            workflow_id: None,
187            task_id: None,
188            timestamp: Some(Utc::now().to_rfc3339()),
189            retry_attempted: None,
190            retry_count: None,
191        }
192    }
193
194    /// Create a simple error info from references (avoids cloning when possible)
195    pub fn simple_ref(code: &str, message: &str, path: Option<&str>) -> Self {
196        Self {
197            code: code.to_string(),
198            message: message.to_string(),
199            path: path.map(|s| s.to_string()),
200            workflow_id: None,
201            task_id: None,
202            timestamp: Some(Utc::now().to_rfc3339()),
203            retry_attempted: None,
204            retry_count: None,
205        }
206    }
207
208    /// Mark that a retry was attempted
209    pub fn with_retry(mut self) -> Self {
210        self.retry_attempted = Some(true);
211        self.retry_count = Some(self.retry_count.unwrap_or(0) + 1);
212        self
213    }
214
215    /// Create a builder for ErrorInfo
216    pub fn builder(code: impl Into<String>, message: impl Into<String>) -> ErrorInfoBuilder {
217        ErrorInfoBuilder::new(code, message)
218    }
219}
220
/// Fluent builder for `ErrorInfo` values.
///
/// The fields mirror those of `ErrorInfo`; `code` and `message` are required
/// and the rest are optional. Call `build()` to produce the final value.
#[derive(Debug, Clone)]
#[must_use = "ErrorInfoBuilder must be `.build()` to produce an ErrorInfo"]
pub struct ErrorInfoBuilder {
    /// Error code for the resulting record.
    code: String,
    /// Human-readable message for the resulting record.
    message: String,
    /// Optional location of the error.
    path: Option<String>,
    /// Optional identifier of the workflow involved.
    workflow_id: Option<String>,
    /// Optional identifier of the task involved.
    task_id: Option<String>,
    /// Optional timestamp text.
    timestamp: Option<String>,
    /// Optional flag recording whether a retry was attempted.
    retry_attempted: Option<bool>,
    /// Optional number of retries attempted.
    retry_count: Option<u32>,
}
233
234impl ErrorInfoBuilder {
235    /// Create a new ErrorInfoBuilder with required fields
236    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
237        Self {
238            code: code.into(),
239            message: message.into(),
240            path: None,
241            workflow_id: None,
242            task_id: None,
243            timestamp: Some(Utc::now().to_rfc3339()),
244            retry_attempted: None,
245            retry_count: None,
246        }
247    }
248
249    /// Set the error path
250    pub fn path(mut self, path: impl Into<String>) -> Self {
251        self.path = Some(path.into());
252        self
253    }
254
255    /// Set the workflow ID
256    pub fn workflow_id(mut self, id: impl Into<String>) -> Self {
257        self.workflow_id = Some(id.into());
258        self
259    }
260
261    /// Set the task ID
262    pub fn task_id(mut self, id: impl Into<String>) -> Self {
263        self.task_id = Some(id.into());
264        self
265    }
266
267    /// Set custom timestamp (defaults to now if not set)
268    pub fn timestamp(mut self, timestamp: impl Into<String>) -> Self {
269        self.timestamp = Some(timestamp.into());
270        self
271    }
272
273    /// Mark as retry attempted
274    pub fn retry_attempted(mut self, attempted: bool) -> Self {
275        self.retry_attempted = Some(attempted);
276        self
277    }
278
279    /// Set retry count
280    pub fn retry_count(mut self, count: u32) -> Self {
281        self.retry_count = Some(count);
282        self
283    }
284
285    /// Build the ErrorInfo instance
286    pub fn build(self) -> ErrorInfo {
287        ErrorInfo {
288            code: self.code,
289            message: self.message,
290            path: self.path,
291            workflow_id: self.workflow_id,
292            task_id: self.task_id,
293            timestamp: self.timestamp,
294            retry_attempted: self.retry_attempted,
295            retry_count: self.retry_count,
296        }
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Transient failures must be reported as retryable.
    #[test]
    fn test_retryable_errors() {
        // Server errors, 429, 408 and the connection-error sentinel 0.
        for &status in &[500u16, 502, 503, 429, 408, 0] {
            assert!(
                DataflowError::http(status, "transient").retryable(),
                "HTTP status {} should be retryable",
                status
            );
        }

        assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
        assert!(DataflowError::Io("Network error".to_string()).retryable());
    }

    /// Data, logic and configuration failures must not be retried.
    #[test]
    fn test_non_retryable_errors() {
        for &status in &[400u16, 401, 403, 404] {
            assert!(
                !DataflowError::http(status, "client error").retryable(),
                "HTTP status {} should not be retryable",
                status
            );
        }

        assert!(!DataflowError::Validation("Invalid input".to_string()).retryable());
        assert!(!DataflowError::LogicEvaluation("Invalid logic".to_string()).retryable());
        assert!(!DataflowError::Deserialization("Invalid JSON".to_string()).retryable());
        assert!(!DataflowError::Workflow("Invalid workflow".to_string()).retryable());
        assert!(!DataflowError::Task("Invalid task".to_string()).retryable());
        assert!(!DataflowError::FunctionNotFound("missing_fn".to_string()).retryable());
        assert!(!DataflowError::Unknown("Unknown error".to_string()).retryable());
    }

    /// A function execution error takes its retryability from its source.
    #[test]
    fn test_function_execution_error_retryability() {
        let retryable_source = DataflowError::Http {
            status: 500,
            message: "Server Error".to_string(),
        };
        let non_retryable_source = DataflowError::Validation("Invalid data".to_string());

        let retryable_func_error =
            DataflowError::function_execution("HTTP call failed", Some(retryable_source));
        let non_retryable_func_error =
            DataflowError::function_execution("Validation failed", Some(non_retryable_source));
        let no_source_func_error = DataflowError::function_execution("Unknown failure", None);

        assert!(retryable_func_error.retryable());
        assert!(!non_retryable_func_error.retryable());
        assert!(!no_source_func_error.retryable());
    }

    /// The builder carries every field through to the built `ErrorInfo`.
    #[test]
    fn test_error_info_builder() {
        // Only the required fields.
        let error = ErrorInfo::builder("TEST_ERROR", "Test message").build();
        assert_eq!(error.code, "TEST_ERROR");
        assert_eq!(error.message, "Test message");
        assert!(error.timestamp.is_some());
        assert!(error.path.is_none());

        // Every optional setter.
        let error = ErrorInfo::builder("VALIDATION_ERROR", "Field validation failed")
            .path("data.email")
            .workflow_id("workflow_1")
            .task_id("validate_email")
            .retry_attempted(true)
            .retry_count(2)
            .build();

        assert_eq!(error.code, "VALIDATION_ERROR");
        assert_eq!(error.message, "Field validation failed");
        assert_eq!(error.path, Some("data.email".to_string()));
        assert_eq!(error.workflow_id, Some("workflow_1".to_string()));
        assert_eq!(error.task_id, Some("validate_email".to_string()));
        assert_eq!(error.retry_attempted, Some(true));
        assert_eq!(error.retry_count, Some(2));
    }

    /// `ErrorInfo::new` maps each error variant to its code and keeps the
    /// error's display text as the message.
    #[test]
    fn test_error_info_new_from_dataflow_error() {
        let test_cases = vec![
            (
                DataflowError::Validation("test".to_string()),
                "VALIDATION_ERROR",
            ),
            (
                DataflowError::Workflow("test".to_string()),
                "WORKFLOW_ERROR",
            ),
            (DataflowError::Task("test".to_string()), "TASK_ERROR"),
            (
                DataflowError::FunctionNotFound("test".to_string()),
                "FUNCTION_NOT_FOUND",
            ),
            (
                DataflowError::function_execution("test", None),
                "FUNCTION_ERROR",
            ),
            (
                DataflowError::LogicEvaluation("test".to_string()),
                "LOGIC_ERROR",
            ),
            (DataflowError::http(404, "Not Found"), "HTTP_ERROR"),
            (DataflowError::Timeout("test".to_string()), "TIMEOUT_ERROR"),
            (DataflowError::Io("test".to_string()), "IO_ERROR"),
            (
                DataflowError::Deserialization("test".to_string()),
                "DESERIALIZATION_ERROR",
            ),
            (DataflowError::Unknown("test".to_string()), "UNKNOWN_ERROR"),
        ];

        for (error, expected_code) in test_cases {
            // Captured first because `ErrorInfo::new` takes the error by value.
            let expected_message = error.to_string();

            let info = ErrorInfo::new(
                Some("workflow_1".to_string()),
                Some("task_1".to_string()),
                error,
            );
            assert_eq!(info.code, expected_code);
            assert_eq!(info.message, expected_message);
            assert!(info.path.is_none());
            assert_eq!(info.workflow_id, Some("workflow_1".to_string()));
            assert_eq!(info.task_id, Some("task_1".to_string()));
            assert!(info.timestamp.is_some());
            assert_eq!(info.retry_attempted, Some(false));
            assert_eq!(info.retry_count, Some(0));
        }
    }

    /// `simple` and `simple_ref` populate the same fields.
    #[test]
    fn test_error_info_simple_constructors() {
        // Owned-argument constructor.
        let error = ErrorInfo::simple(
            "CUSTOM_ERROR".to_string(),
            "Custom message".to_string(),
            Some("data.field".to_string()),
        );
        assert_eq!(error.code, "CUSTOM_ERROR");
        assert_eq!(error.message, "Custom message");
        assert_eq!(error.path, Some("data.field".to_string()));
        assert!(error.workflow_id.is_none());
        assert!(error.task_id.is_none());
        assert!(error.timestamp.is_some());

        // Borrowed-argument constructor.
        let error = ErrorInfo::simple_ref("REF_ERROR", "Ref message", Some("data.path"));
        assert_eq!(error.code, "REF_ERROR");
        assert_eq!(error.message, "Ref message");
        assert_eq!(error.path, Some("data.path".to_string()));
        assert!(error.workflow_id.is_none());
        assert!(error.task_id.is_none());
        assert!(error.timestamp.is_some());

        // Borrowed-argument constructor without a path.
        let error = ErrorInfo::simple_ref("NO_PATH", "No path message", None);
        assert!(error.path.is_none());
    }

    /// Each `with_retry` call marks the retry and bumps the counter by one.
    #[test]
    fn test_error_info_with_retry() {
        let error = ErrorInfo::simple_ref("TEST", "Test", None);
        assert!(error.retry_attempted.is_none());
        assert!(error.retry_count.is_none());

        let error = error.with_retry();
        assert_eq!(error.retry_attempted, Some(true));
        assert_eq!(error.retry_count, Some(1));

        let error = error.with_retry();
        assert_eq!(error.retry_attempted, Some(true));
        assert_eq!(error.retry_count, Some(2));
    }

    /// Every variant renders the display text declared on the enum.
    #[test]
    fn test_error_display_messages() {
        assert_eq!(
            DataflowError::Validation("test".to_string()).to_string(),
            "Validation error: test"
        );
        assert_eq!(
            DataflowError::function_execution("test", None).to_string(),
            "Function execution error: test"
        );
        assert_eq!(
            DataflowError::Workflow("test".to_string()).to_string(),
            "Workflow error: test"
        );
        assert_eq!(
            DataflowError::Task("test".to_string()).to_string(),
            "Task error: test"
        );
        assert_eq!(
            DataflowError::FunctionNotFound("test".to_string()).to_string(),
            "Function not found: test"
        );
        assert_eq!(
            DataflowError::Deserialization("test".to_string()).to_string(),
            "Deserialization error: test"
        );
        assert_eq!(
            DataflowError::Io("test".to_string()).to_string(),
            "IO error: test"
        );
        assert_eq!(
            DataflowError::LogicEvaluation("test".to_string()).to_string(),
            "Logic evaluation error: test"
        );
        assert_eq!(
            DataflowError::http(404, "Not Found").to_string(),
            "HTTP error: 404 - Not Found"
        );
        assert_eq!(
            DataflowError::Timeout("test".to_string()).to_string(),
            "Timeout error: test"
        );
        assert_eq!(
            DataflowError::Unknown("test".to_string()).to_string(),
            "Unknown error: test"
        );
    }

    /// The conversion helpers map foreign errors onto the matching variant.
    #[test]
    fn test_error_conversions() {
        // `expect_err` makes the test fail loudly if parsing ever succeeds,
        // instead of silently skipping the assertions.
        let serde_err = serde_json::from_str::<serde_json::Value>("invalid json")
            .expect_err("\"invalid json\" is not valid JSON");
        let dataflow_err = DataflowError::from_serde(serde_err);
        assert!(matches!(dataflow_err, DataflowError::Deserialization(_)));
        assert!(dataflow_err
            .to_string()
            .starts_with("Deserialization error: "));

        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "missing file");
        let dataflow_err = DataflowError::from_io(io_err);
        assert!(matches!(dataflow_err, DataflowError::Io(_)));
        assert_eq!(dataflow_err.to_string(), "IO error: missing file");
    }
}