matrixcode_core/workflow/executors/
composite.rs1use anyhow::Result;
6use async_trait::async_trait;
7use std::sync::Arc;
8
9use crate::workflow::context::WorkflowContext;
10use crate::workflow::def::NodeDef;
11use super::node_executor::NodeExecutor;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum CompositeMode {
16 Sequential,
18 FirstSuccess,
20 Parallel,
22}
23
24pub struct CompositeExecutor {
28 pub executors: Vec<Arc<dyn NodeExecutor>>,
30 mode: CompositeMode,
32}
33
34impl CompositeExecutor {
35 pub fn new(executors: Vec<Arc<dyn NodeExecutor>>, mode: CompositeMode) -> Self {
37 Self { executors, mode }
38 }
39
40 pub fn add_executor(&mut self, executor: Arc<dyn NodeExecutor>) {
42 self.executors.push(executor);
43 }
44}
45
46#[async_trait]
47impl NodeExecutor for CompositeExecutor {
48 async fn execute(
49 &self,
50 node: &NodeDef,
51 context: &mut WorkflowContext,
52 ) -> Result<serde_json::Value> {
53 match self.mode {
54 CompositeMode::Sequential => {
55 let mut outputs = Vec::new();
56 for executor in &self.executors {
57 let output = executor.execute(node, context).await?;
58 outputs.push(output);
59 }
60 Ok(serde_json::Value::Array(outputs))
61 }
62 CompositeMode::FirstSuccess => {
63 for executor in &self.executors {
64 if let Ok(output) = executor.execute(node, context).await {
65 return Ok(output);
66 }
67 }
68 Err(anyhow::anyhow!("All executors failed"))
69 }
70 CompositeMode::Parallel => {
71 let mut outputs = Vec::new();
74 for executor in &self.executors {
75 let output = executor.execute(node, context).await?;
76 outputs.push(output);
77 }
78 Ok(serde_json::Value::Array(outputs))
79 }
80 }
81 }
82
83 fn name(&self) -> &str {
84 "composite_executor"
85 }
86}