Skip to main content

rust_viewflow/core/
engine.rs

1use async_trait::async_trait;
2use chrono;
3use serde_json;
4use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6use uuid::Uuid;
7
8use crate::core::{
9    TaskState, TaskStatus, WorkflowDefinition, WorkflowEngine, WorkflowResult, WorkflowState,
10};
11use crate::db::WorkflowDatabase;
12
13/// Default workflow engine implementation
14pub struct DefaultWorkflowEngine {
15    database: Arc<dyn WorkflowDatabase>,
16    workflows: RwLock<HashMap<String, Arc<dyn WorkflowDefinition>>>,
17}
18
19impl DefaultWorkflowEngine {
20    pub fn new(database: Arc<dyn WorkflowDatabase>) -> Self {
21        Self {
22            database,
23            workflows: RwLock::new(HashMap::new()),
24        }
25    }
26
27    pub fn register_workflow(&self, workflow: Arc<dyn WorkflowDefinition>) {
28        self.workflows
29            .write()
30            .expect("workflows registry lock poisoned")
31            .insert(workflow.name().to_string(), workflow);
32    }
33}
34
35#[async_trait]
36impl WorkflowEngine for DefaultWorkflowEngine {
37    async fn create_workflow(
38        &self,
39        definition_name: &str,
40        data: serde_json::Value,
41    ) -> WorkflowResult<WorkflowState> {
42        let workflow_def = self
43            .workflows
44            .read()
45            .expect("workflows registry lock poisoned")
46            .get(definition_name)
47            .cloned()
48            .ok_or_else(|| format!("Workflow definition '{}' not found", definition_name))?;
49
50        let workflow = workflow_def.start(data).await?;
51
52        Ok(workflow)
53    }
54
55    async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
56        self.database.get_workflow(workflow_id).await
57    }
58
59    async fn update_workflow(
60        &self,
61        workflow_id: &str,
62        data: serde_json::Value,
63    ) -> WorkflowResult<WorkflowState> {
64        let mut workflow = self.database.get_workflow(workflow_id).await?;
65        workflow.data = data;
66        workflow.updated_at = chrono::Utc::now();
67
68        self.database.update_workflow(&workflow).await?;
69
70        Ok(workflow)
71    }
72
73    async fn complete_task(
74        &self,
75        task_id: &str,
76        data: serde_json::Value,
77    ) -> WorkflowResult<TaskState> {
78        let task = self.database.get_task(task_id).await?;
79        let workflow = self.database.get_workflow(&task.workflow_id).await?;
80
81        let workflow_def = self
82            .workflows
83            .read()
84            .expect("workflows registry lock poisoned")
85            .get(&workflow.name)
86            .cloned();
87
88        if let Some(definition) = workflow_def {
89            return definition.execute_task(task_id, data).await;
90        }
91
92        let mut fallback_task = task;
93        fallback_task.data = data;
94        fallback_task.status = TaskStatus::Completed;
95        fallback_task.completed_at = Some(chrono::Utc::now());
96        fallback_task.updated_at = chrono::Utc::now();
97
98        self.database.update_task(&fallback_task).await?;
99
100        Ok(fallback_task)
101    }
102
103    async fn get_tasks(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
104        self.database.get_tasks_by_workflow(workflow_id).await
105    }
106}
107
108/// Helper function to generate unique IDs
109pub fn generate_id() -> String {
110    Uuid::new_v4().to_string()
111}