use chrono::Utc;
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Error type for dataflow operations.
///
/// Each variant's `Display` text is produced by its `#[error(...)]` format
/// string. Apart from `Http::status` and the optional nested `source`, all
/// payloads are plain `String`s.
#[derive(Debug, Error, Clone, Serialize, Deserialize)]
pub enum DataflowError {
/// Validation failure; the payload is the message text.
#[error("Validation error: {0}")]
Validation(String),
/// Failure while executing a function.
///
/// Only `context` appears in the `Display` output; the underlying cause, if
/// any, is exposed through `std::error::Error::source`.
#[error("Function execution error: {context}")]
FunctionExecution {
/// Description of what was being executed when the failure occurred.
context: String,
/// Optional underlying error. Marked `#[serde(skip)]`, so it is never
/// serialized and is `None` after deserialization.
#[source]
#[serde(skip)]
source: Option<Box<DataflowError>>,
},
/// Workflow-level failure; the payload is the message text.
#[error("Workflow error: {0}")]
Workflow(String),
/// Task-level failure; the payload is the message text.
#[error("Task error: {0}")]
Task(String),
/// A function could not be found; the payload is interpolated into the
/// message (presumably the function name — confirm against callers).
#[error("Function not found: {0}")]
FunctionNotFound(String),
/// Deserialization failure, stored as text (see `from_serde`).
#[error("Deserialization error: {0}")]
Deserialization(String),
/// I/O failure, stored as text (see `from_io`).
#[error("IO error: {0}")]
Io(String),
/// Logic evaluation failure; the payload is the message text.
#[error("Logic evaluation error: {0}")]
LogicEvaluation(String),
/// HTTP failure carrying a numeric status and a message.
///
/// `retryable` treats a status of `0` as retryable; the tests in this file
/// use it for a "Connection Error".
#[error("HTTP error: {status} - {message}")]
Http { status: u16, message: String },
/// Timeout; the payload is the message text.
#[error("Timeout error: {0}")]
Timeout(String),
/// Catch-all for errors that fit no other variant.
#[error("Unknown error: {0}")]
Unknown(String),
}
impl DataflowError {
    /// Creates a [`DataflowError::FunctionExecution`] from a context message
    /// and an optional underlying cause, which is boxed when present.
    pub fn function_execution<S: Into<String>>(context: S, source: Option<DataflowError>) -> Self {
        let context = context.into();
        let source = source.map(Box::new);
        Self::FunctionExecution { context, source }
    }

    /// Creates a [`DataflowError::Http`] from a status code and a message.
    pub fn http<S: Into<String>>(status: u16, message: S) -> Self {
        let message = message.into();
        Self::Http { status, message }
    }

    /// Converts an I/O error into [`DataflowError::Io`], keeping only its
    /// `Display` text.
    pub fn from_io(err: std::io::Error) -> Self {
        let message = err.to_string();
        Self::Io(message)
    }

    /// Converts a `serde_json` error into [`DataflowError::Deserialization`],
    /// keeping only its `Display` text.
    pub fn from_serde(err: serde_json::Error) -> Self {
        let message = err.to_string();
        Self::Deserialization(message)
    }

