dataflow_rs/engine/
error.rs

1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5/// Main error type for the dataflow engine
6#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8    /// Validation errors occurring during rule evaluation
9    #[error("Validation error: {0}")]
10    Validation(String),
11
12    /// Errors during function execution
13    #[error("Function execution error: {context}")]
14    FunctionExecution {
15        context: String,
16        #[source]
17        #[serde(skip)]
18        source: Option<Box<DataflowError>>,
19    },
20
21    /// Workflow-related errors
22    #[error("Workflow error: {0}")]
23    Workflow(String),
24
25    /// Task-related errors
26    #[error("Task error: {0}")]
27    Task(String),
28
29    /// Function not found errors
30    #[error("Function not found: {0}")]
31    FunctionNotFound(String),
32
33    /// JSON serialization/deserialization errors
34    #[error("Deserialization error: {0}")]
35    Deserialization(String),
36
37    /// I/O errors (file reading, etc.)
38    #[error("IO error: {0}")]
39    Io(String),
40
41    /// JSONLogic/DataLogic evaluation errors
42    #[error("Logic evaluation error: {0}")]
43    LogicEvaluation(String),
44
45    /// HTTP request errors
46    #[error("HTTP error: {status} - {message}")]
47    Http { status: u16, message: String },
48
49    /// Timeout errors
50    #[error("Timeout error: {0}")]
51    Timeout(String),
52
53    /// Any other errors
54    #[error("Unknown error: {0}")]
55    Unknown(String),
56}
57
58impl DataflowError {
59    /// Creates a new function execution error with context
60    pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
61        DataflowError::FunctionExecution {
62            context: context.into(),
63            source: source.map(Box::new),
64        }
65    }
66
67    /// Creates a new HTTP error
68    pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
69        DataflowError::Http {
70            status,
71            message: message.into(),
72        }
73    }
74
75    /// Convert from std::io::Error
76    pub fn from_io(err: std::io::Error) -> Self {
77        DataflowError::Io(err.to_string())
78    }
79
80    /// Convert from serde_json::Error
81    pub fn from_serde(err: serde_json::Error) -> Self {
82        DataflowError::Deserialization(err.to_string())
83    }
84
85    /// Determines if this error is retryable (worth retrying)
86    ///
87    /// Retryable errors are typically transient infrastructure failures that might succeed on retry.
88    /// Non-retryable errors are typically data validation, logic, or configuration errors that
89    /// will consistently fail on retry.
90    pub fn retryable(&self) -> bool {
91        match self {
92            // Retryable errors - infrastructure/transient failures
93            DataflowError::Http { status, .. } => {
94                // Retry on server errors (5xx) and specific client errors that might be transient
95                *status >= 500 || *status == 429 || *status == 408 || *status == 0
96                // 0 means connection error
97            }
98            DataflowError::Timeout(_) => true,
99            DataflowError::Io(_) => true,
100            DataflowError::FunctionExecution { source, .. } => {
101                // Inherit retryability from the source error if present
102                source.as_ref().map(|e| e.retryable()).unwrap_or(false)
103            }
104
105            // Non-retryable errors - data/logic/configuration issues
106            DataflowError::Validation(_) => false,
107            DataflowError::LogicEvaluation(_) => false,
108            DataflowError::Deserialization(_) => false,
109            DataflowError::Workflow(_) => false,
110            DataflowError::Task(_) => false,
111            DataflowError::FunctionNotFound(_) => false,
112            DataflowError::Unknown(_) => false,
113        }
114    }
115}
116
117/// Type alias for Result with DataflowError
118pub type Result<T> = std::result::Result<T, DataflowError>;
119
120/// Structured error information for error tracking in messages
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct ErrorInfo {
123    /// Error code (e.g., "WORKFLOW_ERROR", "TASK_ERROR", "VALIDATION_ERROR")
124    pub code: String,
125
126    /// Human-readable error message
127    pub message: String,
128
129    /// Optional path to the error location (e.g., "workflow.id", "task.id", "data.field")
130    pub path: Option<String>,
131
132    /// ID of the workflow where the error occurred (if available)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub workflow_id: Option<String>,
135
136    /// ID of the task where the error occurred (if available)
137    #[serde(skip_serializing_if = "Option::is_none")]
138    pub task_id: Option<String>,
139
140    /// Timestamp when the error occurred
141    #[serde(skip_serializing_if = "Option::is_none")]
142    pub timestamp: Option<String>,
143
144    /// Whether a retry was attempted
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub retry_attempted: Option<bool>,
147
148    /// Number of retries attempted
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub retry_count: Option<u32>,
151}
152
153impl ErrorInfo {
154    /// Create a new error info entry with all fields
155    pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
156        Self {
157            code: match &error {
158                DataflowError::Validation(_) => "VALIDATION_ERROR".to_string(),
159                DataflowError::Workflow(_) => "WORKFLOW_ERROR".to_string(),
160                DataflowError::Task(_) => "TASK_ERROR".to_string(),
161                DataflowError::FunctionNotFound(_) => "FUNCTION_NOT_FOUND".to_string(),
162                DataflowError::FunctionExecution { .. } => "FUNCTION_ERROR".to_string(),
163                DataflowError::LogicEvaluation(_) => "LOGIC_ERROR".to_string(),
164                DataflowError::Http { .. } => "HTTP_ERROR".to_string(),
165                DataflowError::Timeout(_) => "TIMEOUT_ERROR".to_string(),
166                DataflowError::Io(_) => "IO_ERROR".to_string(),
167                DataflowError::Deserialization(_) => "DESERIALIZATION_ERROR".to_string(),
168                DataflowError::Unknown(_) => "UNKNOWN_ERROR".to_string(),
169            },
170            message: error.to_string(),
171            path: None,
172            workflow_id,
173            task_id,
174            timestamp: Some(Utc::now().to_rfc3339()),
175            retry_attempted: Some(false),
176            retry_count: Some(0),
177        }
178    }
179
180    /// Create a simple error info with just code, message, and optional path
181    pub fn simple(code: String, message: String, path: Option<String>) -> Self {
182        Self {
183            code,
184            message,
185            path,
186            workflow_id: None,
187            task_id: None,
188            timestamp: Some(Utc::now().to_rfc3339()),
189            retry_attempted: None,
190            retry_count: None,
191        }
192    }
193
194    /// Create a simple error info from references (avoids cloning when possible)
195    pub fn simple_ref(code: &str, message: &str, path: Option<&str>) -> Self {
196        Self {
197            code: code.to_string(),
198            message: message.to_string(),
199            path: path.map(|s| s.to_string()),
200            workflow_id: None,
201            task_id: None,
202            timestamp: Some(Utc::now().to_rfc3339()),
203            retry_attempted: None,
204            retry_count: None,
205        }
206    }
207
208    /// Mark that a retry was attempted
209    pub fn with_retry(mut self) -> Self {
210        self.retry_attempted = Some(true);
211        self.retry_count = Some(self.retry_count.unwrap_or(0) + 1);
212        self
213    }
214
215    /// Create a builder for ErrorInfo
216    pub fn builder(code: impl Into<String>, message: impl Into<String>) -> ErrorInfoBuilder {
217        ErrorInfoBuilder::new(code, message)
218    }
219}
220
/// Builder for creating `ErrorInfo` instances with a fluent API.
///
/// Holds one value per `ErrorInfo` field. Derives `Debug` and `Clone` so a
/// partially configured builder can be logged or reused as a template.
#[derive(Debug, Clone)]
pub struct ErrorInfoBuilder {
    /// Machine-readable error code (required).
    code: String,
    /// Human-readable error message (required).
    message: String,
    /// Optional path to the error location.
    path: Option<String>,
    /// Optional ID of the workflow where the error occurred.
    workflow_id: Option<String>,
    /// Optional ID of the task where the error occurred.
    task_id: Option<String>,
    /// Optional timestamp of the error.
    timestamp: Option<String>,
    /// Optional flag recording whether a retry was attempted.
    retry_attempted: Option<bool>,
    /// Optional number of retries attempted.
    retry_count: Option<u32>,
}
232
233impl ErrorInfoBuilder {
234    /// Create a new ErrorInfoBuilder with required fields
235    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
236        Self {
237            code: code.into(),
238            message: message.into(),
239            path: None,
240            workflow_id: None,
241            task_id: None,
242            timestamp: Some(Utc::now().to_rfc3339()),
243            retry_attempted: None,
244            retry_count: None,
245        }
246    }
247
248    /// Set the error path
249    pub fn path(mut self, path: impl Into<String>) -> Self {
250        self.path = Some(path.into());
251        self
252    }
253
254    /// Set the workflow ID
255    pub fn workflow_id(mut self, id: impl Into<String>) -> Self {
256        self.workflow_id = Some(id.into());
257        self
258    }
259
260    /// Set the task ID
261    pub fn task_id(mut self, id: impl Into<String>) -> Self {
262        self.task_id = Some(id.into());
263        self
264    }
265
266    /// Set custom timestamp (defaults to now if not set)
267    pub fn timestamp(mut self, timestamp: impl Into<String>) -> Self {
268        self.timestamp = Some(timestamp.into());
269        self
270    }
271
272    /// Mark as retry attempted
273    pub fn retry_attempted(mut self, attempted: bool) -> Self {
274        self.retry_attempted = Some(attempted);
275        self
276    }
277
278    /// Set retry count
279    pub fn retry_count(mut self, count: u32) -> Self {
280        self.retry_count = Some(count);
281        self
282    }
283
284    /// Build the ErrorInfo instance
285    pub fn build(self) -> ErrorInfo {
286        ErrorInfo {
287            code: self.code,
288            message: self.message,
289            path: self.path,
290            workflow_id: self.workflow_id,
291            task_id: self.task_id,
292            timestamp: self.timestamp,
293            retry_attempted: self.retry_attempted,
294            retry_count: self.retry_count,
295        }
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an HTTP error with the given status and a placeholder message.
    fn http_error(status: u16) -> DataflowError {
        DataflowError::http(status, "test")
    }

    #[test]
    fn test_retryable_errors() {
        // 5xx, 429, 408 and 0 (connection error) are the retryable statuses.
        for status in [500, 502, 503, 599, 429, 408, 0] {
            assert!(
                http_error(status).retryable(),
                "HTTP {} should be retryable",
                status
            );
        }
        assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
        assert!(DataflowError::Io("Network error".to_string()).retryable());
    }

    #[test]
    fn test_non_retryable_errors() {
        // 499 sits just below the 5xx boundary and must not be retried.
        for status in [400, 401, 403, 404, 499] {
            assert!(
                !http_error(status).retryable(),
                "HTTP {} should not be retryable",
                status
            );
        }

        let non_retryable = [
            DataflowError::Validation("Invalid input".to_string()),
            DataflowError::LogicEvaluation("Invalid logic".to_string()),
            DataflowError::Deserialization("Invalid JSON".to_string()),
            DataflowError::Workflow("Invalid workflow".to_string()),
            DataflowError::Task("Invalid task".to_string()),
            DataflowError::FunctionNotFound("missing_fn".to_string()),
            DataflowError::Unknown("Unknown error".to_string()),
        ];
        for error in non_retryable {
            assert!(!error.retryable(), "{:?} should not be retryable", error);
        }
    }

    #[test]
    fn test_function_execution_error_retryability() {
        // Function execution errors inherit retryability from their source.
        let retryable_func_error =
            DataflowError::function_execution("HTTP call failed", Some(http_error(500)));
        let non_retryable_func_error = DataflowError::function_execution(
            "Validation failed",
            Some(DataflowError::Validation("Invalid data".to_string())),
        );
        let no_source_func_error = DataflowError::function_execution("Unknown failure", None);

        assert!(retryable_func_error.retryable());
        assert!(!non_retryable_func_error.retryable());
        assert!(!no_source_func_error.retryable());

        // Inheritance also applies through more than one level of wrapping.
        let inner = DataflowError::function_execution(
            "inner",
            Some(DataflowError::Timeout("slow".to_string())),
        );
        let nested = DataflowError::function_execution("outer", Some(inner));
        assert!(nested.retryable());
    }

    #[test]
    fn test_error_display_messages() {
        assert_eq!(
            DataflowError::http(404, "Not Found").to_string(),
            "HTTP error: 404 - Not Found"
        );
        // The source is not part of the Display output, only the context is.
        assert_eq!(
            DataflowError::function_execution("ctx", Some(http_error(500))).to_string(),
            "Function execution error: ctx"
        );
        assert_eq!(
            DataflowError::Validation("bad".to_string()).to_string(),
            "Validation error: bad"
        );
    }

    #[test]
    fn test_error_info_new_maps_codes() {
        let cases = [
            (DataflowError::Validation("x".to_string()), "VALIDATION_ERROR"),
            (DataflowError::Workflow("x".to_string()), "WORKFLOW_ERROR"),
            (DataflowError::Task("x".to_string()), "TASK_ERROR"),
            (
                DataflowError::FunctionNotFound("x".to_string()),
                "FUNCTION_NOT_FOUND",
            ),
            (DataflowError::function_execution("x", None), "FUNCTION_ERROR"),
            (DataflowError::LogicEvaluation("x".to_string()), "LOGIC_ERROR"),
            (DataflowError::http(500, "x"), "HTTP_ERROR"),
            (DataflowError::Timeout("x".to_string()), "TIMEOUT_ERROR"),
            (DataflowError::Io("x".to_string()), "IO_ERROR"),
            (
                DataflowError::Deserialization("x".to_string()),
                "DESERIALIZATION_ERROR",
            ),
            (DataflowError::Unknown("x".to_string()), "UNKNOWN_ERROR"),
        ];

        for (error, expected_code) in cases {
            let expected_message = error.to_string();
            let info = ErrorInfo::new(Some("wf".to_string()), Some("task".to_string()), error);

            assert_eq!(info.code, expected_code);
            assert_eq!(info.message, expected_message);
            assert!(info.path.is_none());
            assert_eq!(info.workflow_id.as_deref(), Some("wf"));
            assert_eq!(info.task_id.as_deref(), Some("task"));
            assert!(info.timestamp.is_some());
            assert_eq!(info.retry_attempted, Some(false));
            assert_eq!(info.retry_count, Some(0));
        }
    }

    #[test]
    fn test_error_info_simple_constructors() {
        let owned = ErrorInfo::simple(
            "CODE".to_string(),
            "message".to_string(),
            Some("data.field".to_string()),
        );
        let borrowed = ErrorInfo::simple_ref("CODE", "message", Some("data.field"));

        for info in [owned, borrowed] {
            assert_eq!(info.code, "CODE");
            assert_eq!(info.message, "message");
            assert_eq!(info.path.as_deref(), Some("data.field"));
            assert!(info.workflow_id.is_none());
            assert!(info.task_id.is_none());
            assert!(info.timestamp.is_some());
            assert!(info.retry_attempted.is_none());
            assert!(info.retry_count.is_none());
        }

        assert!(ErrorInfo::simple_ref("CODE", "message", None).path.is_none());
    }

    #[test]
    fn test_error_info_with_retry() {
        // Starting from an explicit count of zero.
        let info = ErrorInfo::new(None, None, DataflowError::Timeout("slow".to_string()));
        let once = info.with_retry();
        assert_eq!(once.retry_attempted, Some(true));
        assert_eq!(once.retry_count, Some(1));

        let twice = once.with_retry();
        assert_eq!(twice.retry_attempted, Some(true));
        assert_eq!(twice.retry_count, Some(2));

        // A missing count is treated as zero before incrementing.
        let from_none = ErrorInfo::simple_ref("CODE", "message", None).with_retry();
        assert_eq!(from_none.retry_attempted, Some(true));
        assert_eq!(from_none.retry_count, Some(1));
    }

    #[test]
    fn test_error_info_builder() {
        // Basic builder: only the required fields plus the default timestamp.
        let error = ErrorInfo::builder("TEST_ERROR", "Test message").build();
        assert_eq!(error.code, "TEST_ERROR");
        assert_eq!(error.message, "Test message");
        assert!(error.timestamp.is_some());
        assert!(error.path.is_none());

        // Full builder: every optional setter is exercised.
        let error = ErrorInfo::builder("VALIDATION_ERROR", "Field validation failed")
            .path("data.email")
            .workflow_id("workflow_1")
            .task_id("validate_email")
            .timestamp("2024-01-01T00:00:00+00:00")
            .retry_attempted(true)
            .retry_count(2)
            .build();

        assert_eq!(error.code, "VALIDATION_ERROR");
        assert_eq!(error.message, "Field validation failed");
        assert_eq!(error.path, Some("data.email".to_string()));
        assert_eq!(error.workflow_id, Some("workflow_1".to_string()));
        assert_eq!(error.task_id, Some("validate_email".to_string()));
        assert_eq!(
            error.timestamp,
            Some("2024-01-01T00:00:00+00:00".to_string())
        );
        assert_eq!(error.retry_attempted, Some(true));
        assert_eq!(error.retry_count, Some(2));
    }
}
436}