use thiserror::Error;
use uuid::Uuid;
pub use cloacina_workflow::{CheckpointError, TaskError};
/// Error type for context handling in this crate.
///
/// It shares most variants with `cloacina_workflow::ContextError` (see the
/// conversions elsewhere in this file) but keeps the original
/// `diesel::result::Error` for database failures and adds [`ContextError::InvalidScope`].
///
/// `#[from]` generates `From<serde_json::Error>` and
/// `From<diesel::result::Error>`, so `?` lifts either into this type.
#[derive(Debug, Error)]
pub enum ContextError {
    /// A `serde_json` failure, rendered as `Serialization error: …`.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// Rendered as `Key not found: …`; the payload is presumably the key.
    #[error("Key not found: {0}")]
    KeyNotFound(String),

    /// Rendered as `Type mismatch for key …`.
    #[error("Type mismatch for key {0}")]
    TypeMismatch(String),

    /// Rendered as `Key already exists: …`.
    #[error("Key already exists: {0}")]
    KeyExists(String),

    /// A Diesel error, kept as the source so callers can inspect it.
    #[error("Database error: {0}")]
    Database(#[from] diesel::result::Error),

    /// A connection-pool failure carried as a plain message.
    #[error("Connection pool error: {0}")]
    ConnectionPool(String),

    /// Rendered as `Invalid execution scope: …`.
    #[error("Invalid execution scope: {0}")]
    InvalidScope(String),
}
/// Lifts a `cloacina_workflow::ContextError` into this crate's [`ContextError`].
///
/// Same-named variants map one-to-one, except for `Database` (see the note
/// inside the match).
impl From<cloacina_workflow::ContextError> for ContextError {
    fn from(err: cloacina_workflow::ContextError) -> Self {
        match err {
            cloacina_workflow::ContextError::Serialization(source) => Self::Serialization(source),
            cloacina_workflow::ContextError::KeyNotFound(key) => Self::KeyNotFound(key),
            cloacina_workflow::ContextError::TypeMismatch(key) => Self::TypeMismatch(key),
            cloacina_workflow::ContextError::KeyExists(key) => Self::KeyExists(key),
            // NOTE(review): the upstream `Database` variant carries only a
            // `String`, while the local `Database` variant wraps a
            // `diesel::result::Error`, so the message is routed to
            // `ConnectionPool` instead. It will therefore display with the
            // "Connection pool error" prefix — confirm this is intended.
            cloacina_workflow::ContextError::Database(msg)
            | cloacina_workflow::ContextError::ConnectionPool(msg) => Self::ConnectionPool(msg),
        }
    }
}
/// Error type for task registration.
#[derive(Debug, Error)]
pub enum RegistrationError {
    /// Rendered as `Task with id '<id>' already registered`.
    #[error("Task with id '{id}' already registered")]
    DuplicateTaskId { id: String },

    /// Rendered as `Invalid task id: <message>`.
    #[error("Invalid task id: {message}")]
    InvalidTaskId { message: String },

    /// Rendered as `Task registration failed: <message>`.
    #[error("Task registration failed: {message}")]
    RegistrationFailed { message: String },
}
/// Error type whose variants span dependency-graph validation, scheduling,
/// recovery, and database/context access.
///
/// `#[from]` generates `From<diesel::result::Error>` and `From<ContextError>`.
#[derive(Debug, Error)]
pub enum ValidationError {
    // --- Dependency graph -------------------------------------------------
    /// A dependency cycle; `cycle` is printed with `Debug` formatting.
    #[error("Circular dependency detected: {cycle:?}")]
    CyclicDependency { cycle: Vec<String> },

    /// `task` depends on `dependency`, which is not registered.
    #[error("Missing dependency: task '{task}' depends on '{dependency}' which is not registered")]
    MissingDependency { task: String, dependency: String },

    /// Rendered as `Duplicate task ID: …`.
    #[error("Duplicate task ID: {0}")]
    DuplicateTaskId(String),

    /// The workflow is empty.
    #[error("Workflow cannot be empty")]
    EmptyWorkflow,

    /// Rendered as `Invalid dependency graph: <message>`.
    #[error("Invalid dependency graph: {message}")]
    InvalidGraph { message: String },

