1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#![allow(clippy::type_complexity)]

use derive_new::new;
use educe::Educe;
use either::Either;
use replace_with::replace_with_or_abort;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;

use super::{
	folder_par_sink, FolderSync, FolderSyncReducer, FolderSyncReducerFactory, ParallelPipe, ParallelSink
};

/// Sink that folds the items produced by the pipe `i` into a single value of
/// type `B`.
///
/// The `ParallelSink` implementation for this type requires `identity` to be
/// an `FnMut() -> B` and `op` to be an `FnMut(B, Either<I::Item, B>) -> B`:
/// `op` receives the running accumulator together with either a raw item
/// (`Either::Left`) or another accumulator (`Either::Right`).
///
/// Marked `#[must_use]`: constructing the sink and dropping it triggers a
/// compiler warning.
#[derive(new)]
#[must_use]
pub struct Fold<I, ID, F, B> {
	/// Upstream pipe whose items are folded.
	i: I,
	/// Closure that produces a fresh starting accumulator.
	identity: ID,
	/// Closure that merges an item or another accumulator into the accumulator.
	op: F,
	/// Ties the accumulator type `B` to the struct without storing a `B`.
	marker: PhantomData<fn() -> B>,
}

// `ParallelSink` implementation for `Fold`, written once inside the
// `impl_par_dist!` wrapper macro.
// NOTE(review): presumably the macro also emits the distributed counterpart of
// this impl — confirm against the macro definition.
impl_par_dist! {
	impl<I: ParallelPipe<Source>, Source, ID, F, B> ParallelSink<Source>
		for Fold<I, ID, F, B>
	where
		ID: FnMut() -> B + Clone + Send + 'static,
		F: FnMut(B, Either<I::Item, B>) -> B + Clone + Send + 'static,
		B: Send + 'static,
	{
		// Two folders are built from the same closures: the `StepA` folder gets
		// clones of `identity`/`op` and consumes raw `I::Item`s, while the
		// `StepB` folder takes ownership of the originals and consumes `B`
		// accumulators. How the two stages are scheduled is decided by
		// `folder_par_sink!` (not visible here).
		folder_par_sink!(FoldFolder<I::Item, ID, F, B, StepA>, FoldFolder<I::Item, ID, F, B, StepB>, self, FoldFolder::new(self.identity.clone(), self.op.clone()), FoldFolder::new(self.identity, self.op));
	}
}

/// Folder holding the `identity` and `op` closures of a fold.
///
/// The `Step` type parameter selects which `FolderSync` implementation
/// applies: with `StepA` the folder consumes raw items of type `A`, with
/// `StepB` it consumes accumulators of type `B`.
///
/// `Clone`, `Serialize` and `Deserialize` are bounded only on `ID` and `F`;
/// `A`, `B` and `Step` appear solely inside `PhantomData` and need not
/// implement those traits.
#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone(bound = "ID: Clone, F: Clone"))]
#[serde(
	bound(serialize = "ID: Serialize, F: Serialize"),
	bound(deserialize = "ID: Deserialize<'de>, F: Deserialize<'de>")
)]
pub struct FoldFolder<A, ID, F, B, Step> {
	/// Closure that produces a fresh starting accumulator.
	identity: ID,
	/// Closure that merges an `Either<A, B>` into the accumulator.
	op: F,
	/// Records the item, accumulator and step types without storing values.
	marker: PhantomData<fn() -> (A, B, Step)>,
}

/// Type-level marker for the `FoldFolder` stage that consumes raw items
/// (passed to `op` as `Either::Left`).
pub struct StepA;
/// Type-level marker for the `FoldFolder` stage that consumes accumulators
/// (passed to `op` as `Either::Right`).
pub struct StepB;

/// Item-consuming stage of the fold: every incoming `A` is merged into the
/// accumulator by calling `op` with `Either::Left(item)`.
impl<A, ID, F, B> FolderSync<A> for FoldFolder<A, ID, F, B, StepA>
where
	ID: FnMut() -> B,
	F: FnMut(B, Either<A, B>) -> B,
{
	type Output = B;

	/// Creates a fresh accumulator by invoking the `identity` closure.
	fn zero(&mut self) -> Self::Output {
		let make_identity = &mut self.identity;
		make_identity()
	}

	/// Folds `item` into `state` in place.
	///
	/// `op` takes the accumulator by value, so the current value is moved out
	/// of `state` and the result written back via `replace_with_or_abort`
	/// (which, per the `replace_with` crate, aborts the process if the closure
	/// panics, since `state` would otherwise be left uninitialised).
	fn push(&mut self, state: &mut Self::Output, item: A) {
		let combine = &mut self.op;
		replace_with_or_abort(state, |acc| combine(acc, Either::Left(item)));
	}
}
/// Accumulator-consuming stage of the fold: every incoming `B` is merged into
/// the accumulator by calling `op` with `Either::Right(item)`.
impl<A, ID, F, B> FolderSync<B> for FoldFolder<A, ID, F, B, StepB>
where
	ID: FnMut() -> B,
	F: FnMut(B, Either<A, B>) -> B,
{
	type Output = B;

	/// Creates a fresh accumulator by invoking the `identity` closure.
	fn zero(&mut self) -> Self::Output {
		let make_identity = &mut self.identity;
		make_identity()
	}

	/// Folds the accumulator `item` into `state` in place.
	///
	/// `op` takes the accumulator by value, so the current value is moved out
	/// of `state` and the result written back via `replace_with_or_abort`
	/// (which, per the `replace_with` crate, aborts the process if the closure
	/// panics, since `state` would otherwise be left uninitialised).
	fn push(&mut self, state: &mut Self::Output, item: B) {
		let combine = &mut self.op;
		replace_with_or_abort(state, |acc| combine(acc, Either::Right(item)));
	}
}