dataflow_rs/engine/
error.rs1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8 #[error("Validation error: {0}")]
10 Validation(String),
11
12 #[error("Function execution error: {context}")]
14 FunctionExecution {
15 context: String,
16 #[source]
17 #[serde(skip)]
18 source: Option<Box<DataflowError>>,
19 },
20
21 #[error("Workflow error: {0}")]
23 Workflow(String),
24
25 #[error("Deserialization error: {0}")]
27 Deserialization(String),
28
29 #[error("IO error: {0}")]
31 Io(String),
32
33 #[error("Logic evaluation error: {0}")]
35 LogicEvaluation(String),
36
37 #[error("HTTP error: {status} - {message}")]
39 Http { status: u16, message: String },
40
41 #[error("Timeout error: {0}")]
43 Timeout(String),
44
45 #[error("Unknown error: {0}")]
47 Unknown(String),
48}
49
50impl DataflowError {
51 pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
53 DataflowError::FunctionExecution {
54 context: context.into(),
55 source: source.map(Box::new),
56 }
57 }
58
59 pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
61 DataflowError::Http {
62 status,
63 message: message.into(),
64 }
65 }
66
67 pub fn from_io(err: std::io::Error) -> Self {
69 DataflowError::Io(err.to_string())
70 }
71
72 pub fn from_serde(err: serde_json::Error) -> Self {
74 DataflowError::Deserialization(err.to_string())
75 }
76
77 pub fn retryable(&self) -> bool {
83 match self {
84 DataflowError::Http { status, .. } => {
86 *status >= 500 || *status == 429 || *status == 408 || *status == 0
88 }
90 DataflowError::Timeout(_) => true,
91 DataflowError::Io(_) => true,
92 DataflowError::FunctionExecution { source, .. } => {
93 source.as_ref().map(|e| e.retryable()).unwrap_or(false)
95 }
96
97 DataflowError::Validation(_) => false,
99 DataflowError::LogicEvaluation(_) => false,
100 DataflowError::Deserialization(_) => false,
101 DataflowError::Workflow(_) => false,
102 DataflowError::Unknown(_) => false,
103 }
104 }
105}
106
107pub type Result<T> = std::result::Result<T, DataflowError>;
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct ErrorInfo {
113 pub workflow_id: Option<String>,
115
116 pub task_id: Option<String>,
118
119 pub timestamp: String,
121
122 pub error_message: String,
124
125 pub retry_attempted: bool,
127
128 pub retry_count: u32,
130}
131
132impl ErrorInfo {
133 pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
135 Self {
136 workflow_id,
137 task_id,
138 timestamp: Utc::now().to_rfc3339(),
139 error_message: error.to_string(),
140 retry_attempted: false,
141 retry_count: 0,
142 }
143 }
144
145 pub fn with_retry(mut self) -> Self {
147 self.retry_attempted = true;
148 self.retry_count += 1;
149 self
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 #[test]
158 fn test_retryable_errors() {
159 assert!(DataflowError::Http {
161 status: 500,
162 message: "Internal Server Error".to_string()
163 }
164 .retryable());
165 assert!(DataflowError::Http {
166 status: 502,
167 message: "Bad Gateway".to_string()
168 }
169 .retryable());
170 assert!(DataflowError::Http {
171 status: 503,
172 message: "Service Unavailable".to_string()
173 }
174 .retryable());
175 assert!(DataflowError::Http {
176 status: 429,
177 message: "Too Many Requests".to_string()
178 }
179 .retryable());
180 assert!(DataflowError::Http {
181 status: 408,
182 message: "Request Timeout".to_string()
183 }
184 .retryable());
185 assert!(DataflowError::Http {
186 status: 0,
187 message: "Connection Error".to_string()
188 }
189 .retryable());
190 assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
191 assert!(DataflowError::Io("Network error".to_string()).retryable());
192 }
193
194 #[test]
195 fn test_non_retryable_errors() {
196 assert!(!DataflowError::Http {
198 status: 400,
199 message: "Bad Request".to_string()
200 }
201 .retryable());
202 assert!(!DataflowError::Http {
203 status: 401,
204 message: "Unauthorized".to_string()
205 }
206 .retryable());
207 assert!(!DataflowError::Http {
208 status: 403,
209 message: "Forbidden".to_string()
210 }
211 .retryable());
212 assert!(!DataflowError::Http {
213 status: 404,
214 message: "Not Found".to_string()
215 }
216 .retryable());
217 assert!(!DataflowError::Validation("Invalid input".to_string()).retryable());
218 assert!(!DataflowError::LogicEvaluation("Invalid logic".to_string()).retryable());
219 assert!(!DataflowError::Deserialization("Invalid JSON".to_string()).retryable());
220 assert!(!DataflowError::Workflow("Invalid workflow".to_string()).retryable());
221 assert!(!DataflowError::Unknown("Unknown error".to_string()).retryable());
222 }
223
224 #[test]
225 fn test_function_execution_error_retryability() {
226 let retryable_source = DataflowError::Http {
228 status: 500,
229 message: "Server Error".to_string(),
230 };
231 let non_retryable_source = DataflowError::Validation("Invalid data".to_string());
232
233 let retryable_func_error =
234 DataflowError::function_execution("HTTP call failed", Some(retryable_source));
235 let non_retryable_func_error =
236 DataflowError::function_execution("Validation failed", Some(non_retryable_source));
237 let no_source_func_error = DataflowError::function_execution("Unknown failure", None);
238
239 assert!(retryable_func_error.retryable());
240 assert!(!non_retryable_func_error.retryable());
241 assert!(!no_source_func_error.retryable());
242 }
243}