Skip to main content

entelix_runnable/
parallel.rs

1//! `RunnableParallel` — fan-out one input to many runnables, collect outputs
2//! into a `HashMap<String, O>`.
3//!
4//! Branches run concurrently via `futures::future::try_join_all` — first
5//! failure aborts the others. The input must be `Clone` because each branch
6//! receives its own copy.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use entelix_core::{ExecutionContext, Result};
12use futures::future::try_join_all;
13
14use crate::runnable::Runnable;
15
/// A named branch: the key its output is stored under, paired with the
/// shared, type-erased runnable that produces that output.
type Branch<I, O> = (String, Arc<dyn Runnable<I, O>>);
17
18/// `Runnable<I, HashMap<String, O>>` that runs every registered branch in
19/// parallel against the same input.
20///
21/// Construct with [`RunnableParallel::new`] and add branches via
22/// [`RunnableParallel::branch`]:
23///
24/// ```ignore
25/// let parallel = RunnableParallel::new()
26///     .branch("joke", joke_chain)
27///     .branch("poem", poem_chain);
28/// let outputs = parallel.invoke(topic, &ctx).await?;
29/// // outputs: HashMap { "joke" => Message, "poem" => Message }
30/// ```
31pub struct RunnableParallel<I, O>
32where
33    I: Clone + Send + 'static,
34    O: Send + 'static,
35{
36    branches: Vec<Branch<I, O>>,
37}
38
39impl<I, O> RunnableParallel<I, O>
40where
41    I: Clone + Send + 'static,
42    O: Send + 'static,
43{
44    /// Empty parallel runner.
45    pub fn new() -> Self {
46        Self {
47            branches: Vec::new(),
48        }
49    }
50
51    /// Append a named branch. Insertion order is preserved in the output
52    /// `HashMap` only by virtue of `HashMap`'s eventual lookup; consumers
53    /// that need ordered results should iterate by key list.
54    #[must_use]
55    pub fn branch<R>(mut self, name: impl Into<String>, runnable: R) -> Self
56    where
57        R: Runnable<I, O> + 'static,
58    {
59        self.branches.push((name.into(), Arc::new(runnable)));
60        self
61    }
62
63    /// Number of branches registered.
64    pub fn len(&self) -> usize {
65        self.branches.len()
66    }
67
68    /// True when no branches are registered.
69    pub fn is_empty(&self) -> bool {
70        self.branches.is_empty()
71    }
72}
73
74impl<I, O> Default for RunnableParallel<I, O>
75where
76    I: Clone + Send + 'static,
77    O: Send + 'static,
78{
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84#[async_trait::async_trait]
85impl<I, O> Runnable<I, HashMap<String, O>> for RunnableParallel<I, O>
86where
87    I: Clone + Send + Sync + 'static,
88    O: Send + 'static,
89{
90    async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<HashMap<String, O>> {
91        let futures = self.branches.iter().map(|(name, runnable)| {
92            let input = input.clone();
93            let runnable = Arc::clone(runnable);
94            let name = name.clone();
95            let ctx = ctx.clone();
96            async move {
97                let out = runnable.invoke(input, &ctx).await?;
98                Ok::<_, entelix_core::Error>((name, out))
99            }
100        });
101        let pairs = try_join_all(futures).await?;
102        Ok(pairs.into_iter().collect())
103    }
104}