1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
use replace_with::replace_with_or_abort; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use super::{DistributedIteratorMulti, DistributedReducer, ReduceFactory, Reducer, ReducerA}; use crate::pool::ProcessSend; #[must_use] pub struct Combine<I, F> { i: I, f: F, } impl<I, F> Combine<I, F> { pub(super) fn new(i: I, f: F) -> Self { Self { i, f } } } impl<I: DistributedIteratorMulti<Source>, Source, F> DistributedReducer<I, Source, Option<I::Item>> for Combine<I, F> where F: FnMut(I::Item, I::Item) -> I::Item + Clone + ProcessSend, I::Item: ProcessSend, { type ReduceAFactory = CombineReducerFactory<I::Item, I::Item, CombineFn<F>>; type ReduceA = CombineReducer<I::Item, I::Item, CombineFn<F>>; type ReduceB = CombineReducer<Option<I::Item>, I::Item, CombineFn<F>>; fn reducers(self) -> (I, Self::ReduceAFactory, Self::ReduceB) { ( self.i, CombineReducerFactory(CombineFn(self.f.clone()), PhantomData), CombineReducer(None, CombineFn(self.f), PhantomData), ) } } #[derive(Copy, Clone, Serialize, Deserialize)] pub struct CombineFn<F>(F); impl<F, A> Combiner<A> for CombineFn<F> where F: FnMut(A, A) -> A, { fn combine(&mut self, a: A, b: A) -> A { self.0(a, b) } } pub trait Combiner<A> { fn combine(&mut self, a: A, b: A) -> A; } pub struct CombineReducerFactory<A, B, F>(pub(crate) F, pub(crate) PhantomData<fn(A, B)>); impl<A, B, F> ReduceFactory for CombineReducerFactory<A, B, F> where Option<B>: From<A>, F: Combiner<B> + Clone, { type Reducer = CombineReducer<A, B, F>; fn make(&self) -> Self::Reducer { CombineReducer(None, self.0.clone(), PhantomData) } } #[derive(Serialize, Deserialize)] #[serde( bound(serialize = "B: Serialize, F: Serialize"), bound(deserialize = "B: Deserialize<'de>, F: Deserialize<'de>") )] pub struct CombineReducer<A, B, F>( pub(crate) Option<B>, pub(crate) F, pub(crate) PhantomData<fn(A)>, ); impl<A, B, F> Reducer for CombineReducer<A, B, F> where Option<B>: From<A>, F: Combiner<B>, { type Item = A; type Output = Option<B>; #[inline(always)] fn push(&mut self, item: Self::Item) -> bool { let item: Option<B> = item.into(); let self_1 = &mut self.1; if let Some(item) = item { replace_with_or_abort(&mut self.0, |self_0| { Some(if let Some(cur) = self_0 { self_1.combine(cur, item) } else { item }) }); } true } fn ret(self) -> Self::Output { self.0 } } impl<A, B, F> ReducerA for CombineReducer<A, B, F> where A: 'static, Option<B>: From<A>, F: Combiner<B> + ProcessSend, B: ProcessSend, { type Output = Option<B>; }