kotoba_workflow_core/
engine.rs1use async_trait::async_trait;
4use chrono::Utc;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use uuid::Uuid;
9
10use crate::{
11 WorkflowEngineInterface,
12 WorkflowIR,
13 WorkflowExecution,
14 WorkflowExecutionId,
15 ExecutionStatus,
16 WorkflowError,
17};
18
19#[derive(Debug)]
21pub struct WorkflowEngine {
22 executions: Arc<RwLock<HashMap<WorkflowExecutionId, WorkflowExecution>>>,
23}
24
25impl WorkflowEngine {
26 pub fn new() -> Self {
27 Self {
28 executions: Arc::new(RwLock::new(HashMap::new())),
29 }
30 }
31
32 pub fn builder() -> WorkflowEngineBuilder {
33 WorkflowEngineBuilder::new()
34 }
35}
36
37impl Default for WorkflowEngine {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43#[async_trait]
44impl WorkflowEngineInterface for WorkflowEngine {
45 async fn start_workflow(
46 &self,
47 workflow: &WorkflowIR,
48 _context: serde_json::Value,
49 ) -> Result<WorkflowExecutionId, WorkflowError> {
50 let execution_id = WorkflowExecutionId(Uuid::new_v4().to_string());
51 let now = Utc::now();
52
53 let execution = WorkflowExecution {
54 execution_id: execution_id.clone(),
55 workflow_id: workflow.id.clone(),
56 status: ExecutionStatus::Running,
57 created_at: now,
58 updated_at: now,
59 result: None,
60 error: None,
61 };
62
63 let execution_clone = execution.clone();
65 let executions = Arc::clone(&self.executions);
66
67 tokio::spawn(async move {
68 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
69
70 let mut executions = executions.write().await;
71 if let Some(mut exec) = executions.get_mut(&execution_clone.execution_id) {
72 exec.status = ExecutionStatus::Completed;
73 exec.updated_at = Utc::now();
74 exec.result = Some(serde_json::json!({"message": "Workflow completed successfully"}));
75 }
76 });
77
78 let mut executions = self.executions.write().await;
79 executions.insert(execution_id.clone(), execution);
80
81 Ok(execution_id)
82 }
83
84 async fn get_execution(
85 &self,
86 execution_id: &WorkflowExecutionId,
87 ) -> Result<Option<WorkflowExecution>, WorkflowError> {
88 let executions = self.executions.read().await;
89 Ok(executions.get(execution_id).cloned())
90 }
91
92 async fn list_executions(&self) -> Result<Vec<WorkflowExecution>, WorkflowError> {
93 let executions = self.executions.read().await;
94 Ok(executions.values().cloned().collect())
95 }
96
97 async fn cancel_execution(
98 &self,
99 execution_id: &WorkflowExecutionId,
100 ) -> Result<(), WorkflowError> {
101 let mut executions = self.executions.write().await;
102 if let Some(mut execution) = executions.get_mut(execution_id) {
103 execution.status = ExecutionStatus::Cancelled;
104 execution.updated_at = Utc::now();
105 Ok(())
106 } else {
107 Err(WorkflowError::NotFound(format!("Execution {} not found", execution_id)))
108 }
109 }
110}
111
112#[derive(Debug, Default)]
114pub struct WorkflowEngineBuilder {
115 }
117
118impl WorkflowEngineBuilder {
119 pub fn new() -> Self {
120 Self {}
121 }
122
123 pub fn build(self) -> WorkflowEngine {
124 WorkflowEngine::new()
125 }
126
127 pub fn with_memory_storage(self) -> Self {
129 self
131 }
132}
133