use async_trait::async_trait;
use std::time::Duration;
use uuid::Uuid;
use crate::dal::DAL;
use crate::executor::workflow_executor::{
WorkflowExecution, WorkflowExecutionError, WorkflowExecutionResult, WorkflowExecutor,
WorkflowStatus,
};
use crate::Context;
use crate::UniversalUuid;
use super::DefaultRunner;
#[async_trait]
impl WorkflowExecutor for DefaultRunner {
async fn execute(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<WorkflowExecutionResult, WorkflowExecutionError> {
let execution_id = self
.scheduler
.schedule_workflow_execution(workflow_name, context)
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to schedule workflow: {}", e),
})?;
let start_time = std::time::Instant::now();
let dal = DAL::new(self.database.clone());
loop {
if let Some(timeout) = self.config.workflow_timeout() {
if start_time.elapsed() > timeout {
return Err(WorkflowExecutionError::Timeout {
timeout_seconds: timeout.as_secs(),
});
}
}
let execution = dal
.workflow_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to check execution status: {}", e),
})?;
match execution.status.as_str() {
"Completed" | "Failed" => {
return self.build_workflow_result(execution_id).await;
}
_ => {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
}
async fn execute_async(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
) -> Result<WorkflowExecution, WorkflowExecutionError> {
let execution_id = self
.scheduler
.schedule_workflow_execution(workflow_name, context)
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to schedule workflow: {}", e),
})?;
Ok(WorkflowExecution::new(
execution_id,
workflow_name.to_string(),
self.clone(),
))
}
async fn execute_with_callback(
&self,
workflow_name: &str,
context: Context<serde_json::Value>,
callback: Box<dyn crate::executor::workflow_executor::StatusCallback>,
) -> Result<WorkflowExecutionResult, WorkflowExecutionError> {
let execution = self.execute_async(workflow_name, context).await?;
let execution_id = execution.execution_id;
let mut last_status = WorkflowStatus::Pending;
callback.on_status_change(last_status.clone());
loop {
let current_status = self.get_execution_status(execution_id).await?;
if current_status != last_status {
callback.on_status_change(current_status.clone());
last_status = current_status.clone();
}
if current_status.is_terminal() {
return self.get_execution_result(execution_id).await;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
async fn get_execution_status(
&self,
execution_id: Uuid,
) -> Result<WorkflowStatus, WorkflowExecutionError> {
let dal = DAL::new(self.database.clone());
let execution = dal
.workflow_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to get execution status: {}", e),
})?;
let status = match execution.status.as_str() {
"Pending" => WorkflowStatus::Pending,
"Running" => WorkflowStatus::Running,
"Completed" => WorkflowStatus::Completed,
"Failed" => WorkflowStatus::Failed,
"Cancelled" => WorkflowStatus::Cancelled,
"Paused" => WorkflowStatus::Paused,
_ => WorkflowStatus::Failed,
};
Ok(status)
}
async fn get_execution_result(
&self,
execution_id: Uuid,
) -> Result<WorkflowExecutionResult, WorkflowExecutionError> {
self.build_workflow_result(execution_id).await
}
async fn cancel_execution(&self, execution_id: Uuid) -> Result<(), WorkflowExecutionError> {
let dal = DAL::new(self.database.clone());
dal.workflow_execution()
.cancel(execution_id.into())
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to cancel execution: {}", e),
})?;
Ok(())
}
async fn pause_execution(
&self,
execution_id: Uuid,
reason: Option<&str>,
) -> Result<(), WorkflowExecutionError> {
let dal = DAL::new(self.database.clone());
let execution = dal
.workflow_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to get execution: {}", e),
})?;
if execution.status != "Running" && execution.status != "Pending" {
return Err(WorkflowExecutionError::ExecutionFailed {
message: format!(
"Cannot pause workflow with status '{}'. Only 'Pending' or 'Running' workflows can be paused.",
execution.status
),
});
}
dal.workflow_execution()
.pause(execution_id.into(), reason)
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to pause execution: {}", e),
})?;
Ok(())
}
async fn resume_execution(&self, execution_id: Uuid) -> Result<(), WorkflowExecutionError> {
let dal = DAL::new(self.database.clone());
let execution = dal
.workflow_execution()
.get_by_id(UniversalUuid(execution_id))
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to get execution: {}", e),
})?;
if execution.status != "Paused" {
return Err(WorkflowExecutionError::ExecutionFailed {
message: format!(
"Cannot resume workflow with status '{}'. Only 'Paused' workflows can be resumed.",
execution.status
),
});
}
dal.workflow_execution()
.resume(execution_id.into())
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to resume execution: {}", e),
})?;
Ok(())
}
async fn list_executions(
&self,
) -> Result<Vec<WorkflowExecutionResult>, WorkflowExecutionError> {
let dal = DAL::new(self.database.clone());
let executions = dal
.workflow_execution()
.list_recent(100)
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to list executions: {}", e),
})?;
let mut results = Vec::new();
for execution in executions {
if let Ok(result) = self.build_workflow_result(execution.id.into()).await {
results.push(result);
}
}
Ok(results)
}
async fn shutdown(&self) -> Result<(), WorkflowExecutionError> {
DefaultRunner::shutdown(self).await
}
}