    // --- Execution and scheduling ----------------------------------------
    /// Rendered as `Workflow not found in registry: …`.
    #[error("Workflow not found in registry: {0}")]
    WorkflowNotFound(String),

    /// Rendered as `Workflow execution failed: <message>`.
    #[error("Workflow execution failed: {message}")]
    ExecutionFailed { message: String },

    /// Rendered as `Task scheduling failed: <task_id>`.
    #[error("Task scheduling failed: {task_id}")]
    TaskSchedulingFailed { task_id: String },

    /// Rendered as `Invalid trigger rule format: …`.
    #[error("Invalid trigger rule format: {0}")]
    InvalidTriggerRule(String),

    /// Rendered as `Invalid task name format: …`.
    #[error("Invalid task name format: {0}")]
    InvalidTaskName(String),

    /// Rendered as `Context value evaluation failed: <key>`.
    #[error("Context value evaluation failed: {key}")]
    ContextEvaluationFailed { key: String },

    // --- Recovery ----------------------------------------------------------
    /// Rendered as `Recovery operation failed: <message>`.
    #[error("Recovery operation failed: {message}")]
    RecoveryFailed { message: String },

    /// Recovery of `task_id` was abandoned after `attempts` attempts.
    #[error("Task recovery abandoned: {task_id} after {attempts} attempts")]
    TaskRecoveryAbandoned { task_id: String, attempts: i32 },

    /// Rendered as `Workflow recovery failed: <workflow_execution_id>`.
    #[error("Workflow recovery failed: {workflow_execution_id}")]
    WorkflowRecoveryFailed { workflow_execution_id: uuid::Uuid },

    // --- Storage and context ----------------------------------------------
    /// Rendered as `Database connection error: <message>`.
    #[error("Database connection error: {message}")]
    DatabaseConnection { message: String },

    /// Rendered as `Database query error: <message>`.
    #[error("Database query error: {message}")]
    DatabaseQuery { message: String },

