use thiserror::Error;
/// Shorthand for `std::result::Result` whose error type is the `Error` enum declared in this file.
pub type Result<T> = std::result::Result<T, Error>;
/// Heap-allocated `std::error::Error` trait object that is also `Send + Sync`.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Describes a transient step failure as plain, cloneable data.
///
/// Values are built with [`TransientStepError::new`] and then optionally
/// refined with the consuming `with_*` methods. `Display` renders the value as
/// `"<code>: <message>"`; the `source` and `retry_after` fields are not part of
/// that rendering.
#[derive(Debug, Clone)]
pub struct TransientStepError {
    /// Identifier for the failure; printed first by `Display`.
    pub code: String,
    /// Description of the failure; printed after the code by `Display`.
    pub message: String,
    /// `to_string()` rendering of an error attached through `with_source`, if any.
    pub source: Option<String>,
    /// Duration attached through `with_delay`, if any.
    // NOTE(review): presumably a hint for how long to wait before retrying —
    // confirm against the code that consumes this field.
    pub retry_after: Option<std::time::Duration>,
}

impl TransientStepError {
    /// Creates an error with the given `code` and `message`.
    ///
    /// `source` and `retry_after` start out as `None`.
    #[must_use]
    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
        Self {
            code: code.into(),
            message: message.into(),
            source: None,
            retry_after: None,
        }
    }

    /// Records the textual form of `source` and returns the updated error.
    ///
    /// Only `source.to_string()` is kept; the error value itself is dropped,
    /// so `std::error::Error::source()` on the result still returns `None`.
    /// A previously recorded source string is overwritten.
    ///
    /// The method consumes `self`, so the return value must be used —
    /// discarding it loses the whole error, hence `#[must_use]`.
    #[must_use]
    pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self {
        self.source = Some(source.to_string());
        self
    }

    /// Sets `retry_after` to `delay` and returns the updated error.
    ///
    /// The method consumes `self`, so the return value must be used —
    /// discarding it loses the whole error, hence `#[must_use]`.
    #[must_use]
    pub fn with_delay(mut self, delay: std::time::Duration) -> Self {
        self.retry_after = Some(delay);
        self
    }
}

impl std::fmt::Display for TransientStepError {
    /// Writes `"<code>: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.code, self.message)
    }
}

