Skip to main content

synaptic_runnables/
assign.rs

1use async_trait::async_trait;
2use serde_json::Value;
3use synaptic_core::{RunnableConfig, SynapticError};
4
5use crate::runnable::{BoxRunnable, Runnable};
6
7/// Runs named branches in parallel on the input, then merges results into the input object.
8/// Input must be a JSON object. Each branch receives a clone of the full input.
9pub struct RunnableAssign {
10    branches: Vec<(String, BoxRunnable<Value, Value>)>,
11}
12
13impl RunnableAssign {
14    pub fn new(branches: Vec<(String, BoxRunnable<Value, Value>)>) -> Self {
15        Self { branches }
16    }
17}
18
19#[async_trait]
20impl Runnable<Value, Value> for RunnableAssign {
21    async fn invoke(&self, input: Value, config: &RunnableConfig) -> Result<Value, SynapticError> {
22        let mut base = match input {
23            Value::Object(map) => map,
24            other => {
25                return Err(SynapticError::Validation(format!(
26                    "RunnableAssign expects a JSON object, got {}",
27                    other
28                )))
29            }
30        };
31
32        let futures: Vec<_> = self
33            .branches
34            .iter()
35            .map(|(key, runnable)| {
36                let input_clone = Value::Object(base.clone());
37                let key = key.clone();
38                async move {
39                    let result = runnable.invoke(input_clone, config).await?;
40                    Ok::<_, SynapticError>((key, result))
41                }
42            })
43            .collect();
44
45        let results = futures::future::join_all(futures).await;
46        for result in results {
47            let (key, value) = result?;
48            base.insert(key, value);
49        }
50
51        Ok(Value::Object(base))
52    }
53}