use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::timeout;

use kotoba_errors::WorkflowError;

use kotoba_core::types::{GraphRef_ as GraphRef, TxId};
use kotoba_core::prelude::StrategyOp;
use crate::ir::*;

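/// A single unit of work that the workflow engine can schedule.
///
/// Implementations provide the work in [`execute`](Activity::execute) and may
/// override [`timeout`](Activity::timeout) and [`retry_policy`](Activity::retry_policy)
/// to bound or retry execution; both default to `None`.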
#[async_trait]
pub trait Activity: Send + Sync {
    async fn execute(
        &self,
        inputs: HashMap<String, serde_json::Value>,
    ) -> std::result::Result<HashMap<String, serde_json::Value>, ActivityError>;
    fn name(&self) -> &str;
    fn timeout(&self) -> Option<Duration> { None }
    fn retry_policy(&self) -> Option<RetryPolicy> { None }
}

#[derive(Debug, thiserror::Error)]
pub enum ActivityError {
    #[error("Activity not found: {0}")]
    NotFound(String),
    #[error("Activity execution failed: {0}")]
    ExecutionFailed(String),
    #[error("Activity timeout")]
    Timeout,
    #[error("Invalid input: {0}")]
    InvalidInput(String),
}

impl From<ActivityError> for kotoba_errors::WorkflowError {
    fn from(err: ActivityError) -> Self {
        match err {
            ActivityError::NotFound(msg) => {
                kotoba_errors::WorkflowError::InvalidDefinition(format!("Activity not found: {}", msg))
            }
            ActivityError::ExecutionFailed(msg) => {
                kotoba_errors::WorkflowError::InvalidDefinition(format!("Activity execution failed: {}", msg))
            }
            ActivityError::Timeout => {
                kotoba_errors::WorkflowError::InvalidDefinition("Activity timeout".to_string())
            }
            ActivityError::InvalidInput(msg) => {
                kotoba_errors::WorkflowError::InvalidDefinition(format!("Invalid input: {}", msg))
            }
        }
    }
}

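/// Retry configuration for an [`Activity`].
///
/// Delays grow geometrically: the first retry waits `initial_interval`, and each
/// subsequent wait is multiplied by `backoff_coefficient` and capped at
/// `maximum_interval` (the retry loops in this module fall back to a 300-second
/// cap when it is `None`). `maximum_attempts` bounds the total number of
/// attempts, and `non_retryable_errors` lists substrings that mark an error as
/// permanent.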
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    pub initial_interval: Duration,
    pub backoff_coefficient: f64,
    pub maximum_interval: Option<Duration>,
    pub maximum_attempts: u32,
    pub non_retryable_errors: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct ActivityResult {
    pub activity_name: String,
    pub status: ActivityStatus,
    pub outputs: Option<HashMap<String, serde_json::Value>>,
    pub error: Option<String>,
    pub execution_time: Duration,
    pub attempt_count: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ActivityStatus {
    Scheduled,
    Started,
    Completed,
    Failed,
    Cancelled,
    TimedOut,
}

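/// Thread-safe registry that owns the [`Activity`] implementations available to
/// the executor and runs them by name, applying each activity's timeout and
/// retry policy.
///
/// Illustrative sketch (the `EchoActivity` type is hypothetical, not part of
/// this crate):
///
/// ```ignore
/// let registry = ActivityRegistry::new();
/// registry.register(Arc::new(EchoActivity)).await;
/// let result = registry.execute("echo", HashMap::new()).await?;
/// assert!(matches!(result.status, ActivityStatus::Completed));
/// ```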
pub struct ActivityRegistry {
    activities: RwLock<HashMap<String, Arc<dyn Activity>>>,
}

impl ActivityRegistry {
    pub fn new() -> Self {
        Self {
            activities: RwLock::new(HashMap::new()),
        }
    }

    pub async fn register(&self, activity: Arc<dyn Activity>) {
        let mut activities = self.activities.write().await;
        activities.insert(activity.name().to_string(), activity);
    }

    pub async fn get(&self, name: &str) -> Option<Arc<dyn Activity>> {
        let activities = self.activities.read().await;
        activities.get(name).cloned()
    }

    pub async fn execute(
        &self,
        name: &str,
        inputs: HashMap<String, serde_json::Value>,
    ) -> std::result::Result<ActivityResult, ActivityError> {
        let start_time = std::time::Instant::now();
        let activity = self
            .get(name)
            .await
            .ok_or_else(|| ActivityError::NotFound(name.to_string()))?;

        // Dispatch on the activity's own retry policy; fall back to a single attempt.
        if let Some(retry_policy) = activity.retry_policy() {
            self.execute_with_retry(&*activity, inputs, retry_policy, start_time).await
        } else {
            self.execute_once(&*activity, inputs, start_time, 1).await
        }
    }

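    /// Runs `activity` repeatedly according to `retry_policy`, sleeping with
    /// exponential backoff between attempts (capped at `maximum_interval`, or
    /// 300 s when unset). Errors whose message contains one of
    /// `non_retryable_errors` are returned immediately.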
    async fn execute_with_retry(
        &self,
        activity: &dyn Activity,
        inputs: HashMap<String, serde_json::Value>,
        retry_policy: RetryPolicy,
        start_time: std::time::Instant,
    ) -> std::result::Result<ActivityResult, ActivityError> {
        let mut attempt_count = 0;
        let mut current_interval = retry_policy.initial_interval;

        loop {
            attempt_count += 1;

            match self.execute_once(activity, inputs.clone(), start_time, attempt_count).await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    if retry_policy.non_retryable_errors.iter().any(|err| e.to_string().contains(err)) {
                        return Err(e);
                    }

                    if attempt_count >= retry_policy.maximum_attempts {
                        return Err(e);
                    }

                    tokio::time::sleep(current_interval).await;

                    current_interval = std::cmp::min(
                        current_interval.mul_f64(retry_policy.backoff_coefficient),
                        retry_policy.maximum_interval.unwrap_or(Duration::from_secs(300)),
                    );
                }
            }
        }
    }

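    /// Executes a single attempt, enforcing the activity's timeout if one is set.
    ///
    /// Note that an activity-level failure is reported as `Ok` with
    /// [`ActivityStatus::Failed`] and the error message in `error`; only a
    /// timeout (or a missing activity in [`execute`](Self::execute)) surfaces
    /// as `Err`.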
    async fn execute_once(
        &self,
        activity: &dyn Activity,
        inputs: HashMap<String, serde_json::Value>,
        start_time: std::time::Instant,
        attempt_count: u32,
    ) -> std::result::Result<ActivityResult, ActivityError> {
        let result = if let Some(timeout_duration) = activity.timeout() {
            match timeout(timeout_duration, activity.execute(inputs)).await {
                Ok(result) => result,
                Err(_) => return Err(ActivityError::Timeout),
            }
        } else {
            activity.execute(inputs).await
        };

        let execution_time = start_time.elapsed();

        match result {
            Ok(outputs) => Ok(ActivityResult {
                activity_name: activity.name().to_string(),
                status: ActivityStatus::Completed,
                outputs: Some(outputs),
                error: None,
                execution_time,
                attempt_count,
            }),
            Err(e) => Ok(ActivityResult {
                activity_name: activity.name().to_string(),
                status: ActivityStatus::Failed,
                outputs: None,
                error: Some(e.to_string()),
                execution_time,
                attempt_count,
            }),
        }
    }

    pub async fn list_activities(&self) -> Vec<String> {
        let activities = self.activities.read().await;
        activities.keys().cloned().collect()
    }
}

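/// Mutable key/value store threaded through step execution. `new` seeds it with
/// the initial request under the `"request"` key, and `resolve` looks up values
/// by a dotted `context.<step_id>.<field>...` path, e.g.
/// `ctx.resolve("context.request.user_id")`.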
#[derive(Debug, Clone, Default)]
pub struct ExecutionContext {
    pub data: HashMap<String, serde_json::Value>,
}

impl ExecutionContext {
    pub fn new(initial_data: serde_json::Value) -> Self {
        let mut data = HashMap::new();
        data.insert("request".to_string(), initial_data);
        Self { data }
    }

    pub fn resolve(&self, path: &str) -> Option<&serde_json::Value> {
        let mut parts = path.split('.');
        if parts.next()? != "context" {
            return None;
        }
        let step_id = parts.next()?;
        let mut current = self.data.get(step_id)?;
        for part in parts {
            current = current.get(part)?;
        }
        Some(current)
    }
}

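/// Drives a [`WorkflowIR`] to completion: it interprets the workflow's strategy
/// tree, invokes activities through the [`ActivityRegistry`], and records
/// progress in the [`WorkflowStateManager`].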
pub struct WorkflowExecutor {
    activity_registry: Arc<ActivityRegistry>,
    state_manager: Arc<WorkflowStateManager>,
}

impl WorkflowExecutor {
    pub fn new(
        activity_registry: Arc<ActivityRegistry>,
        state_manager: Arc<WorkflowStateManager>,
    ) -> Self {
        Self {
            activity_registry,
            state_manager,
        }
    }

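    /// Executes a single [`WorkflowStep`]. `DbQuery` and `DbRewrite` currently
    /// return mock payloads; `Return` materializes the step body against the
    /// execution context; any other step type is rejected.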
    async fn execute_step(
        &self,
        step: &WorkflowStep,
        context: &ExecutionContext,
    ) -> Result<serde_json::Value, WorkflowError> {
        match step.step_type {
            WorkflowStepType::DbQuery => {
                println!("Executing DB Query: {}", step.body);
                Ok(serde_json::json!({ "result": "mock_db_query_result" }))
            }
            WorkflowStepType::DbRewrite => {
                println!("Executing DB Rewrite Rule: {}", step.body);
                Ok(serde_json::json!({ "result": "mock_db_rewrite_result" }))
            }
            WorkflowStepType::Return => {
                let body = self.materialize_params(&step.body, context)?;
                Ok(body)
            }
            _ => Err(WorkflowError::InvalidDefinition(format!(
                "Invalid step type: {:?}",
                step.step_type
            ))),
        }
    }

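    /// Recursively replaces string values of the form `"context.<step>.<field>"`
    /// with the value resolved from `context`, leaving all other JSON values
    /// untouched. For example, `{"user": "context.request.user_id"}` becomes
    /// `{"user": <resolved value>}`, and an unresolvable path yields
    /// [`WorkflowError::ContextVariableNotFound`].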
    pub fn materialize_params(
        &self,
        params: &serde_json::Value,
        context: &ExecutionContext,
    ) -> Result<serde_json::Value, WorkflowError> {
        match params {
            serde_json::Value::String(s) if s.starts_with("context.") => {
                context
                    .resolve(s)
                    .cloned()
                    .ok_or_else(|| WorkflowError::ContextVariableNotFound(s.clone()))
            }
            serde_json::Value::Object(map) => {
                let mut new_map = serde_json::Map::new();
                for (k, v) in map {
                    new_map.insert(k.clone(), self.materialize_params(v, context)?);
                }
                Ok(serde_json::Value::Object(new_map))
            }
            serde_json::Value::Array(arr) => {
                let mut new_arr = Vec::new();
                for v in arr {
                    new_arr.push(self.materialize_params(v, context)?);
                }
                Ok(serde_json::Value::Array(new_arr))
            }
            _ => Ok(params.clone()),
        }
    }

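    /// Registers a new execution with the state manager and spawns a background
    /// task that runs the workflow to completion. The returned
    /// [`WorkflowExecutionId`] can be used to poll execution state; failures in
    /// the spawned task are logged rather than returned to the caller.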
    pub async fn start_workflow(
        &self,
        workflow_ir: &WorkflowIR,
        inputs: HashMap<String, serde_json::Value>,
    ) -> std::result::Result<WorkflowExecutionId, WorkflowError> {
        let execution_id = self.state_manager.create_execution(workflow_ir, inputs.clone()).await?;

        let executor = Arc::new(Self::new(
            Arc::clone(&self.activity_registry),
            Arc::clone(&self.state_manager),
        ));

        let workflow_ir = workflow_ir.clone();
        let execution_id_clone = execution_id.clone();

        tokio::spawn(async move {
            if let Err(e) = executor.execute_workflow(workflow_ir, execution_id_clone).await {
                eprintln!("Workflow execution failed: {:?}", e);
            }
        });

        Ok(execution_id)
    }

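    /// Runs the workflow's strategy tree against a fresh `"initial"` graph,
    /// then records the terminal state (completed or failed) and the matching
    /// event in the state manager.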
    async fn execute_workflow(
        &self,
        workflow_ir: WorkflowIR,
        execution_id: WorkflowExecutionId,
    ) -> std::result::Result<(), WorkflowError> {
        let initial_graph = GraphRef("initial".to_string());

        let result = self.execute_strategy(workflow_ir.strategy, initial_graph, &execution_id).await;

        let mut execution = self.state_manager.get_execution(&execution_id).await
            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;

        match result {
            Ok(final_graph) => {
                execution.status = ExecutionStatus::Completed;
                execution.end_time = Some(chrono::Utc::now());
                execution.current_graph = final_graph;
                let event = ExecutionEvent {
                    id: uuid::Uuid::new_v4().to_string(),
                    timestamp: chrono::Utc::now(),
                    event_type: ExecutionEventType::WorkflowCompleted,
                    payload: HashMap::new(),
                };
                self.state_manager.add_execution_event(&execution_id, event).await?;
            }
            Err(e) => {
                execution.status = ExecutionStatus::Failed;
                execution.end_time = Some(chrono::Utc::now());

                let event = ExecutionEvent {
                    id: uuid::Uuid::new_v4().to_string(),
                    timestamp: chrono::Utc::now(),
                    event_type: ExecutionEventType::WorkflowFailed,
                    payload: [("error".to_string(), serde_json::json!(e.to_string()))]
                        .into_iter()
                        .collect(),
                };
                self.state_manager.add_execution_event(&execution_id, event).await?;
            }
        }

        self.state_manager.update_execution(execution).await?;
        Ok(())
    }

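    /// Interprets one node of the strategy tree. The future is boxed because
    /// the method recurses through `execute_seq`, `execute_parallel`, etc., and
    /// async recursion needs an indirected (pinned, boxed) future type.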
    fn execute_strategy<'a>(
        &'a self,
        strategy: WorkflowStrategyOp,
        graph: GraphRef,
        execution_id: &'a WorkflowExecutionId,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<GraphRef, WorkflowError>> + Send + 'a>> {
        Box::pin(async move {
            match strategy {
                WorkflowStrategyOp::Basic { strategy } => {
                    self.execute_basic_strategy(strategy, graph, execution_id).await
                }
                WorkflowStrategyOp::Seq { strategies } => {
                    self.execute_seq(strategies, graph, execution_id).await
                }
                WorkflowStrategyOp::Parallel { branches, completion_condition } => {
                    self.execute_parallel(branches, completion_condition, graph, execution_id).await
                }
                WorkflowStrategyOp::Decision { conditions, default_branch } => {
                    self.execute_decision(conditions, default_branch, graph, execution_id).await
                }
                WorkflowStrategyOp::Wait { condition, timeout } => {
                    self.execute_wait(condition, timeout, graph, execution_id).await
                }
                WorkflowStrategyOp::Saga { main_flow, compensation } => {
                    self.execute_saga(*main_flow, *compensation, graph, execution_id).await
                }
                WorkflowStrategyOp::Activity { activity_ref, input_mapping, retry_policy } => {
                    self.execute_activity(activity_ref, input_mapping, retry_policy, graph, execution_id).await
                }
                WorkflowStrategyOp::SubWorkflow { workflow_ref, input_mapping } => {
                    self.execute_subworkflow(workflow_ref, input_mapping, graph, execution_id).await
                }
            }
        })
    }

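    /// Interprets a core [`StrategyOp`]. Rule application (`Once`, `Exhaust`,
    /// `While`) is currently a logging placeholder that returns the input graph
    /// unchanged; `Seq` and `Choice` recurse over their children, and
    /// `Priority` currently runs only the first entry.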
    fn execute_basic_strategy<'a>(
        &'a self,
        strategy: StrategyOp,
        graph: GraphRef,
        execution_id: &'a WorkflowExecutionId,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<GraphRef, WorkflowError>> + Send + 'a>> {
        Box::pin(async move {
            match strategy {
                StrategyOp::Seq { strategies } => {
                    let mut current_graph = graph;
                    for strategy in strategies {
                        current_graph = self.execute_basic_strategy(*strategy, current_graph, execution_id).await?;
                    }
                    Ok(current_graph)
                }
                StrategyOp::Once { rule } => {
                    println!("Executing rule: {}", rule);
                    Ok(graph)
                }
                StrategyOp::Exhaust { rule, order: _, measure: _ } => {
                    println!("Executing rule exhaustively: {}", rule);
                    Ok(graph)
                }
                StrategyOp::While { rule, pred: _, order: _ } => {
                    println!("Executing rule while predicate: {}", rule);
                    Ok(graph)
                }
                StrategyOp::Choice { strategies } => {
                    for strategy in strategies {
                        match self.execute_basic_strategy(*strategy.clone(), graph.clone(), execution_id).await {
                            Ok(result_graph) => return Ok(result_graph),
                            Err(_) => continue,
                        }
                    }
                    Err(WorkflowError::InvalidStrategy("All strategies in choice failed".to_string()))
                }
                StrategyOp::Priority { strategies } => {
                    println!("Executing with priority");
                    if let Some(first_strategy) = strategies.first() {
                        self.execute_basic_strategy((*first_strategy.strategy).clone(), graph, execution_id).await
                    } else {
                        Ok(graph)
                    }
                }
            }
        })
    }

    async fn execute_seq(
        &self,
        strategies: Vec<Box<WorkflowStrategyOp>>,
        graph: GraphRef,
        execution_id: &WorkflowExecutionId,
    ) -> std::result::Result<GraphRef, WorkflowError> {
        let mut current_graph = graph;
        for strategy in strategies {
            current_graph = self.execute_strategy(*strategy, current_graph, execution_id).await?;
        }
        Ok(current_graph)
    }

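    /// Spawns each branch on its own task. For [`CompletionCondition::All`]
    /// every branch must succeed and the first branch's resulting graph is
    /// returned; `Any` and `AtLeast` are not implemented yet and return an
    /// error.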
    async fn execute_parallel(
        &self,
        branches: Vec<Box<WorkflowStrategyOp>>,
        completion_condition: CompletionCondition,
        graph: GraphRef,
        execution_id: &WorkflowExecutionId,
    ) -> std::result::Result<GraphRef, WorkflowError> {
        let mut handles = vec![];

        for branch in branches {
            let executor = Arc::new(Self::new(
                Arc::clone(&self.activity_registry),
                Arc::clone(&self.state_manager),
            ));
            let graph_clone = graph.clone();
            let execution_id_clone = execution_id.clone();

            let handle = tokio::spawn(async move {
                executor.execute_strategy(*branch, graph_clone, &execution_id_clone).await
            });
            handles.push(handle);
        }

        match completion_condition {
            CompletionCondition::All => {
                // Await every branch and propagate the first error; on success,
                // return the graph produced by the first branch.
                let mut results = vec![];
                for handle in handles {
                    let branch_result = handle
                        .await
                        .map_err(|_| WorkflowError::InvalidStrategy("Task panicked".to_string()))?;
                    results.push(branch_result?);
                }
                Ok(results.into_iter().next().unwrap_or(graph))
            }
            CompletionCondition::Any => {
                Err(WorkflowError::InvalidStrategy("Any completion not implemented".to_string()))
            }
            CompletionCondition::AtLeast(_count) => {
                Err(WorkflowError::InvalidStrategy("AtLeast completion not implemented".to_string()))
            }
        }
    }

    async fn execute_decision(
        &self,
        conditions: Vec<DecisionBranch>,
        default_branch: Option<Box<WorkflowStrategyOp>>,
        graph: GraphRef,
        execution_id: &WorkflowExecutionId,
    ) -> std::result::Result<GraphRef, WorkflowError> {
        let execution = self.state_manager.get_execution(execution_id).await
            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;

        let context = execution.inputs.clone();

        for branch in conditions {
            if self.evaluate_condition(&branch.condition, &context) {
                return self.execute_strategy(*branch.branch, graph, execution_id).await;
            }
        }

        if let Some(default_branch) = default_branch {
            self.execute_strategy(*default_branch, graph, execution_id).await
        } else {
            Ok(graph)
        }
    }

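    /// Handles a wait node. `Timer` sleeps for the given duration; `Event` and
    /// `Signal` are placeholders that simply sleep for the optional timeout (if
    /// any) and then continue with the unchanged graph.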
    async fn execute_wait(
        &self,
        condition: WaitCondition,
        timeout: Option<Duration>,
        graph: GraphRef,
        _execution_id: &WorkflowExecutionId,
    ) -> std::result::Result<GraphRef, WorkflowError> {
        match condition {
            WaitCondition::Timer { duration } => {
                tokio::time::sleep(duration).await;
                Ok(graph)
            }
            WaitCondition::Event { event_type, filter: _ } => {
                println!("Waiting for event: {}", event_type);
                if let Some(timeout) = timeout {
                    tokio::time::sleep(timeout).await;
                }
                Ok(graph)
            }
            WaitCondition::Signal { signal_name } => {
                println!("Waiting for signal: {}", signal_name);
                if let Some(timeout) = timeout {
                    tokio::time::sleep(timeout).await;
                }
                Ok(graph)
            }
        }
    }

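    /// Saga semantics: run the main flow, and on failure run the compensation
    /// flow against the original graph. Either way a failed main flow is
    /// reported as [`WorkflowError::CompensationFailed`], with the compensation
    /// outcome folded into the message.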
    async fn execute_saga(
        &self,
        main_flow: WorkflowStrategyOp,
        compensation: WorkflowStrategyOp,
        graph: GraphRef,
        execution_id: &WorkflowExecutionId,
    ) -> std::result::Result<GraphRef, WorkflowError> {
        match self.execute_strategy(main_flow, graph.clone(), execution_id).await {
            Ok(result_graph) => Ok(result_graph),
            Err(e) => {
                println!("Main flow failed ({:?}), executing compensation", e);
                match self.execute_strategy(compensation, graph, execution_id).await {
                    Ok(_) => Err(WorkflowError::CompensationFailed("Main flow failed, compensation executed".to_string())),
                    Err(compensation_error) => Err(WorkflowError::CompensationFailed(
                        format!("Main flow failed and compensation also failed: {:?}", compensation_error),
                    )),
                }
            }
        }
    }

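    /// Resolves the activity's inputs from the execution's stored inputs via
    /// `map_inputs`, runs the activity (with retries when a policy is given),
    /// and passes the graph through unchanged; activity outputs are currently
    /// discarded rather than merged back into the execution state.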
    async fn execute_activity(
        &self,
        activity_ref: String,
        input_mapping: HashMap<String, String>,
        retry_policy: Option<crate::ir::RetryPolicy>,
        graph: GraphRef,
        execution_id: &WorkflowExecutionId,
    ) -> std::result::Result<GraphRef, WorkflowError> {
        let execution = self.state_manager.get_execution(execution_id).await
            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;

        let inputs = self.map_inputs(&input_mapping, &execution.inputs)?;

        let result = if let Some(retry_policy) = retry_policy {
            self.execute_with_retry(&activity_ref, inputs, retry_policy).await
        } else {
            self.activity_registry.execute(&activity_ref, inputs).await
                .map(|result| result.outputs.unwrap_or_default())
        };

        match result {
            Ok(_outputs) => {
                println!("Activity {} completed successfully", activity_ref);
                Ok(graph)
            }
            Err(e) => {
                println!("Activity {} failed: {:?}", activity_ref, e);
                Err(WorkflowError::InvalidDefinition(format!("Activity failed: {:?}", e)))
            }
        }
    }

    async fn execute_subworkflow(
        &self,
        workflow_ref: String,
        input_mapping: HashMap<String, String>,
        graph: GraphRef,
        execution_id: &WorkflowExecutionId,
    ) -> std::result::Result<GraphRef, WorkflowError> {
        let execution = self.state_manager.get_execution(execution_id).await
            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;

        // Inputs are resolved but unused until sub-workflow execution is implemented.
        let _inputs = self.map_inputs(&input_mapping, &execution.inputs)?;

        println!("Subworkflow {} execution not yet implemented", workflow_ref);
        Ok(graph)
    }

    async fn execute_with_retry(
        &self,
        activity_ref: &str,
        inputs: HashMap<String, serde_json::Value>,
        retry_policy: crate::ir::RetryPolicy,
    ) -> std::result::Result<HashMap<String, serde_json::Value>, ActivityError> {
        let mut attempts = 0;
        let mut current_interval = retry_policy.initial_interval;

        loop {
            attempts += 1;

            match self.activity_registry.execute(activity_ref, inputs.clone()).await {
                Ok(result) => return Ok(result.outputs.unwrap_or_default()),
                Err(e) => {
                    if retry_policy.non_retryable_errors.iter().any(|err| e.to_string().contains(err)) {
                        return Err(e);
                    }

                    if attempts >= retry_policy.maximum_attempts {
                        return Err(e);
                    }

                    tokio::time::sleep(current_interval).await;

                    current_interval = std::cmp::min(
                        current_interval.mul_f64(retry_policy.backoff_coefficient),
                        retry_policy.maximum_interval.unwrap_or(Duration::from_secs(300)),
                    );
                }
            }
        }
    }

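    /// Builds the activity input map from a `{target_key: "$.inputs.<field>"}`
    /// mapping; entries whose expression does not match that form, or whose
    /// field is absent from the workflow inputs, are silently skipped. For
    /// example, `{"user": "$.inputs.user_id"}` copies `inputs["user_id"]` into
    /// the activity input `"user"`.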
    fn map_inputs(
        &self,
        mapping: &HashMap<String, String>,
        context: &HashMap<String, serde_json::Value>,
    ) -> std::result::Result<HashMap<String, serde_json::Value>, WorkflowError> {
        let mut inputs = HashMap::new();

        for (key, expr) in mapping {
            if let Some(field) = expr.strip_prefix("$.inputs.") {
                if let Some(value) = context.get(field) {
                    inputs.insert(key.clone(), value.clone());
                }
            }
        }

        Ok(inputs)
    }

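    /// Placeholder condition evaluator: any condition containing `"=="` is
    /// treated as true and everything else as false; the context is not yet
    /// consulted.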
    fn evaluate_condition(
        &self,
        condition: &str,
        _context: &HashMap<String, serde_json::Value>,
    ) -> bool {
        condition.contains("==")
    }
}

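/// In-memory, versioned store of workflow executions. Every update is appended
/// as a new `(TxId, WorkflowExecution)` version rather than overwriting, so past
/// states remain readable via [`get_execution_at`](WorkflowStateManager::get_execution_at)
/// until [`create_snapshot`](WorkflowStateManager::create_snapshot) trims old versions.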
pub struct WorkflowStateManager {
    executions: RwLock<HashMap<String, Vec<(TxId, WorkflowExecution)>>>,
    current_tx_id: std::sync::atomic::AtomicU64,
}

impl WorkflowStateManager {
    pub fn new() -> Self {
        Self {
            executions: RwLock::new(HashMap::new()),
            current_tx_id: std::sync::atomic::AtomicU64::new(1),
        }
    }

    fn next_tx_id(&self) -> TxId {
        let tx_id = self.current_tx_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        TxId(tx_id.to_string())
    }

    pub async fn create_execution(
        &self,
        workflow_ir: &WorkflowIR,
        inputs: HashMap<String, serde_json::Value>,
    ) -> std::result::Result<WorkflowExecutionId, WorkflowError> {
        let tx_id = self.next_tx_id();
        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4().to_string());

        let execution = WorkflowExecution {
            id: execution_id.clone(),
            workflow_id: workflow_ir.id.clone(),
            status: ExecutionStatus::Running,
            start_time: chrono::Utc::now(),
            end_time: None,
            inputs,
            outputs: None,
            current_graph: GraphRef("initial".to_string()),
            execution_history: vec![ExecutionEvent {
                id: uuid::Uuid::new_v4().to_string(),
                timestamp: chrono::Utc::now(),
                event_type: ExecutionEventType::Started,
                payload: HashMap::new(),
            }],
            retry_count: 0,
            timeout_at: workflow_ir.timeout.map(|t| chrono::Utc::now() + chrono::Duration::from_std(t).unwrap()),
        };

        let mut executions = self.executions.write().await;
        let versions = executions.entry(execution_id.0.clone()).or_insert_with(Vec::new);
        versions.push((tx_id, execution));

        Ok(execution_id)
    }

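    /// Returns the execution as of `tx_id` (the latest version whose transaction
    /// id is less than or equal to it), or the most recent version when `tx_id`
    /// is `None`.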
    pub async fn get_execution_at(&self, id: &WorkflowExecutionId, tx_id: Option<TxId>) -> Option<WorkflowExecution> {
        let executions = self.executions.read().await;
        let versions = executions.get(&id.0)?;

        match tx_id {
            Some(tx_id) => {
                // Transaction ids are numeric strings; compare them numerically so
                // that e.g. "10" is not ordered before "9".
                let target = tx_id.0.parse::<u64>().ok()?;
                versions.iter()
                    .filter(|(v_tx_id, _)| v_tx_id.0.parse::<u64>().map_or(false, |v| v <= target))
                    .max_by_key(|(v_tx_id, _)| v_tx_id.0.parse::<u64>().unwrap_or(0))
                    .map(|(_, execution)| execution.clone())
            }
            None => versions.last().map(|(_, execution)| execution.clone()),
        }
    }

    pub async fn get_execution(&self, id: &WorkflowExecutionId) -> Option<WorkflowExecution> {
        self.get_execution_at(id, None).await
    }

    pub async fn update_execution(&self, execution: WorkflowExecution) -> std::result::Result<TxId, WorkflowError> {
        let tx_id = self.next_tx_id();

        let mut executions = self.executions.write().await;
        let versions = executions.entry(execution.id.0.clone()).or_insert_with(Vec::new);
        versions.push((tx_id.clone(), execution));

        Ok(tx_id)
    }

    pub async fn get_execution_history(&self, id: &WorkflowExecutionId) -> Vec<(TxId, WorkflowExecution)> {
        let executions = self.executions.read().await;
        executions.get(&id.0).cloned().unwrap_or_default()
    }

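    /// Compacts an execution's version history down to at most `max_versions`
    /// entries by dropping the oldest versions.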
    pub async fn create_snapshot(&self, execution_id: &WorkflowExecutionId, max_versions: usize) -> std::result::Result<(), WorkflowError> {
        let mut executions = self.executions.write().await;
        if let Some(versions) = executions.get_mut(&execution_id.0) {
            if versions.len() > max_versions {
                let remove_count = versions.len().saturating_sub(max_versions);
                versions.drain(0..remove_count);
            }
        }
        Ok(())
    }

    pub async fn get_running_executions(&self) -> Vec<WorkflowExecution> {
        let executions = self.executions.read().await;
        executions.values()
            .filter_map(|versions| {
                versions.last().map(|(_, execution)| execution.clone())
                    .filter(|execution| matches!(execution.status, ExecutionStatus::Running))
            })
            .collect()
    }

    pub async fn add_execution_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> std::result::Result<TxId, WorkflowError> {
        let mut execution = self.get_execution(execution_id).await
            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;

        execution.execution_history.push(event);
        self.update_execution(execution).await
    }
}