use derive_new::new;
use educe::Educe;
use futures::{ready, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
    future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}
};

use super::{
    DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PushReducer, Reducer, ReducerAsync, ReducerProcessSend, ReducerSend
};
use crate::{pipe::Sink, pool::ProcessSend};

/// Sink that calls the closure `f` on every item produced by the pipe `i`.
#[derive(new)]
#[must_use]
pub struct ForEach<I, F> {
    i: I,
    f: F,
}

impl<I: ParallelPipe<Source>, Source, F> ParallelSink<Source> for ForEach<I, F>
where
    F: FnMut(I::Item) + Clone + Send + 'static,
{
    type Output = ();
    type Pipe = I;
    type ReduceA = ForEachReducer<I::Item, F>;
    type ReduceC = PushReducer<()>;

    fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
        (
            self.i,
            ForEachReducer(self.f, PhantomData),
            PushReducer::new(),
        )
    }
}

impl<I: DistributedPipe<Source>, Source, F> DistributedSink<Source> for ForEach<I, F>
where
    F: FnMut(I::Item) + Clone + ProcessSend + 'static,
{
    type Output = ();
    type Pipe = I;
    type ReduceA = ForEachReducer<I::Item, F>;
    type ReduceB = PushReducer<()>;
    type ReduceC = PushReducer<()>;

    fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
        (
            self.i,
            ForEachReducer(self.f, PhantomData),
            PushReducer::new(),
            PushReducer::new(),
        )
    }
}

/// Reducer that applies the closure to each incoming item and yields `()`.
///
/// `PhantomData<fn() -> A>` records the item type without storing an `A`.
#[pin_project]
#[derive(Educe, Serialize, Deserialize)]
#[educe(Clone(bound = "F: Clone"))]
#[serde(
    bound(serialize = "F: Serialize"),
    bound(deserialize = "F: Deserialize<'de>")
)]
pub struct ForEachReducer<A, F>(F, PhantomData<fn() -> A>);

impl<A, F> Reducer<A> for ForEachReducer<A, F>
where
    F: FnMut(A) + Clone,
{
    type Output = ();
    type Async = Self;

    fn into_async(self) -> Self::Async {
        self
    }
}
impl<A, F> ReducerProcessSend<A> for ForEachReducer<A, F>
where
    F: FnMut(A) + Clone,
{
    type Output = ();
}
impl<A, F> ReducerSend<A> for ForEachReducer<A, F>
where
    F: FnMut(A) + Clone,
{
    type Output = ();
}

impl<A, F> Sink<A> for ForEachReducer<A, F>
where
    F: FnMut(A) + Clone,
{
    #[inline(always)]
    fn poll_pipe(
        self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = A>>,
    ) -> Poll<()> {
        let self_ = self.project();
        // `ready!` returns `Poll::Pending` from this function if the stream is not ready yet.
        while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
            self_.0(item);
        }
        // The stream is exhausted, so every item has been passed to the closure.
        Poll::Ready(())
    }
}
impl<A, F> ReducerAsync<A> for ForEachReducer<A, F>
where
    F: FnMut(A) + Clone,
{
    type Output = ();

    fn output<'a>(self: Pin<&'a mut Self>) -> Pin<Box<dyn Future<Output = Self::Output> + 'a>> {
        // Nothing is accumulated, so the output future resolves to `()`.
        Box::pin(async move {})
    }
}