entelix_runnable/
parallel.rs1use std::collections::HashMap;
9use std::sync::Arc;
10
11use entelix_core::{ExecutionContext, Result};
12use futures::future::try_join_all;
13
14use crate::runnable::Runnable;
15
16type Branch<I, O> = (String, Arc<dyn Runnable<I, O>>);
17
18pub struct RunnableParallel<I, O>
32where
33 I: Clone + Send + 'static,
34 O: Send + 'static,
35{
36 branches: Vec<Branch<I, O>>,
37}
38
39impl<I, O> RunnableParallel<I, O>
40where
41 I: Clone + Send + 'static,
42 O: Send + 'static,
43{
44 pub fn new() -> Self {
46 Self {
47 branches: Vec::new(),
48 }
49 }
50
51 #[must_use]
55 pub fn branch<R>(mut self, name: impl Into<String>, runnable: R) -> Self
56 where
57 R: Runnable<I, O> + 'static,
58 {
59 self.branches.push((name.into(), Arc::new(runnable)));
60 self
61 }
62
63 pub fn len(&self) -> usize {
65 self.branches.len()
66 }
67
68 pub fn is_empty(&self) -> bool {
70 self.branches.is_empty()
71 }
72}
73
74impl<I, O> Default for RunnableParallel<I, O>
75where
76 I: Clone + Send + 'static,
77 O: Send + 'static,
78{
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait::async_trait]
85impl<I, O> Runnable<I, HashMap<String, O>> for RunnableParallel<I, O>
86where
87 I: Clone + Send + Sync + 'static,
88 O: Send + 'static,
89{
90 async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<HashMap<String, O>> {
91 let futures = self.branches.iter().map(|(name, runnable)| {
92 let input = input.clone();
93 let runnable = Arc::clone(runnable);
94 let name = name.clone();
95 let ctx = ctx.clone();
96 async move {
97 let out = runnable.invoke(input, &ctx).await?;
98 Ok::<_, entelix_core::Error>((name, out))
99 }
100 });
101 let pairs = try_join_all(futures).await?;
102 Ok(pairs.into_iter().collect())
103 }
104}