use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::time::Duration;
use uuid::Uuid;
use crate::error::{ExecutorError, TaskError, ValidationError};
use crate::task::TaskState;
use crate::Context;
/// Receiver for workflow status notifications.
///
/// Implementations must be `Send + Sync`. In this file the trait is only
/// consumed as a `Box<dyn StatusCallback>` argument of
/// `WorkflowExecutor::execute_with_callback`.
pub trait StatusCallback: Send + Sync {
/// Invoked with a [`WorkflowStatus`] value.
///
/// NOTE(review): the name suggests this fires on every status transition,
/// but when and how often it is called is decided by the executor
/// implementation, which is not visible here — confirm against the runner.
fn on_status_change(&self, status: WorkflowStatus);
}
/// Outcome record for a single task, carried in
/// `WorkflowExecutionResult::task_results`.
///
/// This is a plain data holder: nothing in this file enforces any
/// relationship between the fields (e.g. between `duration` and the two
/// timestamps).
#[derive(Debug, Clone)]
pub struct TaskResult {
/// Name of the task this record describes.
pub task_name: String,
/// State of the task, as a [`TaskState`] value.
pub status: TaskState,
/// UTC start timestamp, if one was recorded.
pub start_time: Option<DateTime<Utc>>,
/// UTC end timestamp, if one was recorded.
pub end_time: Option<DateTime<Utc>>,
/// Elapsed time, if known.
/// NOTE(review): presumably `end_time - start_time`; verify at the producer.
pub duration: Option<Duration>,
/// Number of attempts.
/// NOTE(review): signed `i32`; presumably never negative — confirm.
pub attempt_count: i32,
/// Human-readable error description, if any.
pub error_message: Option<String>,
}
/// Error type returned by the workflow execution API in this module.
///
/// `Display` text for each variant comes from its `#[error(...)]` attribute.
/// The `Validation`, `TaskExecution` and `Executor` variants carry `#[from]`,
/// so `?` converts the wrapped error types into this enum automatically.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowExecutionError {
/// Database connection problem; `message` is appended to the display text.
#[error("Database connection failed: {message}")]
DatabaseConnection { message: String },
/// No workflow is known under `workflow_name`.
#[error("Workflow not found: {workflow_name}")]
WorkflowNotFound { workflow_name: String },
/// The workflow execution failed; `message` describes the failure.
#[error("Workflow execution failed: {message}")]
ExecutionFailed { message: String },
/// A wait exceeded its time limit. `timeout_seconds` is the limit in whole
/// seconds (the wait loop in this file fills it from `Duration::as_secs`,
/// which drops any sub-second part).
#[error("Workflow timeout after {timeout_seconds}s")]
Timeout { timeout_seconds: u64 },
/// Wraps a [`ValidationError`] (converted via `From`).
#[error("Validation error: {0}")]
Validation(#[from] ValidationError),
/// Wraps a [`TaskError`] (converted via `From`).
#[error("Task execution error: {0}")]
TaskExecution(#[from] TaskError),
/// Wraps an [`ExecutorError`] (converted via `From`).
#[error("Executor error: {0}")]
Executor(#[from] ExecutorError),
/// Configuration problem; `message` is appended to the display text.
#[error("Configuration error: {message}")]
Configuration { message: String },
}
/// Status of a workflow execution.
///
/// `Completed`, `Failed` and `Cancelled` are terminal; `Pending`, `Running`
/// and `Paused` are not (see [`WorkflowStatus::is_terminal`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowStatus {
    /// Non-terminal.
    Pending,
    /// Non-terminal.
    Running,
    /// Terminal.
    Completed,
    /// Terminal.
    Failed,
    /// Terminal.
    Cancelled,
    /// Non-terminal.
    Paused,
}

impl WorkflowStatus {
    /// Returns `true` when the status is `Completed`, `Failed` or `Cancelled`,
    /// and `false` for `Pending`, `Running` and `Paused`.
    pub fn is_terminal(&self) -> bool {
        // Every variant is listed on purpose: adding a new variant then fails
        // to compile here until it is classified.
        match self {
            Self::Completed | Self::Failed | Self::Cancelled => true,
            Self::Pending | Self::Running | Self::Paused => false,
        }
    }
}
/// Snapshot of a workflow execution, as returned by the executor API
/// (`execute`, `get_execution_result`, `list_executions`, ...).
///
/// Only `Debug` is derived; the type is not `Clone`.
#[derive(Debug)]
pub struct WorkflowExecutionResult {
/// Identifier of the execution this result belongs to.
pub execution_id: Uuid,
/// Name of the workflow that was executed.
pub workflow_name: String,
/// Status of the execution. Not necessarily terminal: this type places no
/// restriction on the value stored here.
pub status: WorkflowStatus,
/// UTC timestamp at which the execution started.
pub start_time: DateTime<Utc>,
/// UTC end timestamp, if one was recorded.
pub end_time: Option<DateTime<Utc>>,
/// Elapsed time, if known.
/// NOTE(review): presumably `end_time - start_time`; verify at the producer.
pub duration: Option<Duration>,
/// Context associated with the result.
/// NOTE(review): presumably the context as left by the last task — confirm.
pub final_context: Context<serde_json::Value>,
/// Per-task outcome records.
pub task_results: Vec<TaskResult>,
/// Human-readable error description, if any.
pub error_message: Option<String>,
}
/// Handle to a single workflow execution.
///
/// Pairs the execution's id and workflow name with the runner that the
/// handle's methods use to query and control that execution.
pub struct WorkflowExecution {
/// Identifier passed to the runner on every status/control call.
pub execution_id: Uuid,
/// Name of the workflow this execution belongs to.
pub workflow_name: String,
// Runner that all methods of this handle delegate to; kept private.
executor: crate::runner::DefaultRunner,
}
impl WorkflowExecution {
    /// Delay between two consecutive status polls while waiting for completion.
    const POLL_INTERVAL: Duration = Duration::from_millis(500);

    /// Creates a handle for the execution identified by `execution_id`.
    ///
    /// `executor` is the runner every other method on this handle delegates to.
    pub fn new(
        execution_id: Uuid,
        workflow_name: String,
        executor: crate::runner::DefaultRunner,
    ) -> Self {
        Self {
            execution_id,
            workflow_name,
            executor,
        }
    }

    /// Waits, without any time limit, until the execution reaches a terminal
    /// status and returns its result.
    ///
    /// Equivalent to `wait_for_completion_with_timeout(None)`. Consumes the handle.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the runner.
    pub async fn wait_for_completion(
        self,
    ) -> Result<WorkflowExecutionResult, WorkflowExecutionError> {
        self.wait_for_completion_with_timeout(None).await
    }

    /// Polls the runner until the execution reaches a terminal status
    /// (see `WorkflowStatus::is_terminal`) and returns its result.
    ///
    /// The status is always queried at least once, and is queried again after
    /// every sleep *before* the deadline is evaluated, so an execution that
    /// finishes during the last sleep is reported as finished rather than as a
    /// timeout. Sleeps are capped to the time remaining, so the call does not
    /// overshoot `timeout` by a full poll interval.
    ///
    /// `timeout` of `None` waits indefinitely. The time limit is only evaluated
    /// between polls: a runner call that itself never returns is not interrupted.
    ///
    /// # Errors
    ///
    /// * `WorkflowExecutionError::Timeout` when `timeout` elapses while the
    ///   execution is still non-terminal. `timeout_seconds` is the limit in
    ///   whole seconds, so sub-second limits are reported as `0`.
    /// * Any error returned by the runner's status or result query.
    pub async fn wait_for_completion_with_timeout(
        self,
        timeout: Option<Duration>,
    ) -> Result<WorkflowExecutionResult, WorkflowExecutionError> {
        let started = std::time::Instant::now();
        loop {
            // Poll first: a finished execution must win over an expired deadline.
            let status = self
                .executor
                .get_execution_status(self.execution_id)
                .await?;
            if status.is_terminal() {
                return self.executor.get_execution_result(self.execution_id).await;
            }

            let pause = match timeout {
                None => Self::POLL_INTERVAL,
                Some(limit) => {
                    // Duration arithmetic saturates at zero, so this cannot
                    // panic however large `limit` or the elapsed time is.
                    let remaining = limit.saturating_sub(started.elapsed());
                    if remaining.is_zero() {
                        return Err(WorkflowExecutionError::Timeout {
                            timeout_seconds: limit.as_secs(),
                        });
                    }
                    // Never sleep past the deadline.
                    remaining.min(Self::POLL_INTERVAL)
                }
            };
            tokio::time::sleep(pause).await;
        }
    }

    /// Queries the runner for the current status of this execution.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the runner.
    pub async fn get_status(&self) -> Result<WorkflowStatus, WorkflowExecutionError> {
        self.executor.get_execution_status(self.execution_id).await
    }

    /// Asks the runner to cancel this execution.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the runner.
    pub async fn cancel(&self) -> Result<(), WorkflowExecutionError> {
        self.executor.cancel_execution(self.execution_id).await
    }

    /// Asks the runner to pause this execution, forwarding the optional `reason`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the runner.
    pub async fn pause(&self, reason: Option<&str>) -> Result<(), WorkflowExecutionError> {
        self.executor
            .pause_execution(self.execution_id, reason)
            .await
    }

    /// Asks the runner to resume this execution.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the runner.
    pub async fn resume(&self) -> Result<(), WorkflowExecutionError> {
        self.executor.resume_execution(self.execution_id).await
    }
}
/// Asynchronous interface for running workflows and controlling their
/// executions.
///
/// Implementations must be `Send + Sync`. Every method reports failure as a
/// [`WorkflowExecutionError`]. No implementation is visible in this file, so
/// the per-method notes below describe only what the signatures establish;
/// anything further is marked for review.
#[async_trait]
pub trait WorkflowExecutor: Send + Sync {
/// Runs the workflow named `workflow_name` with `context` and returns a
/// [`WorkflowExecutionResult`].
/// NOTE(review): presumably resolves only once the run has finished — confirm.
async fn execute(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<WorkflowExecutionResult, WorkflowExecutionError>;
/// Starts the workflow named `workflow_name` with `context` and returns a
/// [`WorkflowExecution`] handle instead of a result.
/// NOTE(review): presumably returns before the run finishes — confirm.
async fn execute_async(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<WorkflowExecution, WorkflowExecutionError>;
/// Returns the current [`WorkflowStatus`] of the execution `execution_id`.
async fn get_execution_status(
&self,
execution_id: Uuid,
) -> Result<WorkflowStatus, WorkflowExecutionError>;
/// Returns the [`WorkflowExecutionResult`] of the execution `execution_id`.
async fn get_execution_result(
&self,
execution_id: Uuid,
) -> Result<WorkflowExecutionResult, WorkflowExecutionError>;
/// Requests cancellation of the execution `execution_id`.
async fn cancel_execution(&self, execution_id: Uuid) -> Result<(), WorkflowExecutionError>;
/// Requests that the execution `execution_id` be paused, with an optional
/// free-form `reason`.
async fn pause_execution(
&self,
execution_id: Uuid,
reason: Option<&str>,
) -> Result<(), WorkflowExecutionError>;
/// Requests that the execution `execution_id` be resumed.
async fn resume_execution(&self, execution_id: Uuid) -> Result<(), WorkflowExecutionError>;
/// Like `execute`, but additionally takes a boxed [`StatusCallback`].
/// NOTE(review): presumably the callback is notified of status changes
/// during the run; the exact notification points are implementation-defined.
async fn execute_with_callback(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
callback: Box<dyn StatusCallback>,
) -> Result<WorkflowExecutionResult, WorkflowExecutionError>;
/// Returns a list of execution results.
/// NOTE(review): scope, ordering and limits of the list are not specified
/// by the signature — confirm against the implementation.
async fn list_executions(&self)
-> Result<Vec<WorkflowExecutionResult>, WorkflowExecutionError>;
/// Shuts the executor down.
/// NOTE(review): whether in-flight executions are awaited or aborted is
/// implementation-defined — confirm.
async fn shutdown(&self) -> Result<(), WorkflowExecutionError>;
}
impl WorkflowStatus {
    /// Test-only parser mapping an exact, case-sensitive variant name to its
    /// `WorkflowStatus`.
    ///
    /// Any unrecognised input (including the empty string or a differently
    /// cased name) yields `WorkflowStatus::Failed` rather than an error.
    #[cfg(test)]
    pub(crate) fn from_str(s: &str) -> Self {
        // Name → variant table; lookup is by exact string equality.
        const KNOWN: [(&str, WorkflowStatus); 6] = [
            ("Pending", WorkflowStatus::Pending),
            ("Running", WorkflowStatus::Running),
            ("Completed", WorkflowStatus::Completed),
            ("Failed", WorkflowStatus::Failed),
            ("Cancelled", WorkflowStatus::Cancelled),
            ("Paused", WorkflowStatus::Paused),
        ];

        KNOWN
            .iter()
            .find(|(label, _)| *label == s)
            .map(|(_, status)| status.clone())
            .unwrap_or(WorkflowStatus::Failed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds a `TaskResult` with no timestamps, no duration and no error
    /// message; individual tests overwrite the fields they care about.
    fn task_result(task_name: &str, status: TaskState, attempt_count: i32) -> TaskResult {
        TaskResult {
            task_name: task_name.to_string(),
            status,
            start_time: None,
            end_time: None,
            duration: None,
            attempt_count,
            error_message: None,
        }
    }

    /// Builds a `WorkflowExecutionResult` with a fresh random id, an empty
    /// context, no task results, no end time, no duration and no error message.
    fn workflow_result(workflow_name: &str, status: WorkflowStatus) -> WorkflowExecutionResult {
        WorkflowExecutionResult {
            execution_id: Uuid::new_v4(),
            workflow_name: workflow_name.to_string(),
            status,
            start_time: Utc::now(),
            end_time: None,
            duration: None,
            final_context: Context::new(),
            task_results: Vec::new(),
            error_message: None,
        }
    }

    // --- WorkflowStatus ---------------------------------------------------

    #[test]
    fn test_workflow_status_is_terminal() {
        let terminal = [
            WorkflowStatus::Completed,
            WorkflowStatus::Failed,
            WorkflowStatus::Cancelled,
        ];
        for status in terminal.iter() {
            assert!(status.is_terminal(), "{:?} should be terminal", status);
        }
    }

    #[test]
    fn test_workflow_status_is_not_terminal() {
        let non_terminal = [
            WorkflowStatus::Pending,
            WorkflowStatus::Running,
            WorkflowStatus::Paused,
        ];
        for status in non_terminal.iter() {
            assert!(!status.is_terminal(), "{:?} should not be terminal", status);
        }
    }

    #[test]
    fn test_workflow_status_from_str_valid() {
        let cases = [
            ("Pending", WorkflowStatus::Pending),
            ("Running", WorkflowStatus::Running),
            ("Completed", WorkflowStatus::Completed),
            ("Failed", WorkflowStatus::Failed),
            ("Cancelled", WorkflowStatus::Cancelled),
            ("Paused", WorkflowStatus::Paused),
        ];
        for (text, expected) in cases.iter() {
            assert_eq!(&WorkflowStatus::from_str(text), expected);
        }
    }

    #[test]
    fn test_workflow_status_from_str_invalid_defaults_to_failed() {
        // Unknown, empty and wrongly-cased names all fall back to `Failed`.
        for text in ["garbage", "", "running"].iter() {
            assert_eq!(WorkflowStatus::from_str(text), WorkflowStatus::Failed);
        }
    }

    #[test]
    fn test_workflow_status_eq() {
        let running = WorkflowStatus::Running;
        assert_eq!(running, WorkflowStatus::Running);
        assert_ne!(running, WorkflowStatus::Pending);
    }

    #[test]
    fn test_workflow_status_clone() {
        let original = WorkflowStatus::Paused;
        let copy = original.clone();
        assert_eq!(original, copy);
    }

    #[test]
    fn test_workflow_status_debug() {
        assert_eq!(format!("{:?}", WorkflowStatus::Running), "Running");
    }

    // --- WorkflowExecutionError display strings -----------------------------

    #[test]
    fn test_workflow_error_display_database_connection() {
        let rendered = WorkflowExecutionError::DatabaseConnection {
            message: String::from("connection refused"),
        }
        .to_string();
        assert_eq!(rendered, "Database connection failed: connection refused");
    }

    #[test]
    fn test_workflow_error_display_workflow_not_found() {
        let rendered = WorkflowExecutionError::WorkflowNotFound {
            workflow_name: String::from("my_workflow"),
        }
        .to_string();
        assert_eq!(rendered, "Workflow not found: my_workflow");
    }

    #[test]
    fn test_workflow_error_display_execution_failed() {
        let rendered = WorkflowExecutionError::ExecutionFailed {
            message: String::from("something broke"),
        }
        .to_string();
        assert_eq!(rendered, "Workflow execution failed: something broke");
    }

    #[test]
    fn test_workflow_error_display_timeout() {
        let rendered = WorkflowExecutionError::Timeout {
            timeout_seconds: 300,
        }
        .to_string();
        assert_eq!(rendered, "Workflow timeout after 300s");
    }

    #[test]
    fn test_workflow_error_display_configuration() {
        let rendered = WorkflowExecutionError::Configuration {
            message: String::from("bad config"),
        }
        .to_string();
        assert_eq!(rendered, "Configuration error: bad config");
    }

    // --- TaskResult ---------------------------------------------------------

    #[test]
    fn test_task_result_construction() {
        let now = Utc::now();
        let mut result = task_result(
            "extract",
            TaskState::Completed {
                completion_time: now,
            },
            1,
        );
        result.start_time = Some(now);
        result.end_time = Some(now);
        result.duration = Some(Duration::from_secs(5));

        assert_eq!(result.task_name, "extract");
        assert_eq!(result.attempt_count, 1);
        assert!(result.error_message.is_none());
    }

    #[test]
    fn test_task_result_with_error() {
        let mut result = task_result(
            "transform",
            TaskState::Failed {
                error: "division by zero".to_string(),
                failure_time: Utc::now(),
            },
            3,
        );
        result.error_message = Some("division by zero".to_string());

        assert_eq!(result.error_message.as_deref(), Some("division by zero"));
        assert_eq!(result.attempt_count, 3);
    }

    #[test]
    fn test_task_result_clone() {
        let original = task_result("load", TaskState::Pending, 0);
        let copy = original.clone();
        assert_eq!(copy.task_name, original.task_name);
    }

    // --- WorkflowExecutionResult ----------------------------------------------

    #[test]
    fn test_workflow_result_construction() {
        let mut result = workflow_result("etl_pipeline", WorkflowStatus::Completed);
        result.end_time = Some(Utc::now());
        result.duration = Some(Duration::from_secs(10));

        assert_eq!(result.workflow_name, "etl_pipeline");
        assert_eq!(result.status, WorkflowStatus::Completed);
        assert!(result.error_message.is_none());
        assert!(result.task_results.is_empty());
    }

    #[test]
    fn test_workflow_result_with_tasks() {
        let mut first = task_result(
            "step_1",
            TaskState::Completed {
                completion_time: Utc::now(),
            },
            1,
        );
        first.duration = Some(Duration::from_secs(2));

        let mut second = task_result(
            "step_2",
            TaskState::Failed {
                error: "oops".to_string(),
                failure_time: Utc::now(),
            },
            2,
        );
        second.duration = Some(Duration::from_secs(1));
        second.error_message = Some("oops".to_string());

        let mut result = workflow_result("two_step", WorkflowStatus::Failed);
        result.end_time = Some(Utc::now());
        result.duration = Some(Duration::from_secs(3));
        result.task_results = vec![first, second];
        result.error_message = Some("step_2 failed".to_string());

        assert_eq!(result.task_results.len(), 2);
        let names: Vec<&str> = result
            .task_results
            .iter()
            .map(|task| task.task_name.as_str())
            .collect();
        assert_eq!(names, ["step_1", "step_2"]);
    }

    #[test]
    fn test_workflow_result_debug() {
        let result = workflow_result("debug_wf", WorkflowStatus::Running);
        let rendered = format!("{:?}", result);
        for needle in ["WorkflowExecutionResult", "debug_wf"].iter() {
            assert!(rendered.contains(needle));
        }
    }
}