1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
use derive_new::new; use educe::Educe; use futures::{ready, Stream}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use serde_closure::traits::FnMut; use std::{ marker::PhantomData, pin::Pin, task::{Context, Poll} }; use super::{ DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PushReducer, Reducer, ReducerProcessSend, ReducerSend }; use crate::{pipe::Sink, pool::ProcessSend}; #[derive(new)] #[must_use] pub struct ForEach<P, F> { pipe: P, f: F, } impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for ForEach<P, F> where F: FnMut<(P::Output,), Output = ()> + Clone + Send + 'static, { type Done = (); type Pipe = P; type ReduceA = ForEachReducer<P::Output, F>; type ReduceC = PushReducer<()>; fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) { ( self.pipe, ForEachReducer(self.f, PhantomData), PushReducer::new(), ) } } impl<P: DistributedPipe<Item>, Item, F> DistributedSink<Item> for ForEach<P, F> where F: FnMut<(P::Output,), Output = ()> + Clone + ProcessSend + 'static, { type Done = (); type Pipe = P; type ReduceA = ForEachReducer<P::Output, F>; type ReduceB = PushReducer<()>; type ReduceC = PushReducer<()>; fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) { ( self.pipe, ForEachReducer(self.f, PhantomData), PushReducer::new(), PushReducer::new(), ) } } #[pin_project] #[derive(Educe, Serialize, Deserialize)] #[educe(Clone(bound = "F: Clone"))] #[serde( bound(serialize = "F: Serialize"), bound(deserialize = "F: Deserialize<'de>") )] pub struct ForEachReducer<Item, F>(F, PhantomData<fn() -> Item>); impl<Item, F> Reducer<Item> for ForEachReducer<Item, F> where F: FnMut<(Item,), Output = ()>, { type Done = (); type Async = Self; fn into_async(self) -> Self::Async { self } } impl<Item, F> ReducerProcessSend<Item> for ForEachReducer<Item, F> where F: FnMut<(Item,), Output = ()>, { type Done = (); } impl<Item, F> ReducerSend<Item> for ForEachReducer<Item, F> where F: FnMut<(Item,), Output = ()>, { type Done = (); } impl<Item, F> Sink<Item> for ForEachReducer<Item, F> where F: FnMut<(Item,), Output = ()>, { type Done = (); #[inline(always)] fn poll_forward( self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>, ) -> Poll<Self::Done> { let self_ = self.project(); while let Some(item) = ready!(stream.as_mut().poll_next(cx)) { self_.0.call_mut((item,)); } Poll::Ready(()) } }