rust_viewflow/core/
engine.rs1use async_trait::async_trait;
2use chrono;
3use serde_json;
4use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6use uuid::Uuid;
7
8use crate::core::{
9 TaskState, TaskStatus, WorkflowDefinition, WorkflowEngine, WorkflowResult, WorkflowState,
10};
11use crate::db::WorkflowDatabase;
12
13pub struct DefaultWorkflowEngine {
15 database: Arc<dyn WorkflowDatabase>,
16 workflows: RwLock<HashMap<String, Arc<dyn WorkflowDefinition>>>,
17}
18
19impl DefaultWorkflowEngine {
20 pub fn new(database: Arc<dyn WorkflowDatabase>) -> Self {
21 Self {
22 database,
23 workflows: RwLock::new(HashMap::new()),
24 }
25 }
26
27 pub fn register_workflow(&self, workflow: Arc<dyn WorkflowDefinition>) {
28 self.workflows
29 .write()
30 .expect("workflows registry lock poisoned")
31 .insert(workflow.name().to_string(), workflow);
32 }
33}
34
35#[async_trait]
36impl WorkflowEngine for DefaultWorkflowEngine {
37 async fn create_workflow(
38 &self,
39 definition_name: &str,
40 data: serde_json::Value,
41 ) -> WorkflowResult<WorkflowState> {
42 let workflow_def = self
43 .workflows
44 .read()
45 .expect("workflows registry lock poisoned")
46 .get(definition_name)
47 .cloned()
48 .ok_or_else(|| format!("Workflow definition '{}' not found", definition_name))?;
49
50 let workflow = workflow_def.start(data).await?;
51
52 Ok(workflow)
53 }
54
55 async fn get_workflow(&self, workflow_id: &str) -> WorkflowResult<WorkflowState> {
56 self.database.get_workflow(workflow_id).await
57 }
58
59 async fn update_workflow(
60 &self,
61 workflow_id: &str,
62 data: serde_json::Value,
63 ) -> WorkflowResult<WorkflowState> {
64 let mut workflow = self.database.get_workflow(workflow_id).await?;
65 workflow.data = data;
66 workflow.updated_at = chrono::Utc::now();
67
68 self.database.update_workflow(&workflow).await?;
69
70 Ok(workflow)
71 }
72
73 async fn complete_task(
74 &self,
75 task_id: &str,
76 data: serde_json::Value,
77 ) -> WorkflowResult<TaskState> {
78 let task = self.database.get_task(task_id).await?;
79 let workflow = self.database.get_workflow(&task.workflow_id).await?;
80
81 let workflow_def = self
82 .workflows
83 .read()
84 .expect("workflows registry lock poisoned")
85 .get(&workflow.name)
86 .cloned();
87
88 if let Some(definition) = workflow_def {
89 return definition.execute_task(task_id, data).await;
90 }
91
92 let mut fallback_task = task;
93 fallback_task.data = data;
94 fallback_task.status = TaskStatus::Completed;
95 fallback_task.completed_at = Some(chrono::Utc::now());
96 fallback_task.updated_at = chrono::Utc::now();
97
98 self.database.update_task(&fallback_task).await?;
99
100 Ok(fallback_task)
101 }
102
103 async fn get_tasks(&self, workflow_id: &str) -> WorkflowResult<Vec<TaskState>> {
104 self.database.get_tasks_by_workflow(workflow_id).await
105 }
106}
107
108pub fn generate_id() -> String {
110 Uuid::new_v4().to_string()
111}