    /// A Diesel error, kept as the source.
    #[error("Database error: {0}")]
    Database(#[from] diesel::result::Error),

    /// A connection-pool failure carried as a plain message.
    #[error("Connection pool error: {0}")]
    ConnectionPool(String),

    /// A wrapped [`ContextError`].
    #[error("Context error: {0}")]
    Context(#[from] ContextError),
}
/// Turns a deadpool pool error into [`ValidationError::ConnectionPool`].
///
/// Only the error's `Display` text is kept; the original error value is dropped.
impl From<deadpool::managed::PoolError<deadpool_diesel::Error>> for ValidationError {
    fn from(err: deadpool::managed::PoolError<deadpool_diesel::Error>) -> Self {
        let message = err.to_string();
        Self::ConnectionPool(message)
    }
}
/// Turns a deadpool pool error into [`ContextError::ConnectionPool`].
///
/// Only the error's `Display` text is kept; the original error value is dropped.
impl From<deadpool::managed::PoolError<deadpool_diesel::Error>> for ContextError {
    fn from(err: deadpool::managed::PoolError<deadpool_diesel::Error>) -> Self {
        let message = err.to_string();
        Self::ConnectionPool(message)
    }
}
/// Error type for the task executor.
///
/// `#[from]` generates conversions from `diesel::result::Error`, [`TaskError`],
/// [`ContextError`], `tokio::sync::AcquireError`, `serde_json::Error` and
/// [`ValidationError`].
#[derive(Debug, Error)]
pub enum ExecutorError {
    /// A Diesel error, kept as the source.
    #[error("Database error: {0}")]
    Database(#[from] diesel::result::Error),

    /// A connection-pool failure carried as a plain message.
    #[error("Connection pool error: {0}")]
    ConnectionPool(String),

    /// Rendered as `Task not found in registry: …`.
    #[error("Task not found in registry: {0}")]
    TaskNotFound(String),

    /// A wrapped [`TaskError`].
    #[error("Task execution error: {0}")]
    TaskExecution(#[from] TaskError),

    /// A wrapped [`ContextError`].
    #[error("Context error: {0}")]
    Context(#[from] ContextError),

    /// Rendered as `Task execution timeout`.
    #[error("Task execution timeout")]
    TaskTimeout,

    /// Rendered as `Task claim lost to another runner`.
    #[error("Task claim lost to another runner")]
    ClaimLost,

    /// A failed semaphore acquisition reported by Tokio.
    #[error("Semaphore acquisition error: {0}")]
    Semaphore(#[from] tokio::sync::AcquireError),

    /// Rendered as `Workflow execution not found: <uuid>`.
    #[error("Workflow execution not found: {0}")]
    WorkflowExecutionNotFound(Uuid),

    /// A `serde_json` failure.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// Rendered as `Invalid context scope: …`.
    #[error("Invalid context scope: {0}")]
    InvalidScope(String),

    /// A wrapped [`ValidationError`].
    #[error("Validation error: {0}")]
    Validation(#[from] ValidationError),

    /// Rendered as `Context load failed: …`.
    #[error("Context load failed: {0}")]
    ContextLoadFailed(String),
}
/// Turns a deadpool pool error into [`ExecutorError::ConnectionPool`].
///
/// Only the error's `Display` text is kept; the original error value is dropped.
impl From<deadpool::managed::PoolError<deadpool_diesel::Error>> for ExecutorError {
    fn from(err: deadpool::managed::PoolError<deadpool_diesel::Error>) -> Self {
        let message = err.to_string();
        Self::ConnectionPool(message)
    }
}
/// Error type for workflow-level failures.
///
/// Every variant carries its detail as plain strings; no source error is kept.
#[derive(Debug, Error)]
pub enum WorkflowError {
    /// Rendered as `Duplicate task: …`.
    #[error("Duplicate task: {0}")]
    DuplicateTask(String),

    /// Rendered as `Task not found: …`.
    #[error("Task not found: {0}")]
    TaskNotFound(String),

    /// Rendered as `Invalid dependency: …`.
    #[error("Invalid dependency: {0}")]
    InvalidDependency(String),

    /// A dependency cycle; the payload is printed with `Debug` formatting.
    #[error("Cyclic dependency: {0:?}")]
    CyclicDependency(Vec<String>),

    /// Rendered as `Unreachable task: …`.
    #[error("Unreachable task: {0}")]
    UnreachableTask(String),

    /// Rendered as `Registry error: …`.
    #[error("Registry error: {0}")]
    RegistryError(String),

    /// Rendered as `Task error: …`.
    #[error("Task error: {0}")]
    TaskError(String),

    /// Rendered as `Validation error: …`.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
/// Error type for subgraph operations.
#[derive(Debug, Error)]
pub enum SubgraphError {
    /// Rendered as `Task not found: …`.
    #[error("Task not found: {0}")]
    TaskNotFound(String),

    /// Rendered as `Unsupported operation: …`.
    #[error("Unsupported operation: {0}")]
    UnsupportedOperation(String),
}
/// Wraps this crate's [`ContextError`] in a `TaskError::ContextError`.
///
/// The error is first translated to `cloacina_workflow::ContextError`. Two
/// variants lose information on the way:
///
/// * `Database` keeps only the Diesel error's `Display` text.
/// * `InvalidScope` is reported through the upstream `Database` variant with
///   an `Invalid scope: ` prefix.
///
/// No task id is available at this point, so `task_id` is always `"unknown"`.
impl From<ContextError> for TaskError {
    fn from(error: ContextError) -> Self {
        TaskError::ContextError {
            task_id: String::from("unknown"),
            error: match error {
                ContextError::Serialization(source) => {
                    cloacina_workflow::ContextError::Serialization(source)
                }
                ContextError::KeyNotFound(key) => {
                    cloacina_workflow::ContextError::KeyNotFound(key)
                }
                ContextError::TypeMismatch(key) => {
                    cloacina_workflow::ContextError::TypeMismatch(key)
                }
                ContextError::KeyExists(key) => cloacina_workflow::ContextError::KeyExists(key),
                // The upstream variant holds a String, so flatten the Diesel error.
                ContextError::Database(db_err) => {
                    cloacina_workflow::ContextError::Database(db_err.to_string())
                }
                ContextError::ConnectionPool(text) => {
                    cloacina_workflow::ContextError::ConnectionPool(text)
                }
                // No upstream counterpart is used for this variant; it is
                // reported as a database error.
                ContextError::InvalidScope(scope) => {
                    cloacina_workflow::ContextError::Database(format!("Invalid scope: {}", scope))
                }
            },
        }
    }
}