1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
#![allow(unused_imports, clippy::single_component_path_imports, clippy::option_if_let_else)] use super::FolderSync; mod macros { #[macro_export] macro_rules! combiner_par_sink { ($combiner:ty, $self:ident, $init:expr) => { type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done; type Pipe = P; type ReduceA = FolderSyncReducer<P::Output, $combiner>; type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner>; fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) { let init = $init; ( $self.pipe, FolderSyncReducer::new(init.clone()), FolderSyncReducer::new(init), ) } }; } #[macro_export] macro_rules! combiner_dist_sink { ($combiner:ty, $self:ident, $init:expr) => { type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done; type Pipe = P; type ReduceA = FolderSyncReducer<P::Output, $combiner>; type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner>; type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $combiner>; fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) { let init = $init; ( $self.pipe, FolderSyncReducer::new(init.clone()), FolderSyncReducer::new(init.clone()), FolderSyncReducer::new(init), ) } }; } pub(crate) use combiner_dist_sink; pub(crate) use combiner_par_sink; } pub(crate) use macros::{combiner_dist_sink, combiner_par_sink}; pub trait CombinerSync { type Done; fn combine(&mut self, a: Self::Done, b: Self::Done) -> Self::Done; } impl<C, Item, B> FolderSync<Item> for C where C: CombinerSync<Done = B>, Item: Into<Option<B>>, { type Done = Option<B>; fn zero(&mut self) -> Self::Done { None } fn push(&mut self, state: &mut Self::Done, item: Item) { if let Some(item) = item.into() { *state = Some(if let Some(state) = state.take() { self.combine(state, item) } else { item }); } } }