Skip to main content

cognis_core/compose/
each.rs

1//! Apply a runnable to every element of a `Vec<I>`, preserving order.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::stream::{self, StreamExt};
8
9use crate::runnable::{Runnable, RunnableConfig};
10use crate::Result;
11
12/// Wraps a `Runnable<I, O>` to act on a `Vec<I>` element-wise.
13///
14/// Concurrency is bounded by `RunnableConfig::max_concurrency`. Output
15/// order matches input order.
16pub struct Each<R, I, O> {
17    inner: R,
18    _phantom: PhantomData<fn(I) -> O>,
19}
20
21impl<R, I, O> Each<R, I, O>
22where
23    R: Runnable<I, O>,
24    I: Send + 'static,
25    O: Send + 'static,
26{
27    /// Wrap a runnable so it operates element-wise on a `Vec<I>`.
28    pub fn new(inner: R) -> Self {
29        Self {
30            inner,
31            _phantom: PhantomData,
32        }
33    }
34}
35
36#[async_trait]
37impl<R, I, O> Runnable<Vec<I>, Vec<O>> for Each<R, I, O>
38where
39    R: Runnable<I, O>,
40    I: Send + 'static,
41    O: Send + 'static,
42{
43    async fn invoke(&self, inputs: Vec<I>, config: RunnableConfig) -> Result<Vec<O>> {
44        let concurrency = config.max_concurrency.max(1);
45        let cfg = Arc::new(config);
46        stream::iter(inputs)
47            .map(|i| {
48                let cfg = cfg.clone();
49                async move {
50                    self.inner
51                        .invoke(i, RunnableConfig::clone_for_subcall(&cfg))
52                        .await
53                }
54            })
55            .buffered(concurrency)
56            .collect::<Vec<_>>()
57            .await
58            .into_iter()
59            .collect()
60    }
61    fn name(&self) -> &str {
62        "Each"
63    }
64}
65
66#[cfg(test)]
67mod tests {
68    use super::*;
69
70    struct Inc;
71
72    #[async_trait]
73    impl Runnable<u32, u32> for Inc {
74        async fn invoke(&self, input: u32, _: RunnableConfig) -> Result<u32> {
75            Ok(input + 1)
76        }
77    }
78
79    #[tokio::test]
80    async fn maps_each_element() {
81        let e = Each::new(Inc);
82        let out = e
83            .invoke(vec![1, 2, 3, 4], RunnableConfig::default())
84            .await
85            .unwrap();
86        assert_eq!(out, vec![2, 3, 4, 5]);
87    }
88}