dataflow_rs/engine/
error.rs

1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use thiserror::Error;
4
5/// Main error type for the dataflow engine
6#[derive(Debug, Error, Clone, Serialize, Deserialize)]
7pub enum DataflowError {
8    /// Validation errors occurring during rule evaluation
9    #[error("Validation error: {0}")]
10    Validation(String),
11
12    /// Errors during function execution
13    #[error("Function execution error: {context}")]
14    FunctionExecution {
15        context: String,
16        #[source]
17        #[serde(skip)]
18        source: Option<Box<DataflowError>>,
19    },
20
21    /// Workflow-related errors
22    #[error("Workflow error: {0}")]
23    Workflow(String),
24
25    /// JSON serialization/deserialization errors
26    #[error("Deserialization error: {0}")]
27    Deserialization(String),
28
29    /// I/O errors (file reading, etc.)
30    #[error("IO error: {0}")]
31    Io(String),
32
33    /// JSONLogic/DataLogic evaluation errors
34    #[error("Logic evaluation error: {0}")]
35    LogicEvaluation(String),
36
37    /// HTTP request errors
38    #[error("HTTP error: {status} - {message}")]
39    Http { status: u16, message: String },
40
41    /// Timeout errors
42    #[error("Timeout error: {0}")]
43    Timeout(String),
44
45    /// Any other errors
46    #[error("Unknown error: {0}")]
47    Unknown(String),
48}
49
50impl DataflowError {
51    /// Creates a new function execution error with context
52    pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
53        DataflowError::FunctionExecution {
54            context: context.into(),
55            source: source.map(Box::new),
56        }
57    }
58
59    /// Creates a new HTTP error
60    pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
61        DataflowError::Http {
62            status,
63            message: message.into(),
64        }
65    }
66
67    /// Convert from std::io::Error
68    pub fn from_io(err: std::io::Error) -> Self {
69        DataflowError::Io(err.to_string())
70    }
71
72    /// Convert from serde_json::Error
73    pub fn from_serde(err: serde_json::Error) -> Self {
74        DataflowError::Deserialization(err.to_string())
75    }
76}
77
78/// Type alias for Result with DataflowError
79pub type Result<T> = std::result::Result<T, DataflowError>;
80
81/// Structured error information for error tracking in messages
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct ErrorInfo {
84    /// ID of the workflow where the error occurred (if available)
85    pub workflow_id: Option<String>,
86
87    /// ID of the task where the error occurred (if available)
88    pub task_id: Option<String>,
89
90    /// Timestamp when the error occurred
91    pub timestamp: String,
92
93    /// The actual error
94    pub error_message: String,
95
96    /// Whether a retry was attempted
97    pub retry_attempted: bool,
98
99    /// Number of retries attempted
100    pub retry_count: u32,
101}
102
103impl ErrorInfo {
104    /// Create a new error info entry
105    pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
106        Self {
107            workflow_id,
108            task_id,
109            timestamp: Utc::now().to_rfc3339(),
110            error_message: error.to_string(),
111            retry_attempted: false,
112            retry_count: 0,
113        }
114    }
115
116    /// Mark that a retry was attempted
117    pub fn with_retry(mut self) -> Self {
118        self.retry_attempted = true;
119        self.retry_count += 1;
120        self
121    }
122}