1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
use replace_with::replace_with_or_abort;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;

use super::{DistributedIteratorMulti, DistributedReducer, ReduceFactory, Reducer, ReducerA};
use crate::pool::ProcessSend;

#[must_use]
pub struct Combine<I, F> {
	i: I,
	f: F,
}
impl<I, F> Combine<I, F> {
	pub(super) fn new(i: I, f: F) -> Self {
		Self { i, f }
	}
}

impl<I: DistributedIteratorMulti<Source>, Source, F> DistributedReducer<I, Source, Option<I::Item>>
	for Combine<I, F>
where
	F: FnMut(I::Item, I::Item) -> I::Item + Clone + ProcessSend,
	I::Item: ProcessSend,
{
	type ReduceAFactory = CombineReducerFactory<I::Item, I::Item, CombineFn<F>>;
	type ReduceA = CombineReducer<I::Item, I::Item, CombineFn<F>>;
	type ReduceB = CombineReducer<Option<I::Item>, I::Item, CombineFn<F>>;

	fn reducers(self) -> (I, Self::ReduceAFactory, Self::ReduceB) {
		(
			self.i,
			CombineReducerFactory(CombineFn(self.f.clone()), PhantomData),
			CombineReducer(None, CombineFn(self.f), PhantomData),
		)
	}
}

#[derive(Copy, Clone, Serialize, Deserialize)]
pub struct CombineFn<F>(F);
impl<F, A> Combiner<A> for CombineFn<F>
where
	F: FnMut(A, A) -> A,
{
	fn combine(&mut self, a: A, b: A) -> A {
		self.0(a, b)
	}
}

pub trait Combiner<A> {
	fn combine(&mut self, a: A, b: A) -> A;
}

pub struct CombineReducerFactory<A, B, F>(pub(crate) F, pub(crate) PhantomData<fn(A, B)>);

impl<A, B, F> ReduceFactory for CombineReducerFactory<A, B, F>
where
	Option<B>: From<A>,
	F: Combiner<B> + Clone,
{
	type Reducer = CombineReducer<A, B, F>;
	fn make(&self) -> Self::Reducer {
		CombineReducer(None, self.0.clone(), PhantomData)
	}
}

#[derive(Serialize, Deserialize)]
#[serde(
	bound(serialize = "B: Serialize, F: Serialize"),
	bound(deserialize = "B: Deserialize<'de>, F: Deserialize<'de>")
)]
pub struct CombineReducer<A, B, F>(
	pub(crate) Option<B>,
	pub(crate) F,
	pub(crate) PhantomData<fn(A)>,
);

impl<A, B, F> Reducer for CombineReducer<A, B, F>
where
	Option<B>: From<A>,
	F: Combiner<B>,
{
	type Item = A;
	type Output = Option<B>;

	#[inline(always)]
	fn push(&mut self, item: Self::Item) -> bool {
		let item: Option<B> = item.into();
		let self_1 = &mut self.1;
		if let Some(item) = item {
			replace_with_or_abort(&mut self.0, |self_0| {
				Some(if let Some(cur) = self_0 {
					self_1.combine(cur, item)
				} else {
					item
				})
			});
		}
		true
	}
	fn ret(self) -> Self::Output {
		self.0
	}
}
impl<A, B, F> ReducerA for CombineReducer<A, B, F>
where
	A: 'static,
	Option<B>: From<A>,
	F: Combiner<B> + ProcessSend,
	B: ProcessSend,
{
	type Output = Option<B>;
}