    /// Reports whether retrying the failed operation could succeed.
    ///
    /// * HTTP errors are retryable for status `0`, `408`, `429` and any
    ///   status of `500` or above.
    /// * Timeouts and I/O errors are always retryable.
    /// * A function execution error defers to its nested source and is not
    ///   retryable when it has none.
    /// * Every other variant is not retryable.
    pub fn retryable(&self) -> bool {
        match self {
            Self::Http { status, .. } => matches!(*status, 0 | 408 | 429) || *status >= 500,
            Self::Timeout(_) | Self::Io(_) => true,
            Self::FunctionExecution { source, .. } => match source.as_deref() {
                Some(cause) => cause.retryable(),
                None => false,
            },
            // Listed explicitly (no `_` arm) so a newly added variant forces a
            // decision here at compile time.
            Self::Validation(_)
            | Self::LogicEvaluation(_)
            | Self::Deserialization(_)
            | Self::Workflow(_)
            | Self::Task(_)
            | Self::FunctionNotFound(_)
            | Self::Unknown(_) => false,
        }
    }
}
/// Shorthand for `std::result::Result` with [`DataflowError`] as the error type.
pub type Result<T> = std::result::Result<T, DataflowError>;
/// Serializable record describing a single error occurrence.
///
/// All optional fields except `path` are omitted from the serialized output
/// when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorInfo {
/// Error code string; `ErrorInfo::new` derives it from the error variant
/// (for example `"VALIDATION_ERROR"`).
pub code: String,
/// Human-readable error message.
pub message: String,
/// Optional path associated with the error (the tests in this file use a
/// dotted data path such as `data.email`).
// NOTE(review): unlike the optional fields below, this one has no
// `skip_serializing_if`, so `None` is serialized as `null` — confirm this
// asymmetry is intended.
pub path: Option<String>,
/// Identifier of the workflow the error relates to, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub workflow_id: Option<String>,
/// Identifier of the task the error relates to, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub task_id: Option<String>,
/// Timestamp string; the constructors in this file fill it with the current
/// UTC time in RFC 3339 form.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
/// Whether a retry has been attempted; set to `Some(true)` by `with_retry`.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry_attempted: Option<bool>,
/// Number of retries recorded; incremented by `with_retry`.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry_count: Option<u32>,
}
impl ErrorInfo {
    /// Builds an `ErrorInfo` from a [`DataflowError`].
    ///
    /// The code is chosen from the error variant, the message is the error's
    /// `Display` text, the timestamp is the current UTC time in RFC 3339 form,
    /// and the retry bookkeeping starts at "not attempted, zero retries".
    /// `path` is left as `None`.
    pub fn new(workflow_id: Option<String>, task_id: Option<String>, error: DataflowError) -> Self {
        // Exhaustive on purpose: adding a variant must force a code to be chosen.
        let code = match &error {
            DataflowError::Validation(_) => "VALIDATION_ERROR",
            DataflowError::Workflow(_) => "WORKFLOW_ERROR",
            DataflowError::Task(_) => "TASK_ERROR",
            DataflowError::FunctionNotFound(_) => "FUNCTION_NOT_FOUND",
            DataflowError::FunctionExecution { .. } => "FUNCTION_ERROR",
            DataflowError::LogicEvaluation(_) => "LOGIC_ERROR",
            DataflowError::Http { .. } => "HTTP_ERROR",
            DataflowError::Timeout(_) => "TIMEOUT_ERROR",
            DataflowError::Io(_) => "IO_ERROR",
            DataflowError::Deserialization(_) => "DESERIALIZATION_ERROR",
            DataflowError::Unknown(_) => "UNKNOWN_ERROR",
        };
        Self {
            code: code.to_string(),
            message: error.to_string(),
            path: None,
            workflow_id,
            task_id,
            timestamp: Some(Utc::now().to_rfc3339()),
            retry_attempted: Some(false),
            retry_count: Some(0),
        }
    }

    /// Builds an `ErrorInfo` from an explicit code, message and optional path.
    ///
    /// The timestamp is set to the current UTC time; workflow, task and retry
    /// fields are left as `None`.
    pub fn simple(code: String, message: String, path: Option<String>) -> Self {
        Self {
            code,
            message,
            path,
            workflow_id: None,
            task_id: None,
            timestamp: Some(Utc::now().to_rfc3339()),
            retry_attempted: None,
            retry_count: None,
        }
    }

    /// Borrowing variant of [`ErrorInfo::simple`]: copies the given string
    /// slices and otherwise produces the same result.
    pub fn simple_ref(code: &str, message: &str, path: Option<&str>) -> Self {
        Self::simple(code.to_string(), message.to_string(), path.map(str::to_string))
    }

    /// Marks a retry as attempted and bumps the retry counter by one.
    ///
    /// A missing counter is treated as zero. The increment saturates at
    /// `u32::MAX` instead of overflowing, so it can neither panic (debug
    /// builds) nor wrap back to zero (release builds).
    pub fn with_retry(mut self) -> Self {
        self.retry_attempted = Some(true);
        self.retry_count = Some(self.retry_count.unwrap_or(0).saturating_add(1));
        self
    }

