amadeus_core/par_sink/
sum.rs1use derive_new::new;
2use educe::Educe;
3use replace_with::{replace_with, replace_with_or_abort};
4use serde::{Deserialize, Serialize};
5use std::{iter, marker::PhantomData};
6
7use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};
8
9#[derive(new)]
10#[must_use]
11pub struct Sum<P, B> {
12 pipe: P,
13 marker: PhantomData<fn() -> B>,
14}
15
16impl_par_dist! {
17 impl<P: ParallelPipe<Item>, Item, B> ParallelSink<Item> for Sum<P, B>
18 where
19 B: iter::Sum<P::Output> + iter::Sum<B> + Send + 'static,
20 {
21 folder_par_sink!(
22 SumFolder<B>,
23 SumFolder<B>,
24 self,
25 SumFolder::new(),
26 SumFolder::new()
27 );
28 }
29}
30
31#[derive(Educe, Serialize, Deserialize, new)]
32#[educe(Clone)]
33#[serde(bound = "")]
34pub struct SumFolder<B> {
35 marker: PhantomData<fn() -> B>,
36}
37
38impl<Item, B> FolderSync<Item> for SumFolder<B>
39where
40 B: iter::Sum<Item> + iter::Sum<B>,
41{
42 type State = B;
43 type Done = Self::State;
44
45 #[inline(always)]
46 fn zero(&mut self) -> Self::Done {
47 B::sum(iter::empty::<B>())
48 }
49 #[inline(always)]
50 fn push(&mut self, state: &mut Self::Done, item: Item) {
51 let default = || B::sum(iter::empty::<B>());
52 replace_with(state, default, |left| {
53 let right = iter::once(item).sum::<B>();
54 B::sum(iter::once(left).chain(iter::once(right)))
55 })
56 }
57 #[inline(always)]
58 fn done(&mut self, state: Self::State) -> Self::Done {
59 state
60 }
61}
62
63#[derive(Clone, Serialize, Deserialize)]
64pub struct SumZeroFolder<B> {
65 zero: Option<B>,
66}
67impl<B> SumZeroFolder<B> {
68 #[inline(always)]
69 pub(crate) fn new(zero: B) -> Self {
70 Self { zero: Some(zero) }
71 }
72}
73
74impl<Item> FolderSync<Item> for SumZeroFolder<Item>
75where
76 Option<Item>: iter::Sum<Item>,
77{
78 type State = Item;
79 type Done = Self::State;
80
81 #[inline(always)]
82 fn zero(&mut self) -> Self::Done {
83 self.zero.take().unwrap()
84 }
85 #[inline(always)]
86 fn push(&mut self, state: &mut Self::Done, item: Item) {
87 replace_with_or_abort(state, |left| {
88 let right = iter::once(item).sum::<Option<Item>>().unwrap();
89 <Option<Item> as iter::Sum<Item>>::sum(iter::once(left).chain(iter::once(right)))
90 .unwrap()
91 })
92 }
93
94 #[inline(always)]
95 fn done(&mut self, state: Self::State) -> Self::Done {
96 state
97 }
98}