1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
use derive_new::new; use educe::Educe; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use super::{ combiner_par_sink, FolderSync, FolderSyncReducer, FolderSyncReducerFactory, ParallelPipe, ParallelSink }; #[derive(Educe, Serialize, Deserialize, new)] #[educe(Clone(bound = "F: Clone"))] #[serde( bound(serialize = "F: Serialize"), bound(deserialize = "F: Deserialize<'de>") )] pub struct ReduceFn<F, A>(F, PhantomData<fn() -> A>); impl<F, A, T> FolderSync<T> for ReduceFn<F, A> where F: FnMut(A, A) -> A, T: Into<Option<A>>, { type Output = Option<A>; fn zero(&mut self) -> Self::Output { None } fn push(&mut self, state: &mut Self::Output, item: T) { if let Some(item) = item.into() { *state = Some(if let Some(state) = state.take() { self.0(state, item) } else { item }); } } } #[derive(new)] #[must_use] pub struct Combine<I, F> { i: I, f: F, } impl_par_dist! { impl<I: ParallelPipe<Source>, Source, F> ParallelSink<Source> for Combine<I, F> where F: FnMut(I::Item, I::Item) -> I::Item + Clone + Send + 'static, I::Item: Send + 'static, { combiner_par_sink!(ReduceFn<F, I::Item>, self, ReduceFn::new(self.f)); } }