// The default `source()` is kept: the attached source is stored only as a
// string, so there is no underlying error object to hand back.
impl std::error::Error for TransientStepError {}
impl From<TransientStepError> for Error {
fn from(error: TransientStepError) -> Self {
Error::Transient {
code: error.code,
message: error.message,
retry_after: error.retry_after,
}
}
}
/// Error type used by this file's `Result` alias.
///
/// `Display` and `std::error::Error` are generated by the `thiserror` derive
/// from the `#[error("...")]` format strings on each variant. Variants with a
/// `#[from]` field also get a `From` conversion from the wrapped type, and a
/// field named `source` is what `thiserror` returns from `Error::source()`.
#[derive(Error, Debug)]
pub enum Error {
/// Wraps a `sqlx::Error`. Only compiled when the `sqlx` feature is enabled.
#[cfg(feature = "sqlx")]
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
/// Wraps a `turso::Error`. Only compiled when the `turso` feature is enabled.
#[cfg(feature = "turso")]
#[error("Turso error: {0}")]
Turso(#[from] turso::Error),
/// Wraps a `serde_json::Error`.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// A required configuration entry, named by `field`, is missing.
#[error("Missing required configuration: {field}")]
MissingConfig { field: String },
/// The configuration entry `field` has an invalid value; `message` explains why.
#[error("Invalid configuration value for {field}: {message}")]
InvalidConfig { field: String, message: String },
/// No queue called `name` was found.
#[error("Queue '{name}' not found")]
QueueNotFound { name: String },
/// A queue called `name` already exists.
#[error("Queue '{name}' already exists")]
QueueAlreadyExists { name: String },
/// A workflow called `name` already exists.
#[error("Workflow '{name}' already exists")]
WorkflowAlreadyExists { name: String },
/// No message with the UUID `id` was found.
#[error("Message with id '{id}' not found")]
MessageNotFound { id: uuid::Uuid },
/// A message has an invalid format; `message` carries the details.
#[error("Invalid message format: {message}")]
InvalidMessage { message: String },
/// Schema validation failed; `message` carries the details.
#[error("Schema validation failed: {message}")]
SchemaValidation { message: String },
/// The operation named by `operation` timed out.
#[error("Operation timeout: {operation}")]
Timeout { operation: String },
/// An operation was suspended for the given `reason`.
#[error("Operation suspended: {reason}")]
Suspended { reason: String },
/// Database connection failure with free-form `context`.
/// `source` is both printed in the message and exposed via `Error::source()`.
#[error("Database connection failed: {source}. Context: {context}")]
ConnectionFailed { source: BoxError, context: String },
/// Database query failure; records the `query` text and free-form `context`.
/// `source` is both printed in the message and exposed via `Error::source()`.
#[error("Database query failed: {query}. Context: {context}. Source: {source}")]
QueryFailed {
source: BoxError,
query: String,
context: String,
},
/// Database transaction failure with free-form `context`.
/// `source` is both printed in the message and exposed via `Error::source()`.
#[error("Database transaction failed: {source}. Context: {context}")]
TransactionFailed { source: BoxError, context: String },
/// Database connection pool exhaustion with free-form `context`.
/// `source` is both printed in the message and exposed via `Error::source()`.
#[error("Database connection pool exhausted: {source}. Context: {context}")]
PoolExhausted { source: BoxError, context: String },
/// Wraps a `sqlx::migrate::MigrateError`. Only compiled when the `sqlx` feature is enabled.
#[cfg(feature = "sqlx")]
#[error("Database migration failed: {0}")]
MigrationFailed(#[from] sqlx::migrate::MigrateError),
/// Internal error described by `message`.
#[error("Internal error: {message}")]
Internal { message: String },
/// Validation failed for the given `reason`.
#[error("Validation failed: {reason}")]
ValidationFailed { reason: String },
/// A rate limit was exceeded; `retry_after` is printed with `Debug` formatting.
#[error("Rate limit exceeded, retry after {retry_after:?}")]
RateLimited { retry_after: std::time::Duration },
/// A payload of `actual_bytes` exceeds the `max_bytes` limit.
#[error("Payload size {actual_bytes} exceeds limit {max_bytes}")]
PayloadTooLarge {
actual_bytes: usize,
max_bytes: usize,
},
/// A state change from `from` to `to` is not allowed; `reason` explains why.
#[error("Invalid state transition from {from} to {to}: {reason}")]
InvalidStateTransition {
from: String,
to: String,
reason: String,
},
/// A worker still has `count` pending messages; `reason` gives further detail.
#[error("Worker has {count} pending messages: {reason}")]
WorkerHasPendingMessages { count: u64, reason: String },
/// A worker type is invalid; `message` carries the details.
#[error("Invalid worker type: {message}")]
InvalidWorkerType { message: String },
/// A conflict described by `message`.
#[error("Conflict: {message}")]
Conflict { message: String },
/// No worker with the numeric `id` was found.
#[error("Worker with id '{id}' not found")]
WorkerNotFound { id: i64 },
/// A worker is not registered; `message` carries the details.
#[error("Worker not registered: {message}")]
WorkerNotRegistered { message: String },
/// Generic connection error described only by `message`.
/// Deprecated since 0.6.0 in favour of the more specific variants named in the note below.
#[deprecated(
since = "0.6.0",
note = "Use ConnectionFailed, QueryFailed, TransactionFailed, or PoolExhausted instead"
)]
#[error("Connection error: {message}")]
Connection { message: String },
/// Generic not-found error: the kind of thing is given by `entity`, its identifier by `id`.
#[error("{entity} with id '{id}' not found")]
NotFound { entity: String, id: String },
/// The workflow run `run_id` failed; `error` holds the failure as a JSON value.
#[error("Workflow run {run_id} failed: {error}")]
ExecutionFailed {
run_id: i64,
error: serde_json::Value,
},
/// A workflow was paused. Only `message` is printed; `resume_after` is carried
/// as data and does not appear in the display text.
#[error("Workflow paused: {message}")]
Paused {
message: String,
resume_after: std::time::Duration,
},
/// Data-less variant that exists only in test builds or with the `test-utils` feature.
#[cfg(any(test, feature = "test-utils"))]
#[error("Test crash")]
TestCrash,
/// Transient error identified by `code` and described by `message`.
/// `retry_after` is carried as data and does not appear in the display text.
/// This is the variant produced by `From<TransientStepError>`.
#[error("Transient error ({code}): {message}")]
Transient {
code: String,
message: String,
retry_after: Option<std::time::Duration>,
},
/// A step failed after `attempts` attempts; `error` holds the failure as a JSON value.
#[error("Step failed after {attempts} attempts: {error}")]
RetriesExhausted {
error: serde_json::Value,
attempts: u32,
},
/// A step is not ready to execute; a retry is scheduled for `retry_at` (UTC).
/// `retry_count` is carried as data and does not appear in the display text.
#[error("Step not ready for execution (retry scheduled for {retry_at})")]
StepNotReady {
retry_at: chrono::DateTime<chrono::Utc>,
retry_count: u32,
},
}