dataflow_rs/engine/
error.rs

1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5/// Main error type for the dataflow engine
6#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8    /// Validation errors occurring during rule evaluation
9    #[error("Validation error: {0}")]
10    Validation(String),
11
12    /// Errors during function execution
13    #[error("Function execution error: {context}")]
14    FunctionExecution {
15        context: String,
16        #[source]
17        #[serde(skip)]
18        source: Option<Box<DataflowError>>,
19    },
20
21    /// Workflow-related errors
22    #[error("Workflow error: {0}")]
23    Workflow(String),
24
25    /// JSON serialization/deserialization errors
26    #[error("Deserialization error: {0}")]
27    Deserialization(String),
28
29    /// I/O errors (file reading, etc.)
30    #[error("IO error: {0}")]
31    Io(String),
32
33    /// JSONLogic/DataLogic evaluation errors
34    #[error("Logic evaluation error: {0}")]
35    LogicEvaluation(String),
36
37    /// HTTP request errors
38    #[error("HTTP error: {status} - {message}")]
39    Http { status: u16, message: String },
40
41    /// Timeout errors
42    #[error("Timeout error: {0}")]
43    Timeout(String),
44
45    /// Any other errors
46    #[error("Unknown error: {0}")]
47    Unknown(String),
48}
49
50impl DataflowError {
51    /// Creates a new function execution error with context
52    pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
53        DataflowError::FunctionExecution {
54            context: context.into(),
55            source: source.map(Box::new),
56        }
57    }
58
59    /// Creates a new HTTP error
60    pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
61        DataflowError::Http {
62            status,
63            message: message.into(),
64        }
65    }
66
67    /// Convert from std::io::Error
68    pub fn from_io(err: std::io::Error) -> Self {
69        DataflowError::Io(err.to_string())
70    }
71
72    /// Convert from serde_json::Error
73    pub fn from_serde(err: serde_json::Error) -> Self {
74        DataflowError::Deserialization(err.to_string())
75    }
76
77    /// Determines if this error is retryable (worth retrying)
78    ///
79    /// Retryable errors are typically transient infrastructure failures that might succeed on retry.
80    /// Non-retryable errors are typically data validation, logic, or configuration errors that
81    /// will consistently fail on retry.
82    pub fn retryable(&self) -> bool {
83        match self {
84            // Retryable errors - infrastructure/transient failures
85            DataflowError::Http { status, .. } => {
86                // Retry on server errors (5xx) and specific client errors that might be transient
87                *status >= 500 || *status == 429 || *status == 408 || *status == 0
88                // 0 means connection error
89            }
90            DataflowError::Timeout(_) => true,
91            DataflowError::Io(_) => true,
92            DataflowError::FunctionExecution { source, .. } => {
93                // Inherit retryability from the source error if present
94                source.as_ref().map(|e| e.retryable()).unwrap_or(false)
95            }
96
97            // Non-retryable errors - data/logic/configuration issues
98            DataflowError::Validation(_) => false,
99            DataflowError::LogicEvaluation(_) => false,
100            DataflowError::Deserialization(_) => false,
101            DataflowError::Workflow(_) => false,
102            DataflowError::Unknown(_) => false,
103        }
104    }
105}
106
107/// Type alias for Result with DataflowError
108pub type Result<T> = std::result::Result<T, DataflowError>;
109
110/// Structured error information for error tracking in messages
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct ErrorInfo {
113    /// ID of the workflow where the error occurred (if available)
114    pub workflow_id: Option<String>,
115
116    /// ID of the task where the error occurred (if available)
117    pub task_id: Option<String>,
118
119    /// Timestamp when the error occurred
120    pub timestamp: String,
121
122    /// The actual error
123    pub error_message: String,
124
125    /// Whether a retry was attempted
126    pub retry_attempted: bool,
127
128    /// Number of retries attempted
129    pub retry_count: u32,
130}
131
132impl ErrorInfo {
133    /// Create a new error info entry
134    pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
135        Self {
136            workflow_id,
137            task_id,
138            timestamp: Utc::now().to_rfc3339(),
139            error_message: error.to_string(),
140            retry_attempted: false,
141            retry_count: 0,
142        }
143    }
144
145    /// Mark that a retry was attempted
146    pub fn with_retry(mut self) -> Self {
147        self.retry_attempted = true;
148        self.retry_count += 1;
149        self
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error as _;

    #[test]
    fn test_retryable_errors() {
        // Server errors, throttling, request timeout, and status 0 (connection error).
        let retryable_statuses = [
            (500, "Internal Server Error"),
            (502, "Bad Gateway"),
            (503, "Service Unavailable"),
            (429, "Too Many Requests"),
            (408, "Request Timeout"),
            (0, "Connection Error"),
        ];
        for (status, message) in retryable_statuses {
            assert!(
                DataflowError::http(status, message).retryable(),
                "HTTP {status} should be retryable"
            );
        }
        assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
        assert!(DataflowError::Io("Network error".to_string()).retryable());
    }

    #[test]
    fn test_non_retryable_errors() {
        let client_statuses = [
            (400, "Bad Request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (404, "Not Found"),
        ];
        for (status, message) in client_statuses {
            assert!(
                !DataflowError::http(status, message).retryable(),
                "HTTP {status} should not be retryable"
            );
        }

        let deterministic_errors = [
            DataflowError::Validation("Invalid input".to_string()),
            DataflowError::LogicEvaluation("Invalid logic".to_string()),
            DataflowError::Deserialization("Invalid JSON".to_string()),
            DataflowError::Workflow("Invalid workflow".to_string()),
            DataflowError::Unknown("Unknown error".to_string()),
        ];
        for err in &deterministic_errors {
            assert!(!err.retryable(), "{err:?} should not be retryable");
        }
    }

    #[test]
    fn test_function_execution_error_retryability() {
        // Function execution errors inherit retryability from their source.
        let retryable_source = DataflowError::Http {
            status: 500,
            message: "Server Error".to_string(),
        };
        let non_retryable_source = DataflowError::Validation("Invalid data".to_string());

        let retryable_func_error =
            DataflowError::function_execution("HTTP call failed", Some(retryable_source));
        let non_retryable_func_error =
            DataflowError::function_execution("Validation failed", Some(non_retryable_source));
        let no_source_func_error = DataflowError::function_execution("Unknown failure", None);

        assert!(retryable_func_error.retryable());
        assert!(!non_retryable_func_error.retryable());
        assert!(!no_source_func_error.retryable());
    }

    #[test]
    fn test_nested_function_execution_inherits_retryability() {
        let inner = DataflowError::function_execution(
            "inner step",
            Some(DataflowError::Timeout("slow upstream".to_string())),
        );
        let outer = DataflowError::function_execution("outer step", Some(inner));
        assert!(outer.retryable());
    }

    #[test]
    fn test_display_messages() {
        assert_eq!(
            DataflowError::http(503, "down").to_string(),
            "HTTP error: 503 - down"
        );
        assert_eq!(
            DataflowError::function_execution("step failed", None).to_string(),
            "Function execution error: step failed"
        );
        assert_eq!(
            DataflowError::Validation("bad".to_string()).to_string(),
            "Validation error: bad"
        );
    }

    #[test]
    fn test_function_execution_exposes_source() {
        let wrapped = DataflowError::function_execution(
            "wrapped",
            Some(DataflowError::Io("disk".to_string())),
        );
        let cause = wrapped.source().expect("source should be present");
        assert_eq!(cause.to_string(), "IO error: disk");

        let bare = DataflowError::function_execution("bare", None);
        assert!(bare.source().is_none());
    }

    #[test]
    fn test_error_info_records_error_and_retries() {
        let info = ErrorInfo::new(
            Some("wf-1".to_string()),
            Some("task-1".to_string()),
            DataflowError::Timeout("slow upstream".to_string()),
        );
        assert_eq!(info.workflow_id.as_deref(), Some("wf-1"));
        assert_eq!(info.task_id.as_deref(), Some("task-1"));
        assert_eq!(info.error_message, "Timeout error: slow upstream");
        assert!(!info.retry_attempted);
        assert_eq!(info.retry_count, 0);
        assert!(chrono::DateTime::parse_from_rfc3339(&info.timestamp).is_ok());

        let retried = info.with_retry().with_retry();
        assert!(retried.retry_attempted);
        assert_eq!(retried.retry_count, 2);
    }
}