use async_trait::async_trait;
use std::time::Duration;
use uuid::Uuid;
use crate::dal::DAL;
use crate::executor::pipeline_executor::{
PipelineError, PipelineExecution, PipelineExecutor, PipelineResult, PipelineStatus,
};
use crate::Context;
use crate::UniversalUuid;
use super::DefaultRunner;
#[async_trait]
impl PipelineExecutor for DefaultRunner {
async fn execute(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<PipelineResult, PipelineError> {
let execution_id = self
.scheduler
.schedule_workflow_execution(workflow_name, context)
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to schedule workflow: {}", e),
})?;
let start_time = std::time::Instant::now();
let dal = DAL::new(self.database.clone());
loop {
if let Some(timeout) = self.config.pipeline_timeout() {
if start_time.elapsed() > timeout {
return Err(PipelineError::Timeout {
timeout_seconds: timeout.as_secs(),
});
}
}
let pipeline = dal
.pipeline_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to check execution status: {}", e),
})?;
match pipeline.status.as_str() {
"Completed" | "Failed" => {
return self.build_pipeline_result(execution_id).await;
}
_ => {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
}
async fn execute_async(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<PipelineExecution, PipelineError> {
let execution_id = self
.scheduler
.schedule_workflow_execution(workflow_name, context)
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to schedule workflow: {}", e),
})?;
Ok(PipelineExecution::new(
execution_id,
workflow_name.to_string(),
self.clone(),
))
}
async fn execute_with_callback(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
callback: Box<dyn crate::executor::pipeline_executor::StatusCallback>,
) -> Result<PipelineResult, PipelineError> {
let execution = self.execute_async(workflow_name, context).await?;
let execution_id = execution.execution_id;
let mut last_status = PipelineStatus::Pending;
callback.on_status_change(last_status.clone());
loop {
let current_status = self.get_execution_status(execution_id).await?;
if current_status != last_status {
callback.on_status_change(current_status.clone());
last_status = current_status.clone();
}
if current_status.is_terminal() {
return self.get_execution_result(execution_id).await;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
async fn get_execution_status(
&self,
execution_id: Uuid,
) -> Result<PipelineStatus, PipelineError> {
let dal = DAL::new(self.database.clone());
let pipeline = dal
.pipeline_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to get execution status: {}", e),
})?;
let status = match pipeline.status.as_str() {
"Pending" => PipelineStatus::Pending,
"Running" => PipelineStatus::Running,
"Completed" => PipelineStatus::Completed,
"Failed" => PipelineStatus::Failed,
"Cancelled" => PipelineStatus::Cancelled,
"Paused" => PipelineStatus::Paused,
_ => PipelineStatus::Failed,
};
Ok(status)
}
async fn get_execution_result(
&self,
execution_id: Uuid,
) -> Result<PipelineResult, PipelineError> {
self.build_pipeline_result(execution_id).await
}
async fn cancel_execution(&self, execution_id: Uuid) -> Result<(), PipelineError> {
let dal = DAL::new(self.database.clone());
dal.pipeline_execution()
.cancel(execution_id.into())
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to cancel execution: {}", e),
})?;
Ok(())
}
async fn pause_execution(
&self,
execution_id: Uuid,
reason: Option<&str>,
) -> Result<(), PipelineError> {
let dal = DAL::new(self.database.clone());
let pipeline = dal
.pipeline_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to get execution: {}", e),
})?;
if pipeline.status != "Running" && pipeline.status != "Pending" {
return Err(PipelineError::ExecutionFailed {
message: format!(
"Cannot pause pipeline with status '{}'. Only 'Pending' or 'Running' pipelines can be paused.",
pipeline.status
),
});
}
dal.pipeline_execution()
.pause(execution_id.into(), reason)
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to pause execution: {}", e),
})?;
Ok(())
}
async fn resume_execution(&self, execution_id: Uuid) -> Result<(), PipelineError> {
let dal = DAL::new(self.database.clone());
let pipeline = dal
.pipeline_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to get execution: {}", e),
})?;
if pipeline.status != "Paused" {
return Err(PipelineError::ExecutionFailed {
message: format!(
"Cannot resume pipeline with status '{}'. Only 'Paused' pipelines can be resumed.",
pipeline.status
),
});
}
dal.pipeline_execution()
.resume(execution_id.into())
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to resume execution: {}", e),
})?;
Ok(())
}
async fn list_executions(&self) -> Result<Vec<PipelineResult>, PipelineError> {
let dal = DAL::new(self.database.clone());
let executions = dal
.pipeline_execution()
.list_recent(100)
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to list executions: {}", e),
})?;
let mut results = Vec::new();
for execution in executions {
if let Ok(result) = self.build_pipeline_result(execution.id.into()).await {
results.push(result);
}
}
Ok(results)
}
async fn shutdown(&self) -> Result<(), PipelineError> {
DefaultRunner::shutdown(self).await
}
}