Skip to main content

synaptic_runnables/
parallel.rs

1use async_trait::async_trait;
2use serde_json::Value;
3use synaptic_core::{RunnableConfig, SynapticError};
4
5use crate::runnable::{BoxRunnable, Runnable};
6
7/// Runs multiple named runnables concurrently on the same (cloned) input,
8/// merging outputs into a JSON object keyed by branch name.
9pub struct RunnableParallel<I: Send + Clone + 'static> {
10    branches: Vec<(String, BoxRunnable<I, Value>)>,
11}
12
13impl<I: Send + Clone + 'static> RunnableParallel<I> {
14    pub fn new(branches: Vec<(String, BoxRunnable<I, Value>)>) -> Self {
15        Self { branches }
16    }
17}
18
19#[async_trait]
20impl<I: Send + Clone + 'static> Runnable<I, Value> for RunnableParallel<I> {
21    async fn invoke(&self, input: I, config: &RunnableConfig) -> Result<Value, SynapticError> {
22        let futures: Vec<_> = self
23            .branches
24            .iter()
25            .map(|(key, runnable)| {
26                let input_clone = input.clone();
27                let key = key.clone();
28                async move {
29                    let result = runnable.invoke(input_clone, config).await?;
30                    Ok::<_, SynapticError>((key, result))
31                }
32            })
33            .collect();
34
35        let results = futures::future::join_all(futures).await;
36        let mut map = serde_json::Map::new();
37        for result in results {
38            let (key, value) = result?;
39            map.insert(key, value);
40        }
41        Ok(Value::Object(map))
42    }
43}