use async_trait::async_trait;
use chrono;
use serde_json;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use uuid::Uuid;
use crate::core::{
TaskState, TaskStatus, WorkflowDefinition, WorkflowEngine, WorkflowResult, WorkflowState,
};
use crate::db::WorkflowDatabase;
/// Workflow engine that pairs a storage backend with an in-memory registry
/// of workflow definitions.
pub struct DefaultWorkflowEngine {
    /// Storage backend used to load and persist workflow and task state.
    database: Arc<dyn WorkflowDatabase>,
    /// Registered workflow definitions, keyed by definition name.
    ///
    /// Wrapped in an `RwLock` so definitions can be added through `&self`.
    workflows: RwLock<HashMap<String, Arc<dyn WorkflowDefinition>>>,
}
impl DefaultWorkflowEngine {
    /// Creates an engine backed by `database` with an empty definition registry.
    pub fn new(database: Arc<dyn WorkflowDatabase>) -> Self {
        let workflows = RwLock::new(HashMap::new());
        Self {
            database,
            workflows,
        }
    }

    /// Registers `workflow` under the name reported by its `name()` method.
    ///
    /// A definition already stored under the same name is replaced.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn register_workflow(&self, workflow: Arc<dyn WorkflowDefinition>) {
        // Take the write lock first, then compute the key, mirroring the
        // evaluation order of a single chained call.
        let mut registry = self
            .workflows
            .write()
            .expect("workflows registry lock poisoned");
        let key = workflow.name().to_string();
        registry.insert(key, workflow);
    }
}
#[async_trait]
impl WorkflowEngine for DefaultWorkflowEngine {
    /// Starts a new workflow from the registered definition named
    /// `definition_name`, passing `data` to the definition's `start`.
    ///
    /// # Errors
    ///
    /// Returns an error if no definition is registered under
    /// `definition_name`, or if the definition's `start` call fails.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    async fn create_workflow(
        &self,
        definition_name: &str,
        data: serde_json::Value,
    ) -> WorkflowResult<WorkflowState> {
        // The read guard is a temporary of this statement, so it is released
        // before the `.await` below.
        let workflow_def = self
            .workflows
            .read()
            .expect("workflows registry lock poisoned")
            .get(definition_name)
            .cloned()
            .ok_or_else(|| format!("Workflow definition '{}' not found", definition_name))?;

        let workflow = workflow_def.start(data).await?;
        Ok(workflow)
    }

    /// Loads the workflow identified by `workflow_id` from the database.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the database lookup.
    async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
        self.database.get_workflow(workflow_id).await
    }

    /// Replaces the stored data of the workflow identified by `workflow_id`
    /// with `data`, refreshes its `updated_at` timestamp, and persists it.
    ///
    /// Returns the updated workflow state.
    ///
    /// # Errors
    ///
    /// Propagates any error from loading or persisting the workflow.
    async fn update_workflow(
        &self,
        workflow_id: &str,
        data: serde_json::Value,
    ) -> WorkflowResult<WorkflowState> {
        let mut workflow = self.database.get_workflow(workflow_id).await?;
        workflow.data = data;
        workflow.updated_at = chrono::Utc::now();
        self.database.update_workflow(&workflow).await?;
        Ok(workflow)
    }

    /// Completes the task identified by `task_id` with the given `data`.
    ///
    /// If a definition is registered under the owning workflow's name, the
    /// call is delegated to that definition's `execute_task` and its result
    /// is returned as-is. Otherwise the task is marked
    /// [`TaskStatus::Completed`] directly, its data is replaced with `data`,
    /// and the change is persisted.
    ///
    /// # Errors
    ///
    /// Propagates any error from loading the task or its workflow, from the
    /// definition's `execute_task`, or from persisting the fallback update.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    async fn complete_task(
        &self,
        task_id: &str,
        data: serde_json::Value,
    ) -> WorkflowResult<TaskState> {
        let mut task = self.database.get_task(task_id).await?;
        let workflow = self.database.get_workflow(&task.workflow_id).await?;

        // The read guard is a temporary of this statement, so it is released
        // before any `.await` below.
        let workflow_def = self
            .workflows
            .read()
            .expect("workflows registry lock poisoned")
            .get(&workflow.name)
            .cloned();

        if let Some(definition) = workflow_def {
            return definition.execute_task(task_id, data).await;
        }

        // No registered definition: complete the task directly. Read the
        // clock once so `completed_at` and `updated_at` carry the same
        // instant instead of two slightly different ones.
        let now = chrono::Utc::now();
        task.data = data;
        task.status = TaskStatus::Completed;
        task.completed_at = Some(now);
        task.updated_at = now;
        self.database.update_task(&task).await?;
        Ok(task)
    }

    /// Returns the tasks the database holds for the workflow identified by
    /// `workflow_id`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the database query.
    async fn get_tasks(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
        self.database.get_tasks_by_workflow(workflow_id).await
    }
}
/// Generates a new identifier: a freshly created version-4 UUID rendered as
/// a `String`.
pub fn generate_id() -> String {
    let id = Uuid::new_v4();
    id.to_string()
}