amadeus_core/par_sink/
sum.rs

1use derive_new::new;
2use educe::Educe;
3use replace_with::{replace_with, replace_with_or_abort};
4use serde::{Deserialize, Serialize};
5use std::{iter, marker::PhantomData};
6
7use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};
8
9#[derive(new)]
10#[must_use]
11pub struct Sum<P, B> {
12	pipe: P,
13	marker: PhantomData<fn() -> B>,
14}
15
16impl_par_dist! {
17	impl<P: ParallelPipe<Item>, Item, B> ParallelSink<Item> for Sum<P, B>
18	where
19		B: iter::Sum<P::Output> + iter::Sum<B> + Send + 'static,
20	{
21		folder_par_sink!(
22			SumFolder<B>,
23			SumFolder<B>,
24			self,
25			SumFolder::new(),
26			SumFolder::new()
27		);
28	}
29}
30
31#[derive(Educe, Serialize, Deserialize, new)]
32#[educe(Clone)]
33#[serde(bound = "")]
34pub struct SumFolder<B> {
35	marker: PhantomData<fn() -> B>,
36}
37
38impl<Item, B> FolderSync<Item> for SumFolder<B>
39where
40	B: iter::Sum<Item> + iter::Sum<B>,
41{
42	type State = B;
43	type Done = Self::State;
44
45	#[inline(always)]
46	fn zero(&mut self) -> Self::Done {
47		B::sum(iter::empty::<B>())
48	}
49	#[inline(always)]
50	fn push(&mut self, state: &mut Self::Done, item: Item) {
51		let default = || B::sum(iter::empty::<B>());
52		replace_with(state, default, |left| {
53			let right = iter::once(item).sum::<B>();
54			B::sum(iter::once(left).chain(iter::once(right)))
55		})
56	}
57	#[inline(always)]
58	fn done(&mut self, state: Self::State) -> Self::Done {
59		state
60	}
61}
62
63#[derive(Clone, Serialize, Deserialize)]
64pub struct SumZeroFolder<B> {
65	zero: Option<B>,
66}
67impl<B> SumZeroFolder<B> {
68	#[inline(always)]
69	pub(crate) fn new(zero: B) -> Self {
70		Self { zero: Some(zero) }
71	}
72}
73
74impl<Item> FolderSync<Item> for SumZeroFolder<Item>
75where
76	Option<Item>: iter::Sum<Item>,
77{
78	type State = Item;
79	type Done = Self::State;
80
81	#[inline(always)]
82	fn zero(&mut self) -> Self::Done {
83		self.zero.take().unwrap()
84	}
85	#[inline(always)]
86	fn push(&mut self, state: &mut Self::Done, item: Item) {
87		replace_with_or_abort(state, |left| {
88			let right = iter::once(item).sum::<Option<Item>>().unwrap();
89			<Option<Item> as iter::Sum<Item>>::sum(iter::once(left).chain(iter::once(right)))
90				.unwrap()
91		})
92	}
93
94	#[inline(always)]
95	fn done(&mut self, state: Self::State) -> Self::Done {
96		state
97	}
98}