dataflow_rs/engine/
error.rs

1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
/// Main error type for the dataflow engine.
///
/// The `Display` text of each variant is given by its `#[error(...)]` attribute.
/// The type is `Clone` and (de)serializable; note that the nested `source` of
/// [`DataflowError::FunctionExecution`] is excluded from serialization.
#[derive(Debug, Error, Clone, Serialize, Deserialize)]
pub enum DataflowError {
    /// Validation errors occurring during rule evaluation
    #[error("Validation error: {0}")]
    Validation(String),

    /// Errors during function execution.
    ///
    /// `retryable` defers to `source` for this variant and reports `false`
    /// when no source is attached.
    #[error("Function execution error: {context}")]
    FunctionExecution {
        /// Context message; the only field rendered in the `Display` output.
        context: String,
        /// Optional underlying cause. Because of `#[serde(skip)]` it is not
        /// written when serializing and is `None` after deserializing.
        #[source]
        #[serde(skip)]
        source: Option<Box<DataflowError>>,
    },

    /// Workflow-related errors
    #[error("Workflow error: {0}")]
    Workflow(String),

    /// JSON serialization/deserialization errors, stored as text
    /// (`from_serde` maps a `serde_json::Error` to this variant).
    #[error("Deserialization error: {0}")]
    Deserialization(String),

    /// I/O errors (file reading, etc.), stored as text
    /// (`from_io` maps a `std::io::Error` to this variant).
    #[error("IO error: {0}")]
    Io(String),

    /// JSONLogic/DataLogic evaluation errors
    #[error("Logic evaluation error: {0}")]
    LogicEvaluation(String),

    /// HTTP request errors.
    ///
    /// `status` is the numeric status code; `retryable` additionally treats a
    /// status of `0` as a connection error.
    #[error("HTTP error: {status} - {message}")]
    Http { status: u16, message: String },

