Skip to main content

matrixcode_core/workflow/executors/
composite.rs

1//! Composite Executor
2//!
3//! 组合执行器,可以组合多个执行器按顺序或条件执行。
4
5use anyhow::Result;
6use async_trait::async_trait;
7use std::sync::Arc;
8
9use crate::workflow::context::WorkflowContext;
10use crate::workflow::def::NodeDef;
11use super::node_executor::NodeExecutor;
12
/// Strategy used by a composite executor to drive its child executors.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompositeMode {
    /// Run every child executor, one after another, in list order.
    Sequential,
    /// Try the child executors in list order and stop at the first one that succeeds.
    FirstSuccess,
    /// Run the child executors in parallel (intended to rely on tokio).
    ///
    /// NOTE: the `CompositeExecutor::execute` implementation in this file
    /// currently handles this mode by running the children sequentially.
    Parallel,
}
23
24/// 组合执行器
25///
26/// 可以组合多个执行器,按顺序或条件执行。
27pub struct CompositeExecutor {
28    /// 子执行器列表
29    pub executors: Vec<Arc<dyn NodeExecutor>>,
30    /// 执行模式
31    mode: CompositeMode,
32}
33
34impl CompositeExecutor {
35    /// 创建新的组合执行器
36    pub fn new(executors: Vec<Arc<dyn NodeExecutor>>, mode: CompositeMode) -> Self {
37        Self { executors, mode }
38    }
39
40    /// 添加执行器
41    pub fn add_executor(&mut self, executor: Arc<dyn NodeExecutor>) {
42        self.executors.push(executor);
43    }
44}
45
46#[async_trait]
47impl NodeExecutor for CompositeExecutor {
48    async fn execute(
49        &self,
50        node: &NodeDef,
51        context: &mut WorkflowContext,
52    ) -> Result<serde_json::Value> {
53        match self.mode {
54            CompositeMode::Sequential => {
55                let mut outputs = Vec::new();
56                for executor in &self.executors {
57                    let output = executor.execute(node, context).await?;
58                    outputs.push(output);
59                }
60                Ok(serde_json::Value::Array(outputs))
61            }
62            CompositeMode::FirstSuccess => {
63                for executor in &self.executors {
64                    if let Ok(output) = executor.execute(node, context).await {
65                        return Ok(output);
66                    }
67                }
68                Err(anyhow::anyhow!("All executors failed"))
69            }
70            CompositeMode::Parallel => {
71                // 并行执行需要更复杂的实现
72                // 这里简化为顺序执行
73                let mut outputs = Vec::new();
74                for executor in &self.executors {
75                    let output = executor.execute(node, context).await?;
76                    outputs.push(output);
77                }
78                Ok(serde_json::Value::Array(outputs))
79            }
80        }
81    }
82
83    fn name(&self) -> &str {
84        "composite_executor"
85    }
86}