dataflow_rs/engine/
error.rs1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8 #[error("Validation error: {0}")]
10 Validation(String),
11
12 #[error("Function execution error: {context}")]
14 FunctionExecution {
15 context: String,
16 #[source]
17 #[serde(skip)]
18 source: Option<Box<DataflowError>>,
19 },
20
21 #[error("Workflow error: {0}")]
23 Workflow(String),
24
25 #[error("Deserialization error: {0}")]
27 Deserialization(String),
28
29 #[error("IO error: {0}")]
31 Io(String),
32
33 #[error("Logic evaluation error: {0}")]
35 LogicEvaluation(String),
36
37 #[error("HTTP error: {status} - {message}")]
39 Http { status: u16, message: String },
40
41 #[error("Timeout error: {0}")]
43 Timeout(String),
44
45 #[error("Unknown error: {0}")]
47 Unknown(String),
48}
49
50impl DataflowError {
51 pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
53 DataflowError::FunctionExecution {
54 context: context.into(),
55 source: source.map(Box::new),
56 }
57 }
58
59 pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
61 DataflowError::Http {
62 status,
63 message: message.into(),
64 }
65 }
66
67 pub fn from_io(err: std::io::Error) -> Self {
69 DataflowError::Io(err.to_string())
70 }
71
72 pub fn from_serde(err: serde_json::Error) -> Self {
74 DataflowError::Deserialization(err.to_string())
75 }
76
77 pub fn retryable(&self) -> bool {
83 match self {
84 DataflowError::Http { status, .. } => {
86 *status >= 500 || *status == 429 || *status == 408 || *status == 0
88 }
90 DataflowError::Timeout(_) => true,
91 DataflowError::Io(_) => true,
92 DataflowError::FunctionExecution { source, .. } => {
93 source.as_ref().map(|e| e.retryable()).unwrap_or(false)
95 }
96
97 DataflowError::Validation(_) => false,
99 DataflowError::LogicEvaluation(_) => false,
100 DataflowError::Deserialization(_) => false,
101 DataflowError::Workflow(_) => false,
102 DataflowError::Unknown(_) => false,
103 }
104 }
105}
106
/// Shorthand for `std::result::Result` with the error type fixed to
/// [`DataflowError`].
pub type Result<T> = std::result::Result<T, DataflowError>;
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct ErrorInfo {
113 pub workflow_id: Option<String>,
115
116 pub task_id: Option<String>,
118
119 pub timestamp: String,
121
122 pub error_message: String,
124
125 pub retry_attempted: bool,
127
128 pub retry_count: u32,
130}
131
132impl ErrorInfo {
133 pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
135 Self {
136 workflow_id,
137 task_id,
138 timestamp: Utc::now().to_rfc3339(),
139 error_message: error.to_string(),
140 retry_attempted: false,
141 retry_count: 0,
142 }
143 }
144
145 pub fn with_retry(mut self) -> Self {
147 self.retry_attempted = true;
148 self.retry_count += 1;
149 self
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Http` error directly from its fields.
    fn http_error(status: u16, message: &str) -> DataflowError {
        DataflowError::Http {
            status,
            message: message.to_string(),
        }
    }

    /// Server-side, throttling, timeout and connection failures are retryable.
    #[test]
    fn test_retryable_errors() {
        let retryable_http: [(u16, &str); 6] = [
            (500, "Internal Server Error"),
            (502, "Bad Gateway"),
            (503, "Service Unavailable"),
            (429, "Too Many Requests"),
            (408, "Request Timeout"),
            (0, "Connection Error"),
        ];
        for &(status, message) in retryable_http.iter() {
            assert!(
                http_error(status, message).retryable(),
                "HTTP status {} should be retryable",
                status
            );
        }

        assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
        assert!(DataflowError::Io("Network error".to_string()).retryable());
    }

    /// Client errors and non-transient engine errors are not retryable.
    #[test]
    fn test_non_retryable_errors() {
        let permanent_http: [(u16, &str); 4] = [
            (400, "Bad Request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (404, "Not Found"),
        ];
        for &(status, message) in permanent_http.iter() {
            assert!(
                !http_error(status, message).retryable(),
                "HTTP status {} should not be retryable",
                status
            );
        }

        let permanent_errors = [
            DataflowError::Validation("Invalid input".to_string()),
            DataflowError::LogicEvaluation("Invalid logic".to_string()),
            DataflowError::Deserialization("Invalid JSON".to_string()),
            DataflowError::Workflow("Invalid workflow".to_string()),
            DataflowError::Unknown("Unknown error".to_string()),
        ];
        for err in permanent_errors.iter() {
            assert!(!err.retryable(), "{:?} should not be retryable", err);
        }
    }

    /// A function-execution error inherits retryability from its source.
    #[test]
    fn test_function_execution_error_retryability() {
        let with_retryable_source = DataflowError::function_execution(
            "HTTP call failed",
            Some(http_error(500, "Server Error")),
        );
        assert!(with_retryable_source.retryable());

        let with_permanent_source = DataflowError::function_execution(
            "Validation failed",
            Some(DataflowError::Validation("Invalid data".to_string())),
        );
        assert!(!with_permanent_source.retryable());

        let without_source = DataflowError::function_execution("Unknown failure", None);
        assert!(!without_source.retryable());
    }
}