use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
use crate::workflow::context::WorkflowContext;
use crate::workflow::def::NodeDef;
use super::node_executor::NodeExecutor;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompositeMode {
Sequential,
FirstSuccess,
Parallel,
}
pub struct CompositeExecutor {
pub executors: Vec<Arc<dyn NodeExecutor>>,
mode: CompositeMode,
}
impl CompositeExecutor {
pub fn new(executors: Vec<Arc<dyn NodeExecutor>>, mode: CompositeMode) -> Self {
Self { executors, mode }
}
pub fn add_executor(&mut self, executor: Arc<dyn NodeExecutor>) {
self.executors.push(executor);
}
}
#[async_trait]
impl NodeExecutor for CompositeExecutor {
async fn execute(
&self,
node: &NodeDef,
context: &mut WorkflowContext,
) -> Result<serde_json::Value> {
match self.mode {
CompositeMode::Sequential => {
let mut outputs = Vec::new();
for executor in &self.executors {
let output = executor.execute(node, context).await?;
outputs.push(output);
}
Ok(serde_json::Value::Array(outputs))
}
CompositeMode::FirstSuccess => {
for executor in &self.executors {
if let Ok(output) = executor.execute(node, context).await {
return Ok(output);
}
}
Err(anyhow::anyhow!("All executors failed"))
}
CompositeMode::Parallel => {
let mut outputs = Vec::new();
for executor in &self.executors {
let output = executor.execute(node, context).await?;
outputs.push(output);
}
Ok(serde_json::Value::Array(outputs))
}
}
}
fn name(&self) -> &str {
"composite_executor"
}
}