pipe_it/
cocurrency.rs

1use crate::{Context, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4use futures::future::join_all;
5
6/// A pipeline that maps an input collection to an output collection by applying
7/// an inner pipeline to each element concurrently.
8pub struct Map<P, I, O> {
9    pub(crate) p: P,
10    pub(crate) _marker: PhantomData<fn(I) -> O>,
11}
12
13impl<P, I, O> Pipeline<Vec<I>, Vec<O>> for Map<P, I, O>
14where
15    P: Pipeline<I, O> + Send + Sync + 'static,
16    I: Clone + Send + Sync + 'static,
17    O: Send + 'static,
18{
19    fn apply(&self, ctx: Context<Vec<I>>) -> impl Future<Output = Vec<O>> + Send {
20        async move {
21            let input = ctx.input(); // Arc<Vec<I>>
22            let mut futures = Vec::with_capacity(input.len());
23
24            for item in input.iter() {
25                // Create a new context for each item with shared dependencies preserved
26                let sub_ctx = ctx.clone().replace(item.clone());
27                futures.push(self.p.apply(sub_ctx));
28            }
29
30            join_all(futures).await
31        }
32    }
33}
34
35// Support for pipe wrapper specifically if needed, but the generic impl above covers it 
36// if Pipe implements Pipeline (which it does).