kotoba_workflow/
executor.rs

1//! WorkflowExecutor - Temporalベースワークフロー実行器
2//!
3//! 拡張されたStrategyIRを解釈し、Temporal風のワークフロー実行を
4//! 実現します。Activity実行、並列実行、Sagaパターンなどをサポート。
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12use tokio::time::timeout;
13
14// Import the new routing schema
15// use kotoba_routing::schema::{WorkflowStep, WorkflowStepType};
16// Import the shared error type
17use kotoba_errors::WorkflowError;
18
19// These will be needed once we integrate the DB handler
20// use kotoba_jsonnet::runtime::DbHandler;
21// use kotoba_core::execution::QueryExecutor;
22// use kotoba_core::rewrite::RewriteEngine;
23
24use kotoba_core::types::{GraphRef_ as GraphRef, TxId};
25use kotoba_core::prelude::StrategyOp;
26use crate::ir::*;
27
28/// Activity実行インターフェース
29#[async_trait]
30pub trait Activity: Send + Sync {
31    async fn execute(&self, inputs: HashMap<String, serde_json::Value>) -> std::result::Result<HashMap<String, serde_json::Value>, ActivityError>;
32    fn name(&self) -> &str;
33    fn timeout(&self) -> Option<Duration> { None }
34    fn retry_policy(&self) -> Option<RetryPolicy> { None }
35}
36
37/// Activity実行エラー
38#[derive(Debug, thiserror::Error)]
39pub enum ActivityError {
40    #[error("Activity not found: {0}")]
41    NotFound(String),
42    #[error("Activity execution failed: {0}")]
43    ExecutionFailed(String),
44    #[error("Activity timeout")]
45    Timeout,
46    #[error("Invalid input: {0}")]
47    InvalidInput(String),
48}
49
50impl From<ActivityError> for kotoba_errors::WorkflowError {
51    fn from(err: ActivityError) -> Self {
52        match err {
53            ActivityError::NotFound(msg) => kotoba_errors::WorkflowError::InvalidDefinition(format!("Activity not found: {}", msg)),
54            ActivityError::ExecutionFailed(msg) => kotoba_errors::WorkflowError::InvalidDefinition(format!("Activity execution failed: {}", msg)),
55            ActivityError::Timeout => kotoba_errors::WorkflowError::InvalidDefinition("Activity timeout".to_string()),
56            ActivityError::InvalidInput(msg) => kotoba_errors::WorkflowError::InvalidDefinition(format!("Invalid input: {}", msg)),
57        }
58    }
59}
60
61/// リトライポリシー
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct RetryPolicy {
64    pub initial_interval: Duration,
65    pub backoff_coefficient: f64,
66    pub maximum_interval: Option<Duration>,
67    pub maximum_attempts: u32,
68    pub non_retryable_errors: Vec<String>,
69}
70
71/// Activity実行結果
72#[derive(Debug, Clone)]
73pub struct ActivityResult {
74    pub activity_name: String,
75    pub status: ActivityStatus,
76    pub outputs: Option<HashMap<String, serde_json::Value>>,
77    pub error: Option<String>,
78    pub execution_time: Duration,
79    pub attempt_count: u32,
80}
81
82/// Activity実行状態
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub enum ActivityStatus {
85    Scheduled,
86    Started,
87    Completed,
88    Failed,
89    Cancelled,
90    TimedOut,
91}
92
93/// Activityレジストリ
94pub struct ActivityRegistry {
95    activities: tokio::sync::RwLock<HashMap<String, Arc<dyn Activity>>>,
96}
97
98impl ActivityRegistry {
99    pub fn new() -> Self {
100        Self {
101            activities: tokio::sync::RwLock::new(HashMap::new()),
102        }
103    }
104
105    /// Activityを登録
106    pub async fn register(&self, activity: Arc<dyn Activity>) {
107        let mut activities = self.activities.write().await;
108        activities.insert(activity.name().to_string(), activity);
109    }
110
111    /// Activityを取得
112    pub async fn get(&self, name: &str) -> Option<Arc<dyn Activity>> {
113        let activities = self.activities.read().await;
114        activities.get(name).cloned()
115    }
116
117    /// Activityを実行
118    pub async fn execute(
119        &self,
120        name: &str,
121        inputs: HashMap<String, serde_json::Value>,
122    ) -> std::result::Result<ActivityResult, ActivityError> {
123        let start_time = std::time::Instant::now();
124        let activity = self.get(name).await
125            .ok_or(ActivityError::NotFound(name.to_string()))?;
126
127        let mut attempt_count = 0;
128        let retry_policy = activity.retry_policy();
129
130        // リトライロジック
131        if let Some(retry_policy) = retry_policy {
132            self.execute_with_retry(&*activity, inputs, retry_policy, start_time).await
133        } else {
134            self.execute_once(&*activity, inputs, start_time, 1).await
135        }
136    }
137
138    async fn execute_with_retry(
139        &self,
140        activity: &dyn Activity,
141        inputs: HashMap<String, serde_json::Value>,
142        retry_policy: RetryPolicy,
143        start_time: std::time::Instant,
144    ) -> std::result::Result<ActivityResult, ActivityError> {
145        let mut attempt_count = 0;
146        let mut current_interval = retry_policy.initial_interval;
147
148        loop {
149            attempt_count += 1;
150
151            match self.execute_once(activity, inputs.clone(), start_time, attempt_count).await {
152                Ok(result) => return Ok(result),
153                Err(e) => {
154                    // リトライ不可エラーのチェック
155                    if retry_policy.non_retryable_errors.iter().any(|err| e.to_string().contains(err)) {
156                        return Err(e);
157                    }
158
159                    // 最大試行回数チェック
160                    if attempt_count >= retry_policy.maximum_attempts {
161                        return Err(e);
162                    }
163
164                    // リトライ待機
165                    tokio::time::sleep(current_interval).await;
166
167                    // インターバル更新
168                    current_interval = std::cmp::min(
169                        current_interval.mul_f64(retry_policy.backoff_coefficient),
170                        retry_policy.maximum_interval.unwrap_or(Duration::from_secs(300)),
171                    );
172                }
173            }
174        }
175    }
176
177    async fn execute_once(
178        &self,
179        activity: &dyn Activity,
180        inputs: HashMap<String, serde_json::Value>,
181        start_time: std::time::Instant,
182        attempt_count: u32,
183    ) -> std::result::Result<ActivityResult, ActivityError> {
184        // タイムアウト設定を考慮した実行
185        let result = if let Some(timeout_duration) = activity.timeout() {
186            match timeout(timeout_duration, activity.execute(inputs)).await {
187                Ok(result) => result,
188                Err(_) => return Err(ActivityError::Timeout),
189            }
190        } else {
191            activity.execute(inputs).await
192        };
193
194        let execution_time = start_time.elapsed();
195
196        match result {
197            Ok(outputs) => Ok(ActivityResult {
198                activity_name: activity.name().to_string(),
199                status: ActivityStatus::Completed,
200                outputs: Some(outputs),
201                error: None,
202                execution_time,
203                attempt_count,
204            }),
205            Err(e) => Ok(ActivityResult {
206                activity_name: activity.name().to_string(),
207                status: ActivityStatus::Failed,
208                outputs: None,
209                error: Some(e.to_string()),
210                execution_time,
211                attempt_count,
212            }),
213        }
214    }
215
216    /// 登録されているActivity一覧を取得
217    pub async fn list_activities(&self) -> Vec<String> {
218        let activities = self.activities.read().await;
219        activities.keys().cloned().collect()
220    }
221}
222
223// The WorkflowError enum has been moved to `kotoba-errors`.
224
225/// A context object for a single workflow execution.
226#[derive(Debug, Clone, Default)]
227pub struct ExecutionContext {
228    /// Holds the initial request data and results from each step.
229    pub data: HashMap<String, serde_json::Value>,
230}
231
232impl ExecutionContext {
233    /// Creates a new context, usually from an initial request object.
234    pub fn new(initial_data: serde_json::Value) -> Self {
235        let mut data = HashMap::new();
236        data.insert("request".to_string(), initial_data);
237        Self { data }
238    }
239
240    /// Resolves a variable path (e.g., "context.step1.result") from the context.
241    pub fn resolve(&self, path: &str) -> Option<&serde_json::Value> {
242        let mut parts = path.split('.');
243        if parts.next()? != "context" {
244            return None;
245        }
246        let step_id = parts.next()?;
247        let mut current = self.data.get(step_id)?;
248        for part in parts {
249            current = current.get(part)?;
250        }
251        Some(current)
252    }
253}
254
255
256/// WorkflowExecutor - Temporalベースワークフロー実行エンジン
257pub struct WorkflowExecutor {
258    activity_registry: Arc<ActivityRegistry>,
259    state_manager: Arc<WorkflowStateManager>,
260    // This will hold the db_handler
261    // db_handler: Arc<DbHandler>,
262}
263
264impl WorkflowExecutor {
265    pub fn new(
266        activity_registry: Arc<ActivityRegistry>,
267        state_manager: Arc<WorkflowStateManager>,
268        // db_handler: Arc<DbHandler>,
269    ) -> Self {
270        Self {
271            activity_registry,
272            state_manager,
273            // db_handler,
274        }
275    }
276
277    /// Executes a declarative workflow, like one from an HTTP route.
278    /*
279    pub async fn execute_declarative_workflow(
280        &self,
281        steps: &[WorkflowStep],
282        initial_context: ExecutionContext,
283    ) -> Result<serde_json::Value, WorkflowError> {
284        let mut context = initial_context;
285
286        for step in steps {
287            let result = self.execute_step(step, &context).await?;
288            context.data.insert(step.id.clone(), result);
289
290            // If the step was a 'return' step, terminate the workflow.
291            if step.step_type == WorkflowStepType::Return {
292                // The body of the return step is the final result.
293                return Ok(context.data.get(&step.id).cloned().unwrap_or_default());
294            }
295        }
296
297        Err(WorkflowError::InvalidDefinition("Workflow did not end with a 'return' step.".to_string()))
298    }
299    */
300
301    /// Executes a single step from a declarative workflow.
302    async fn execute_step(
303        &self,
304        step: &WorkflowStep,
305        context: &ExecutionContext,
306    ) -> Result<serde_json::Value, WorkflowError> {
307        match step.step_type {
308            WorkflowStepType::DbQuery => {
309                // Mock implementation
310                println!("Executing DB Query: {}", step.body);
311                Ok(serde_json::json!({ "result": "mock_db_query_result" }))
312                // Real implementation:
313                // let params = self.materialize_params(&step.body, context)?;
314                // let query = step.body.as_str().unwrap_or("");
315                // let result = self.db_handler.query(query, params).await?;
316                // Ok(serde_json::to_value(result)?)
317            }
318            WorkflowStepType::DbRewrite => {
319                // Mock implementation
320                println!("Executing DB Rewrite Rule: {}", step.body);
321                Ok(serde_json::json!({ "result": "mock_db_rewrite_result" }))
322                // Real implementation:
323                // let params = self.materialize_params(&step.body, context)?;
324                // let result = self.db_handler.rewrite(&step.rule, params).await?;
325                // Ok(serde_json::to_value(result)?)
326            }
327            WorkflowStepType::Return => {
328                // The body of the return step becomes its result.
329                let body = self.materialize_params(&step.body, context)?;
330                Ok(body)
331            }
332            // Other step types would be handled here...
333            _ => Err(WorkflowError::InvalidDefinition(format!("Invalid step type: {:?}", step.step_type))),
334        }
335    }
336
337    /// Resolves parameters that might be context references.
338    pub fn materialize_params(
339        &self,
340        params: &serde_json::Value,
341        context: &ExecutionContext,
342    ) -> Result<serde_json::Value, WorkflowError> {
343        match params {
344            serde_json::Value::String(s) if s.starts_with("context.") => {
345                context.resolve(s)
346                    .cloned()
347                    .ok_or_else(|| WorkflowError::ContextVariableNotFound(s.clone()))
348            }
349            serde_json::Value::Object(map) => {
350                let mut new_map = serde_json::Map::new();
351                for (k, v) in map {
352                    new_map.insert(k.clone(), self.materialize_params(v, context)?);
353                }
354                Ok(serde_json::Value::Object(new_map))
355            }
356            serde_json::Value::Array(arr) => {
357                let mut new_arr = Vec::new();
358                for v in arr {
359                    new_arr.push(self.materialize_params(v, context)?);
360                }
361                Ok(serde_json::Value::Array(new_arr))
362            }
363            // If it's not a context reference or a container, return as is.
364            _ => Ok(params.clone()),
365        }
366    }
367
368
369    /// ワークフロー実行開始
370    pub async fn start_workflow(
371        &self,
372        workflow_ir: &WorkflowIR,
373        inputs: HashMap<String, serde_json::Value>,
374    ) -> std::result::Result<WorkflowExecutionId, WorkflowError> {
375        // ワークフロー実行インスタンス作成
376        let execution_id = self.state_manager.create_execution(workflow_ir, inputs.clone()).await?;
377
378        // ワークフロー実行を開始(バックグラウンドで実行)
379        let executor = Arc::new(Self::new(
380            Arc::clone(&self.activity_registry),
381            Arc::clone(&self.state_manager),
382            // Arc::clone(&self.db_handler),
383        ));
384
385        let workflow_ir = workflow_ir.clone();
386        let execution_id_clone = execution_id.clone();
387
388        tokio::spawn(async move {
389            if let Err(e) = executor.execute_workflow(workflow_ir, execution_id_clone).await {
390                eprintln!("Workflow execution failed: {:?}", e);
391            }
392        });
393
394        Ok(execution_id)
395    }
396
397    /// ワークフロー実行メイン処理
398    async fn execute_workflow(
399        &self,
400        workflow_ir: WorkflowIR,
401        execution_id: WorkflowExecutionId,
402    ) -> std::result::Result<(), WorkflowError> {
403        // 初期グラフ状態を作成(TODO: 実際のグラフ作成ロジックを実装)
404        let initial_graph = GraphRef("initial".to_string());
405
406        // 戦略実行
407        let result = self.execute_strategy(workflow_ir.strategy, initial_graph, &execution_id).await;
408
409        // 実行結果に基づいて最終状態を更新(MVCC対応)
410        let mut execution = self.state_manager.get_execution(&execution_id).await
411            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;
412
413        match result {
414            Ok(final_graph) => {
415                execution.status = ExecutionStatus::Completed;
416                execution.end_time = Some(chrono::Utc::now());
417                execution.current_graph = final_graph;
418                // outputs は最終グラフから抽出(TODO)
419
420                // 完了イベントを追加
421                let event = ExecutionEvent {
422                    id: uuid::Uuid::new_v4().to_string(),
423                    timestamp: chrono::Utc::now(),
424                    event_type: ExecutionEventType::WorkflowCompleted,
425                    payload: HashMap::new(),
426                };
427                self.state_manager.add_execution_event(&execution_id, event).await?;
428            }
429            Err(e) => {
430                execution.status = ExecutionStatus::Failed;
431                execution.end_time = Some(chrono::Utc::now());
432
433                // 失敗イベントを追加
434                let event = ExecutionEvent {
435                    id: uuid::Uuid::new_v4().to_string(),
436                    timestamp: chrono::Utc::now(),
437                    event_type: ExecutionEventType::WorkflowFailed,
438                    payload: [("error".to_string(), serde_json::json!(e.to_string()))].into_iter().collect(),
439                };
440                self.state_manager.add_execution_event(&execution_id, event).await?;
441            }
442        }
443
444        // 最終状態を保存
445        self.state_manager.update_execution(execution).await?;
446        Ok(())
447    }
448
449    /// 戦略実行(再帰的)
450    fn execute_strategy<'a>(
451        &'a self,
452        strategy: WorkflowStrategyOp,
453        graph: GraphRef,
454        execution_id: &'a WorkflowExecutionId,
455    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<GraphRef, WorkflowError>> + Send + 'a>> {
456        Box::pin(async move {
457            match strategy {
458                WorkflowStrategyOp::Basic { strategy } => {
459                    self.execute_basic_strategy(strategy, graph, execution_id).await
460                }
461                WorkflowStrategyOp::Seq { strategies } => {
462                    self.execute_seq(strategies, graph, execution_id).await
463                }
464                WorkflowStrategyOp::Parallel { branches, completion_condition } => {
465                    self.execute_parallel(branches, completion_condition, graph, execution_id).await
466                }
467                WorkflowStrategyOp::Decision { conditions, default_branch } => {
468                    self.execute_decision(conditions, default_branch, graph, execution_id).await
469                }
470                WorkflowStrategyOp::Wait { condition, timeout } => {
471                    self.execute_wait(condition, timeout, graph, execution_id).await
472                }
473                WorkflowStrategyOp::Saga { main_flow, compensation } => {
474                    self.execute_saga(*main_flow, *compensation, graph, execution_id).await
475                }
476                WorkflowStrategyOp::Activity { activity_ref, input_mapping, retry_policy } => {
477                    self.execute_activity(activity_ref, input_mapping, retry_policy, graph, execution_id).await
478                }
479                WorkflowStrategyOp::SubWorkflow { workflow_ref, input_mapping } => {
480                    self.execute_subworkflow(workflow_ref, input_mapping, graph, execution_id).await
481                }
482            }
483        })
484    }
485
486    /// 基本戦略実行
487    fn execute_basic_strategy<'a>(
488        &'a self,
489        strategy: StrategyOp,
490        graph: GraphRef,
491        execution_id: &'a WorkflowExecutionId,
492    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<GraphRef, WorkflowError>> + Send + 'a>> {
493        Box::pin(async move {
494            match strategy {
495                StrategyOp::Seq { strategies } => {
496                    let mut current_graph = graph;
497                    for strategy in strategies {
498                        current_graph = self.execute_basic_strategy(*strategy, current_graph, execution_id).await?;
499                    }
500                    Ok(current_graph)
501                }
502                StrategyOp::Once { rule } => {
503                    // ルール適用(簡易実装)
504                    println!("Executing rule: {}", rule);
505                    Ok(graph) // TODO: 実際のルール適用を実装
506                }
507                StrategyOp::Exhaust { rule, order: _, measure: _ } => {
508                    // ルール適用(簡易実装)
509                    println!("Executing rule exhaustively: {}", rule);
510                    Ok(graph)
511                }
512                StrategyOp::While { rule, pred: _, order: _ } => {
513                    // 条件付きルール適用
514                    println!("Executing rule while predicate: {}", rule);
515                    Ok(graph)
516                }
517                StrategyOp::Choice { strategies } => {
518                    // 選択実行(最初の成功したものを返す)
519                    for strategy in strategies {
520                        match self.execute_basic_strategy(*strategy.clone(), graph.clone(), execution_id).await {
521                            Ok(result_graph) => return Ok(result_graph),
522                            Err(_) => continue,
523                        }
524                    }
525                    Err(WorkflowError::InvalidStrategy("All strategies in choice failed".to_string()))
526                }
527                StrategyOp::Priority { strategies } => {
528                    // 優先度付き実行
529                    println!("Executing with priority");
530                    // TODO: 優先度に基づいて実行順序を決定
531                    // 簡易実装として最初の戦略を実行
532                    if let Some(first_strategy) = strategies.first() {
533                        self.execute_basic_strategy((*first_strategy.strategy).clone(), graph, execution_id).await
534                    } else {
535                        Ok(graph)
536                    }
537                }
538            }
539        })
540    }
541
542    /// 順次実行
543    async fn execute_seq(
544        &self,
545        strategies: Vec<Box<WorkflowStrategyOp>>,
546        graph: GraphRef,
547        execution_id: &WorkflowExecutionId,
548    ) -> std::result::Result<GraphRef, WorkflowError> {
549        let mut current_graph = graph;
550        for strategy in strategies {
551            current_graph = self.execute_strategy(*strategy, current_graph, execution_id).await?;
552        }
553        Ok(current_graph)
554    }
555
556    /// 並列実行
557    async fn execute_parallel(
558        &self,
559        branches: Vec<Box<WorkflowStrategyOp>>,
560        completion_condition: CompletionCondition,
561        graph: GraphRef,
562        execution_id: &WorkflowExecutionId,
563    ) -> std::result::Result<GraphRef, WorkflowError> {
564        let mut handles = vec![];
565
566        for branch in branches {
567            let executor = Arc::new(Self::new(
568                Arc::clone(&self.activity_registry),
569                Arc::clone(&self.state_manager),
570                // Arc::clone(&self.db_handler),
571            ));
572            let graph_clone = graph.clone();
573            let execution_id_clone = execution_id.clone();
574
575            let handle = tokio::spawn(async move {
576                executor.execute_strategy(*branch, graph_clone, &execution_id_clone).await
577            });
578            handles.push(handle);
579        }
580
581        match completion_condition {
582            CompletionCondition::All => {
583                // 全てのブランチが完了するまで待つ
584                let mut results = vec![];
585                for handle in handles {
586                    results.push(handle.await.map_err(|_| WorkflowError::InvalidStrategy("Task panicked".to_string()))?);
587                }
588                // 最初の成功したグラフを返す
589                results.into_iter().next().unwrap_or(Ok(graph.clone()))
590            }
591            CompletionCondition::Any => {
592                // いずれかのブランチが完了したら進む
593                // TODO: select! マクロを使って実装
594                Err(WorkflowError::InvalidStrategy("Any completion not implemented".to_string()))
595            }
596            CompletionCondition::AtLeast(count) => {
597                // 指定数のブランチが完了したら進む
598                Err(WorkflowError::InvalidStrategy("AtLeast completion not implemented".to_string()))
599            }
600        }
601    }
602
603    /// 条件分岐実行
604    async fn execute_decision(
605        &self,
606        conditions: Vec<DecisionBranch>,
607        default_branch: Option<Box<WorkflowStrategyOp>>,
608        graph: GraphRef,
609        execution_id: &WorkflowExecutionId,
610    ) -> std::result::Result<GraphRef, WorkflowError> {
611        // 実行コンテキストを取得
612        let execution = self.state_manager.get_execution(execution_id).await
613            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;
614
615        let context = execution.inputs.clone();
616
617        // 条件を順番に評価
618        for branch in conditions {
619            if self.evaluate_condition(&branch.condition, &context) {
620                return self.execute_strategy(*branch.branch, graph, execution_id).await;
621            }
622        }
623
624        // デフォルトブランチを実行
625        if let Some(default_branch) = default_branch {
626            self.execute_strategy(*default_branch, graph, execution_id).await
627        } else {
628            Ok(graph) // 何も実行しない
629        }
630    }
631
632    /// 待機実行
633    async fn execute_wait(
634        &self,
635        condition: WaitCondition,
636        timeout: Option<Duration>,
637        graph: GraphRef,
638        _execution_id: &WorkflowExecutionId,
639    ) -> std::result::Result<GraphRef, WorkflowError> {
640        match condition {
641            WaitCondition::Timer { duration } => {
642                tokio::time::sleep(duration).await;
643                Ok(graph)
644            }
645            WaitCondition::Event { event_type, filter } => {
646                // イベント待機は別途実装が必要
647                println!("Waiting for event: {}", event_type);
648                if let Some(timeout) = timeout {
649                    tokio::time::sleep(timeout).await;
650                }
651                Ok(graph)
652            }
653            WaitCondition::Signal { signal_name } => {
654                // シグナル待機は別途実装が必要
655                println!("Waiting for signal: {}", signal_name);
656                if let Some(timeout) = timeout {
657                    tokio::time::sleep(timeout).await;
658                }
659                Ok(graph)
660            }
661        }
662    }
663
664    /// Sagaパターン実行
665    async fn execute_saga(
666        &self,
667        main_flow: WorkflowStrategyOp,
668        compensation: WorkflowStrategyOp,
669        graph: GraphRef,
670        execution_id: &WorkflowExecutionId,
671    ) -> std::result::Result<GraphRef, WorkflowError> {
672        // メイン処理を実行
673        match self.execute_strategy(main_flow, graph.clone(), execution_id).await {
674            Ok(result_graph) => Ok(result_graph),
675            Err(e) => {
676                // 失敗したら補償処理を実行
677                println!("Main flow failed, executing compensation");
678                match self.execute_strategy(compensation, graph, execution_id).await {
679                    Ok(_) => Err(WorkflowError::CompensationFailed("Main flow failed, compensation executed".to_string())),
680                    Err(compensation_error) => Err(WorkflowError::CompensationFailed(
681                        format!("Main flow failed and compensation also failed: {:?}", compensation_error)
682                    )),
683                }
684            }
685        }
686    }
687
688    /// Activity実行
689    async fn execute_activity(
690        &self,
691        activity_ref: String,
692        input_mapping: HashMap<String, String>,
693        retry_policy: Option<crate::ir::RetryPolicy>,
694        graph: GraphRef,
695        execution_id: &WorkflowExecutionId,
696    ) -> std::result::Result<GraphRef, WorkflowError> {
697        // 実行コンテキストから入力値をマッピング
698        let execution = self.state_manager.get_execution(execution_id).await
699            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;
700
701        let inputs = self.map_inputs(&input_mapping, &execution.inputs)?;
702
703        // Activity実行(リトライ対応)
704        let result = if let Some(retry_policy) = retry_policy {
705            self.execute_with_retry(&activity_ref, inputs, retry_policy).await
706        } else {
707            self.activity_registry.execute(&activity_ref, inputs).await
708                .map(|result| result.outputs.unwrap_or_default())
709        };
710
711        match result {
712            Ok(outputs) => {
713                // 実行結果をグラフに反映(TODO)
714                println!("Activity {} completed successfully", activity_ref);
715                Ok(graph)
716            }
717            Err(e) => {
718                println!("Activity {} failed: {:?}", activity_ref, e);
719                Err(WorkflowError::InvalidDefinition(format!("Activity failed: {:?}", e)))
720            }
721        }
722    }
723
724    /// 子ワークフロー実行
725    async fn execute_subworkflow(
726        &self,
727        workflow_ref: String,
728        input_mapping: HashMap<String, String>,
729        graph: GraphRef,
730        execution_id: &WorkflowExecutionId,
731    ) -> std::result::Result<GraphRef, WorkflowError> {
732        // 親ワークフローから入力値をマッピング
733        let execution = self.state_manager.get_execution(execution_id).await
734            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;
735
736        let inputs = self.map_inputs(&input_mapping, &execution.inputs)?;
737
738        // 子ワークフロー定義を取得(実際の実装ではレジストリから取得)
739        // TODO: 子ワークフロー定義の取得を実装
740        println!("Subworkflow {} execution not yet implemented", workflow_ref);
741        Ok(graph)
742    }
743
744    /// リトライ付きActivity実行
745    async fn execute_with_retry(
746        &self,
747        activity_ref: &str,
748        inputs: HashMap<String, serde_json::Value>,
749        retry_policy: crate::ir::RetryPolicy,
750    ) -> std::result::Result<HashMap<String, serde_json::Value>, ActivityError> {
751        let mut attempts = 0;
752        let mut current_interval = retry_policy.initial_interval;
753
754        loop {
755            attempts += 1;
756
757            match self.activity_registry.execute(activity_ref, inputs.clone()).await {
758                Ok(result) => return Ok(result.outputs.unwrap_or_default()),
759                Err(e) => {
760                    // リトライ不可エラーのチェック
761                    if retry_policy.non_retryable_errors.iter().any(|err| e.to_string().contains(err)) {
762                        return Err(e);
763                    }
764
765                    // 最大試行回数チェック
766                    if attempts >= retry_policy.maximum_attempts {
767                        return Err(e);
768                    }
769
770                    // リトライ待機
771                    tokio::time::sleep(current_interval).await;
772
773                    // インターバル更新
774                    current_interval = std::cmp::min(
775                        current_interval.mul_f64(retry_policy.backoff_coefficient),
776                        retry_policy.maximum_interval.unwrap_or(Duration::from_secs(300)),
777                    );
778                }
779            }
780        }
781    }
782
783    /// 入力値マッピング
784    fn map_inputs(
785        &self,
786        mapping: &HashMap<String, String>,
787        context: &HashMap<String, serde_json::Value>,
788    ) -> std::result::Result<HashMap<String, serde_json::Value>, WorkflowError> {
789        let mut inputs = HashMap::new();
790
791        for (key, expr) in mapping {
792            // 簡易的な式評価(実際の実装ではもっと複雑)
793            if expr.starts_with("$.inputs.") {
794                let field = &expr[9..]; // "$.inputs." を除去
795                if let Some(value) = context.get(field) {
796                    inputs.insert(key.clone(), value.clone());
797                }
798            }
799        }
800
801        Ok(inputs)
802    }
803
804    /// 条件式評価
805    fn evaluate_condition(
806        &self,
807        condition: &str,
808        context: &HashMap<String, serde_json::Value>,
809    ) -> bool {
810        // 簡易的な条件評価(実際の実装では式パーサーを使用)
811        // TODO: より複雑な条件式の評価を実装
812        if condition.contains("==") {
813            // 例: "$.inputs.status == 'active'"
814            // 簡易実装なので常にtrueを返す
815            true
816        } else {
817            false
818        }
819    }
820}
821
822/// ワークフロー状態マネージャー - MVCCベースの実装
823pub struct WorkflowStateManager {
824    /// 実行状態の管理(TxIdベースのバージョン管理)
825    executions: RwLock<HashMap<String, Vec<(TxId, WorkflowExecution)>>>,
826    /// 現在のTxIdカウンター
827    current_tx_id: std::sync::atomic::AtomicU64,
828}
829
830impl WorkflowStateManager {
831    pub fn new() -> Self {
832        Self {
833            executions: RwLock::new(HashMap::new()),
834            current_tx_id: std::sync::atomic::AtomicU64::new(1),
835        }
836    }
837
838    /// 新しいTxIdを生成
839    fn next_tx_id(&self) -> TxId {
840        let tx_id = self.current_tx_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
841        TxId(tx_id.to_string())
842    }
843
844    /// MVCCベースのワークフロー実行作成
845    pub async fn create_execution(
846        &self,
847        workflow_ir: &WorkflowIR,
848        inputs: HashMap<String, serde_json::Value>,
849    ) -> std::result::Result<WorkflowExecutionId, WorkflowError> {
850        let tx_id = self.next_tx_id();
851        let execution_id = WorkflowExecutionId(uuid::Uuid::new_v4().to_string());
852
853        let execution = WorkflowExecution {
854            id: execution_id.clone(),
855            workflow_id: workflow_ir.id.clone(),
856            status: ExecutionStatus::Running,
857            start_time: chrono::Utc::now(),
858            end_time: None,
859            inputs,
860            outputs: None,
861            current_graph: GraphRef("initial".to_string()),
862            execution_history: vec![ExecutionEvent {
863                id: uuid::Uuid::new_v4().to_string(),
864                timestamp: chrono::Utc::now(),
865                event_type: ExecutionEventType::Started,
866                payload: HashMap::new(),
867            }],
868            retry_count: 0,
869            timeout_at: workflow_ir.timeout.map(|t| chrono::Utc::now() + chrono::Duration::from_std(t).unwrap()),
870        };
871
872        let mut executions = self.executions.write().await;
873        let versions = executions.entry(execution_id.0.clone()).or_insert_with(Vec::new);
874        versions.push((tx_id, execution));
875
876        Ok(execution_id)
877    }
878
879    /// 指定されたTxId時点での実行状態を取得(MVCC対応)
880    pub async fn get_execution_at(&self, id: &WorkflowExecutionId, tx_id: Option<TxId>) -> Option<WorkflowExecution> {
881        let executions = self.executions.read().await;
882        let versions = executions.get(&id.0)?;
883
884        match tx_id {
885            Some(tx_id) => {
886                // 指定TxId以前の最新バージョンを取得
887                versions.iter()
888                    .filter(|(v_tx_id, _)| v_tx_id.0 <= tx_id.0)
889                    .max_by_key(|(v_tx_id, _)| &v_tx_id.0)
890                    .map(|(_, execution)| execution.clone())
891            }
892            None => {
893                // 最新バージョンを取得
894                versions.last().map(|(_, execution)| execution.clone())
895            }
896        }
897    }
898
899    /// 最新バージョンの実行状態を取得
900    pub async fn get_execution(&self, id: &WorkflowExecutionId) -> Option<WorkflowExecution> {
901        self.get_execution_at(id, None).await
902    }
903
904    /// MVCCベースの実行状態更新
905    pub async fn update_execution(&self, execution: WorkflowExecution) -> std::result::Result<TxId, WorkflowError> {
906        let tx_id = self.next_tx_id();
907
908        let mut executions = self.executions.write().await;
909        let versions = executions.entry(execution.id.0.clone()).or_insert_with(Vec::new);
910        versions.push((tx_id.clone(), execution));
911
912        Ok(tx_id)
913    }
914
915    /// 実行のバージョン履歴を取得
916    pub async fn get_execution_history(&self, id: &WorkflowExecutionId) -> Vec<(TxId, WorkflowExecution)> {
917        let executions = self.executions.read().await;
918        executions.get(&id.0).cloned().unwrap_or_default()
919    }
920
921    /// スナップショット作成(古いバージョンのクリーンアップ)
922    pub async fn create_snapshot(&self, execution_id: &WorkflowExecutionId, max_versions: usize) -> std::result::Result<(), WorkflowError> {
923        let mut executions = self.executions.write().await;
924        if let Some(versions) = executions.get_mut(&execution_id.0) {
925            if versions.len() > max_versions {
926                // 最新のmax_versions個を保持
927                let keep_count = versions.len().saturating_sub(max_versions);
928                versions.drain(0..keep_count);
929            }
930        }
931        Ok(())
932    }
933
934    /// 実行中のワークフロー一覧を取得
935    pub async fn get_running_executions(&self) -> Vec<WorkflowExecution> {
936        let executions = self.executions.read().await;
937        executions.values()
938            .filter_map(|versions| {
939                versions.last().map(|(_, execution)| execution.clone())
940                    .filter(|execution| matches!(execution.status, ExecutionStatus::Running))
941            })
942            .collect()
943    }
944
945    /// 実行イベントを追加
946    pub async fn add_execution_event(&self, execution_id: &WorkflowExecutionId, event: ExecutionEvent) -> std::result::Result<TxId, WorkflowError> {
947        let mut execution = self.get_execution(execution_id).await
948            .ok_or(WorkflowError::WorkflowNotFound(execution_id.0.clone()))?;
949
950        execution.execution_history.push(event);
951        self.update_execution(execution).await
952    }
953}
954
955// TODO: Implement workflow execution engine
956// For now, this module provides basic activity execution framework
957