1use crate::{Context, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4use futures::future::join_all;
5
6pub fn map<P, I, O>(p: P) -> Map<P, I, O>
8where
9 P: Pipeline<I, O> + Send + Sync + 'static,
10 I: Clone + Send + Sync + 'static,
11 O: Send + 'static,
12{
13 Map {
14 p,
15 concurrent: true,
16 _marker: PhantomData,
17 }
18}
19
/// Adapter that holds a per-item pipeline together with an execution-mode flag.
///
/// `P` is the wrapped pipeline, `I` the per-item input type and `O` the
/// per-item output type. The mode is chosen with [`Map::concurrent`] or
/// [`Map::sequential`].
pub struct Map<P, I, O> {
    /// Pipeline that is run for each individual item.
    pub(crate) p: P,
    /// `true` for concurrent mode, `false` for sequential mode.
    pub(crate) concurrent: bool,
    /// Ties the otherwise-unused `I` and `O` parameters to the type without
    /// storing a value of either.
    pub(crate) _marker: PhantomData<fn(I) -> O>,
}

impl<P, I, O> Map<P, I, O> {
    /// Switches the adapter to concurrent mode and returns it.
    ///
    /// The adapter is consumed and handed back, so the return value must be
    /// kept; calling this and discarding the result drops the adapter.
    #[must_use = "this returns the reconfigured adapter; it does not modify one in place"]
    pub fn concurrent(mut self) -> Self {
        self.concurrent = true;
        self
    }

    /// Switches the adapter to sequential mode and returns it.
    ///
    /// The adapter is consumed and handed back, so the return value must be
    /// kept; calling this and discarding the result drops the adapter.
    #[must_use = "this returns the reconfigured adapter; it does not modify one in place"]
    pub fn sequential(mut self) -> Self {
        self.concurrent = false;
        self
    }
}
41
42impl<P, I, O> Pipeline<Vec<I>, Vec<O>> for Map<P, I, O>
43where
44 P: Pipeline<I, O> + Send + Sync + 'static,
45 I: Clone + Send + Sync + 'static,
46 O: Send + 'static,
47{
48 fn apply(&self, ctx: Context<Vec<I>>) -> impl Future<Output = Vec<O>> + Send {
49 async move {
50 let input = ctx.input(); if self.concurrent {
53 let mut futures = Vec::with_capacity(input.len());
54 for item in input.iter() {
55 let sub_ctx = ctx.clone().replace(item.clone());
57 futures.push(self.p.apply(sub_ctx));
58 }
59 join_all(futures).await
60 } else {
61 let mut results = Vec::with_capacity(input.len());
62 for item in input.iter() {
63 let sub_ctx = ctx.clone().replace(item.clone());
64 results.push(self.p.apply(sub_ctx).await);
65 }
66 results
67 }
68 }
69 }
70}
71