Skip to main content

pipe_it/
concurrency.rs

1use crate::{Context, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4use futures::future::join_all;
5
6/// creates a new map pipeline
7pub fn map<P, I, O>(p: P) -> Map<P, I, O>
8where
9    P: Pipeline<I, O> + Send + Sync + 'static,
10    I: Clone + Send + Sync + 'static,
11    O: Send + 'static,
12{
13    Map {
14        p,
15        concurrent: true,
16        _marker: PhantomData,
17    }
18}
19
20/// A pipeline that maps an input collection to an output collection by applying
21/// an inner pipeline to each element.
22pub struct Map<P, I, O> {
23    pub(crate) p: P,
24    pub(crate) concurrent: bool,
25    pub(crate) _marker: PhantomData<fn(I) -> O>,
26}
27
28impl<P, I, O> Map<P, I, O> {
29    /// Forces the map to run concurrently.
30    pub fn concurrent(mut self) -> Self {
31        self.concurrent = true;
32        self
33    }
34    
35    /// Forces the map to run sequentially.
36    pub fn sequential(mut self) -> Self {
37        self.concurrent = false;
38        self
39    }
40}
41
42impl<P, I, O> Pipeline<Vec<I>, Vec<O>> for Map<P, I, O>
43where
44    P: Pipeline<I, O> + Send + Sync + 'static,
45    I: Clone + Send + Sync + 'static,
46    O: Send + 'static,
47{
48    fn apply(&self, ctx: Context<Vec<I>>) -> impl Future<Output = Vec<O>> + Send {
49        async move {
50            let input = ctx.input(); // Arc<Vec<I>>
51            
52            if self.concurrent {
53                let mut futures = Vec::with_capacity(input.len());
54                for item in input.iter() {
55                    // Create a new context for each item with shared dependencies preserved
56                    let sub_ctx = ctx.clone().replace(item.clone());
57                    futures.push(self.p.apply(sub_ctx));
58                }
59                join_all(futures).await
60            } else {
61                let mut results = Vec::with_capacity(input.len());
62                for item in input.iter() {
63                    let sub_ctx = ctx.clone().replace(item.clone());
64                    results.push(self.p.apply(sub_ctx).await);
65                }
66                results
67            }
68        }
69    }
70}
71