    /// Timeout errors
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Any other errors
    #[error("Unknown error: {0}")]
    Unknown(String),
}
49
50impl DataflowError {
51    /// Creates a new function execution error with context
52    pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
53        DataflowError::FunctionExecution {
54            context: context.into(),
55            source: source.map(Box::new),
56        }
57    }
58
59    /// Creates a new HTTP error
60    pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
61        DataflowError::Http {
62            status,
63            message: message.into(),
64        }
65    }
66
67    /// Convert from std::io::Error
68    pub fn from_io(err: std::io::Error) -> Self {
69        DataflowError::Io(err.to_string())
70    }
71
72    /// Convert from serde_json::Error
73    pub fn from_serde(err: serde_json::Error) -> Self {
74        DataflowError::Deserialization(err.to_string())
75    }
76
77    /// Determines if this error is retryable (worth retrying)
78    ///
79    /// Retryable errors are typically transient infrastructure failures that might succeed on retry.
80    /// Non-retryable errors are typically data validation, logic, or configuration errors that
81    /// will consistently fail on retry.
82    pub fn retryable(&self) -> bool {
83        match self {
84            // Retryable errors - infrastructure/transient failures
85            DataflowError::Http { status, .. } => {
86                // Retry on server errors (5xx) and specific client errors that might be transient
87                *status >= 500 || *status == 429 || *status == 408 || *status == 0
88                // 0 means connection error
89            }
90            DataflowError::Timeout(_) => true,
91            DataflowError::Io(_) => true,
92            DataflowError::FunctionExecution { source, .. } => {
93                // Inherit retryability from the source error if present
94                source.as_ref().map(|e| e.retryable()).unwrap_or(false)
95            }
96
97            // Non-retryable errors - data/logic/configuration issues
98            DataflowError::Validation(_) => false,
99            DataflowError::LogicEvaluation(_) => false,
100            DataflowError::Deserialization(_) => false,
101            DataflowError::Workflow(_) => false,
102            DataflowError::Unknown(_) => false,
103        }
104    }
105}
106
/// Convenience alias for `std::result::Result` whose error type is fixed to
/// [`DataflowError`].
pub type Result<T> = std::result::Result<T, DataflowError>;
109
/// Structured error information for error tracking in messages.
///
/// Holds a textual snapshot of an error together with where and when it was
/// recorded and how often a retry was attempted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorInfo {
    /// ID of the workflow where the error occurred (if available)
    pub workflow_id: Option<String>,

    /// ID of the task where the error occurred (if available)
    pub task_id: Option<String>,

    /// Timestamp when the error was recorded; `ErrorInfo::new` fills this with
    /// the current UTC time formatted as RFC 3339.
    pub timestamp: String,

    /// The error rendered as text; `ErrorInfo::new` stores the error's
    /// `Display` output here.
    pub error_message: String,

    /// Whether a retry was attempted (`false` until `with_retry` is called)
    pub retry_attempted: bool,

    /// Number of retries attempted (starts at 0; raised by `with_retry`)
    pub retry_count: u32,
}
131
132impl ErrorInfo {
133    /// Create a new error info entry
134    pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
135        Self {
136            workflow_id,
137            task_id,
138            timestamp: Utc::now().to_rfc3339(),
139            error_message: error.to_string(),
140            retry_attempted: false,
141            retry_count: 0,
142        }
143    }
144
145    /// Mark that a retry was attempted
146    pub fn with_retry(mut self) -> Self {
147        self.retry_attempted = true;
148        self.retry_count += 1;
149        self
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Http` error with the given status code and message.
    fn http_error(status: u16, message: &str) -> DataflowError {
        DataflowError::Http {
            status,
            message: message.to_string(),
        }
    }

    /// Transient failures must be reported as retryable.
    #[test]
    fn test_retryable_errors() {
        let transient_statuses: [(u16, &str); 6] = [
            (500, "Internal Server Error"),
            (502, "Bad Gateway"),
            (503, "Service Unavailable"),
            (429, "Too Many Requests"),
            (408, "Request Timeout"),
            (0, "Connection Error"),
        ];
        for &(status, message) in &transient_statuses {
            assert!(
                http_error(status, message).retryable(),
                "HTTP {} ({}) should be retryable",
                status,
                message
            );
        }

        let timeout = DataflowError::Timeout("Connection timeout".to_string());
        assert!(timeout.retryable());

        let io = DataflowError::Io("Network error".to_string());
        assert!(io.retryable());
    }

    /// Client-side HTTP failures and data/logic errors must not be retried.
    #[test]
    fn test_non_retryable_errors() {
        let permanent_statuses: [(u16, &str); 4] = [
            (400, "Bad Request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (404, "Not Found"),
        ];
        for &(status, message) in &permanent_statuses {
            assert!(
                !http_error(status, message).retryable(),
                "HTTP {} ({}) should not be retryable",
                status,
                message
            );
        }

        let permanent_errors = [
            DataflowError::Validation("Invalid input".to_string()),
            DataflowError::LogicEvaluation("Invalid logic".to_string()),
            DataflowError::Deserialization("Invalid JSON".to_string()),
            DataflowError::Workflow("Invalid workflow".to_string()),
            DataflowError::Unknown("Unknown error".to_string()),
        ];
        for error in &permanent_errors {
            assert!(!error.retryable(), "{:?} should not be retryable", error);
        }
    }

    /// A function execution error is exactly as retryable as its source, and
    /// not retryable at all when it has none.
    #[test]
    fn test_function_execution_error_retryability() {
        let wrapping_retryable = DataflowError::function_execution(
            "HTTP call failed",
            Some(http_error(500, "Server Error")),
        );
        assert!(wrapping_retryable.retryable());

        let wrapping_non_retryable = DataflowError::function_execution(
            "Validation failed",
            Some(DataflowError::Validation("Invalid data".to_string())),
        );
        assert!(!wrapping_non_retryable.retryable());

        let without_source = DataflowError::function_execution("Unknown failure", None);
        assert!(!without_source.retryable());
    }
}
263}