    /// Starts an [`ErrorInfoBuilder`] with the given code and message.
    pub fn builder(code: impl Into<String>, message: impl Into<String>) -> ErrorInfoBuilder {
        ErrorInfoBuilder::new(code, message)
    }
}
/// Step-by-step builder for `ErrorInfo`.
///
/// Holds one field per `ErrorInfo` field. Derives `Debug` and `Clone` so a
/// partially configured builder can be logged or reused as a template, in
/// line with the other public types in this file.
#[derive(Debug, Clone)]
pub struct ErrorInfoBuilder {
    /// Error code string.
    code: String,
    /// Human-readable error message.
    message: String,
    /// Optional path associated with the error.
    path: Option<String>,
    /// Optional workflow identifier.
    workflow_id: Option<String>,
    /// Optional task identifier.
    task_id: Option<String>,
    /// Optional timestamp string.
    timestamp: Option<String>,
    /// Optional flag recording whether a retry was attempted.
    retry_attempted: Option<bool>,
    /// Optional number of retries.
    retry_count: Option<u32>,
}
impl ErrorInfoBuilder {
    /// Starts a builder with the given code and message.
    ///
    /// The timestamp defaults to the current UTC time in RFC 3339 form; every
    /// other optional field starts as `None`.
    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
        let code = code.into();
        let message = message.into();
        let timestamp = Some(Utc::now().to_rfc3339());
        Self {
            code,
            message,
            timestamp,
            path: None,
            workflow_id: None,
            task_id: None,
            retry_attempted: None,
            retry_count: None,
        }
    }

    /// Sets the path associated with the error.
    pub fn path(self, path: impl Into<String>) -> Self {
        Self {
            path: Some(path.into()),
            ..self
        }
    }

    /// Sets the workflow identifier.
    pub fn workflow_id(self, id: impl Into<String>) -> Self {
        Self {
            workflow_id: Some(id.into()),
            ..self
        }
    }

    /// Sets the task identifier.
    pub fn task_id(self, id: impl Into<String>) -> Self {
        Self {
            task_id: Some(id.into()),
            ..self
        }
    }

    /// Replaces the default timestamp with the given string.
    pub fn timestamp(self, timestamp: impl Into<String>) -> Self {
        Self {
            timestamp: Some(timestamp.into()),
            ..self
        }
    }

    /// Records whether a retry was attempted.
    pub fn retry_attempted(self, attempted: bool) -> Self {
        Self {
            retry_attempted: Some(attempted),
            ..self
        }
    }

    /// Records the number of retries.
    pub fn retry_count(self, count: u32) -> Self {
        Self {
            retry_count: Some(count),
            ..self
        }
    }

    /// Consumes the builder and produces the `ErrorInfo`, moving every field
    /// across unchanged.
    pub fn build(self) -> ErrorInfo {
        let Self {
            code,
            message,
            path,
            workflow_id,
            task_id,
            timestamp,
            retry_attempted,
            retry_count,
        } = self;
        ErrorInfo {
            code,
            message,
            path,
            workflow_id,
            task_id,
            timestamp,
            retry_attempted,
            retry_count,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an HTTP error directly from the variant's fields.
    fn http_error(status: u16, message: &str) -> DataflowError {
        DataflowError::Http {
            status,
            message: message.to_string(),
        }
    }

    /// Server errors, throttling, request timeouts, connection failures,
    /// timeouts and I/O errors must all be reported as retryable.
    #[test]
    fn test_retryable_errors() {
        let retryable_http = [
            (500, "Internal Server Error"),
            (502, "Bad Gateway"),
            (503, "Service Unavailable"),
            (429, "Too Many Requests"),
            (408, "Request Timeout"),
            (0, "Connection Error"),
        ];
        for (status, message) in retryable_http {
            assert!(http_error(status, message).retryable());
        }

        assert!(DataflowError::Timeout("Connection timeout".to_string()).retryable());
        assert!(DataflowError::Io("Network error".to_string()).retryable());
    }

    /// Client errors and non-transient variants must not be retryable.
    #[test]
    fn test_non_retryable_errors() {
        let permanent_http = [
            (400, "Bad Request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (404, "Not Found"),
        ];
        for (status, message) in permanent_http {
            assert!(!http_error(status, message).retryable());
        }

        let permanent_errors = [
            DataflowError::Validation("Invalid input".to_string()),
            DataflowError::LogicEvaluation("Invalid logic".to_string()),
            DataflowError::Deserialization("Invalid JSON".to_string()),
            DataflowError::Workflow("Invalid workflow".to_string()),
            DataflowError::Unknown("Unknown error".to_string()),
        ];
        for error in permanent_errors {
            assert!(!error.retryable());
        }
    }

    /// A function execution error inherits retryability from its source and
    /// is not retryable without one.
    #[test]
    fn test_function_execution_error_retryability() {
        let with_retryable_source = DataflowError::function_execution(
            "HTTP call failed",
            Some(http_error(500, "Server Error")),
        );
        assert!(with_retryable_source.retryable());

        let with_permanent_source = DataflowError::function_execution(
            "Validation failed",
            Some(DataflowError::Validation("Invalid data".to_string())),
        );
        assert!(!with_permanent_source.retryable());

        let without_source = DataflowError::function_execution("Unknown failure", None);
        assert!(!without_source.retryable());
    }

    /// The builder fills defaults for a minimal error and carries every
    /// explicitly set field through to the built value.
    #[test]
    fn test_error_info_builder() {
        let minimal = ErrorInfo::builder("TEST_ERROR", "Test message").build();
        assert_eq!(minimal.code, "TEST_ERROR");
        assert_eq!(minimal.message, "Test message");
        assert!(minimal.timestamp.is_some());
        assert!(minimal.path.is_none());

        let detailed = ErrorInfo::builder("VALIDATION_ERROR", "Field validation failed")
            .path("data.email")
            .workflow_id("workflow_1")
            .task_id("validate_email")
            .retry_attempted(true)
            .retry_count(2)
            .build();
        assert_eq!(detailed.code, "VALIDATION_ERROR");
        assert_eq!(detailed.message, "Field validation failed");
        assert_eq!(detailed.path.as_deref(), Some("data.email"));
        assert_eq!(detailed.workflow_id.as_deref(), Some("workflow_1"));
        assert_eq!(detailed.task_id.as_deref(), Some("validate_email"));
        assert_eq!(detailed.retry_attempted, Some(true));
        assert_eq!(detailed.retry_count, Some(2));
    }
}