use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::time::Duration;
use uuid::Uuid;
use crate::error::{ExecutorError, TaskError, ValidationError};
use crate::task::TaskState;
use crate::Context;
pub trait StatusCallback: Send + Sync {
fn on_status_change(&self, status: PipelineStatus);
}
#[derive(Debug, Clone)]
pub struct TaskResult {
pub task_name: String,
pub status: TaskState,
pub start_time: Option<DateTime<Utc>>,
pub end_time: Option<DateTime<Utc>>,
pub duration: Option<Duration>,
pub attempt_count: i32,
pub error_message: Option<String>,
}
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
#[error("Database connection failed: {message}")]
DatabaseConnection { message: String },
#[error("Workflow not found: {workflow_name}")]
WorkflowNotFound { workflow_name: String },
#[error("Pipeline execution failed: {message}")]
ExecutionFailed { message: String },
#[error("Pipeline timeout after {timeout_seconds}s")]
Timeout { timeout_seconds: u64 },
#[error("Validation error: {0}")]
Validation(#[from] ValidationError),
#[error("Task execution error: {0}")]
TaskExecution(#[from] TaskError),
#[error("Executor error: {0}")]
Executor(#[from] ExecutorError),
#[error("Configuration error: {message}")]
Configuration { message: String },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineStatus {
Pending,
Running,
Completed,
Failed,
Cancelled,
Paused,
}
impl PipelineStatus {
pub fn is_terminal(&self) -> bool {
matches!(
self,
PipelineStatus::Completed | PipelineStatus::Failed | PipelineStatus::Cancelled
)
}
}
#[derive(Debug)]
pub struct PipelineResult {
pub execution_id: Uuid,
pub workflow_name: String,
pub status: PipelineStatus,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub duration: Option<Duration>,
pub final_context: Context<serde_json::Value>,
pub task_results: Vec<TaskResult>,
pub error_message: Option<String>,
}
pub struct PipelineExecution {
pub execution_id: Uuid,
pub workflow_name: String,
executor: crate::runner::DefaultRunner,
}
impl PipelineExecution {
pub fn new(
execution_id: Uuid,
workflow_name: String,
executor: crate::runner::DefaultRunner,
) -> Self {
Self {
execution_id,
workflow_name,
executor,
}
}
pub async fn wait_for_completion(self) -> Result<PipelineResult, PipelineError> {
self.wait_for_completion_with_timeout(None).await
}
pub async fn wait_for_completion_with_timeout(
self,
timeout: Option<Duration>,
) -> Result<PipelineResult, PipelineError> {
let start_time = std::time::Instant::now();
loop {
if let Some(timeout_duration) = timeout {
if start_time.elapsed() > timeout_duration {
return Err(PipelineError::Timeout {
timeout_seconds: timeout_duration.as_secs(),
});
}
}
match self
.executor
.get_execution_status(self.execution_id)
.await?
{
status if status.is_terminal() => {
return self.executor.get_execution_result(self.execution_id).await;
}
_ => {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
}
pub async fn get_status(&self) -> Result<PipelineStatus, PipelineError> {
self.executor.get_execution_status(self.execution_id).await
}
pub async fn cancel(&self) -> Result<(), PipelineError> {
self.executor.cancel_execution(self.execution_id).await
}
pub async fn pause(&self, reason: Option<&str>) -> Result<(), PipelineError> {
self.executor
.pause_execution(self.execution_id, reason)
.await
}
pub async fn resume(&self) -> Result<(), PipelineError> {
self.executor.resume_execution(self.execution_id).await
}
}
#[async_trait]
pub trait PipelineExecutor: Send + Sync {
async fn execute(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<PipelineResult, PipelineError>;
async fn execute_async(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<PipelineExecution, PipelineError>;
async fn get_execution_status(
&self,
execution_id: Uuid,
) -> Result<PipelineStatus, PipelineError>;
async fn get_execution_result(
&self,
execution_id: Uuid,
) -> Result<PipelineResult, PipelineError>;
async fn cancel_execution(&self, execution_id: Uuid) -> Result<(), PipelineError>;
async fn pause_execution(
&self,
execution_id: Uuid,
reason: Option<&str>,
) -> Result<(), PipelineError>;
async fn resume_execution(&self, execution_id: Uuid) -> Result<(), PipelineError>;
async fn execute_with_callback(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
callback: Box<dyn StatusCallback>,
) -> Result<PipelineResult, PipelineError>;
async fn list_executions(&self) -> Result<Vec<PipelineResult>, PipelineError>;
async fn shutdown(&self) -> Result<(), PipelineError>;
}
impl PipelineStatus {
#[allow(dead_code)]
pub(crate) fn from_str(s: &str) -> Self {
match s {
"Pending" => PipelineStatus::Pending,
"Running" => PipelineStatus::Running,
"Completed" => PipelineStatus::Completed,
"Failed" => PipelineStatus::Failed,
"Cancelled" => PipelineStatus::Cancelled,
"Paused" => PipelineStatus::Paused,
_ => PipelineStatus::Failed,
}
}
}