// pipe_it/concurrency.rs

1use crate::{Context, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4use futures::future::join_all;
5
/// A pipeline that maps an input collection to an output collection by applying
/// an inner pipeline to each element concurrently.
///
/// # Type parameters
///
/// * `P` – the inner pipeline that is applied to each element.
/// * `I` – the element type of the input collection.
/// * `O` – the element type of the output collection.
pub struct Map<P, I, O> {
    /// Inner pipeline applied to every element of the input.
    pub(crate) p: P,
    /// Zero-sized marker that ties `I` and `O` to the type without storing
    /// either. Using `fn(I) -> O` (rather than `(I, O)`) means `Map` does not
    /// claim to own an `I` or `O`, and its `Send`/`Sync` auto traits depend on
    /// `P` alone, because function pointers are `Send + Sync` for any `I`, `O`.
    pub(crate) _marker: PhantomData<fn(I) -> O>,
}
12
13
14impl<P, I, O> Pipeline<Vec<I>, Vec<O>> for Map<P, I, O>
15where
16    P: Pipeline<I, O> + Send + Sync + 'static,
17    I: Clone + Send + Sync + 'static,
18    O: Send + 'static,
19{
20    fn apply(&self, ctx: Context<Vec<I>>) -> impl Future<Output = Vec<O>> + Send {
21        async move {
22            let input = ctx.input(); // Arc<Vec<I>>
23            let mut futures = Vec::with_capacity(input.len());
24
25            for item in input.iter() {
26                // Create a new context for each item with shared dependencies preserved
27                let sub_ctx = ctx.clone().replace(item.clone());
28                futures.push(self.p.apply(sub_ctx));
29            }
30
31            join_all(futures).await
32        }
33    }
34}
35