amadeus_core/par_sink/
mean.rs1use derive_new::new;
2use educe::Educe;
3use serde::{Deserialize, Serialize};
4use std::marker::PhantomData;
5
6use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};
7use crate::util::u64_to_f64;
8
9#[derive(new)]
10#[must_use]
11pub struct Mean<P> {
12 pipe: P,
13}
14
// Implements `ParallelSink<Item>` for `Mean<P>` whenever `P` is a
// `ParallelPipe<Item>` whose output is `f64`.
//
// `folder_par_sink!` receives two folder types, `MeanFolder<StepA>` and
// `MeanFolder<StepB>`, the sink itself (`self`), and one `MeanFolder::new()`
// value for each of the two folders.
//
// NOTE(review): `impl_par_dist!` presumably also emits the distributed
// counterpart of this impl -- confirm against the macro definition.
// Comments are kept outside the macro input on purpose: `///` doc comments
// would be passed to the macros as `#[doc]` tokens.
impl_par_dist! {
    impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P> {
        folder_par_sink!(
            MeanFolder<StepA>,
            MeanFolder<StepB>,
            self,
            MeanFolder::new(),
            MeanFolder::new()
        );
    }
}
26
27#[derive(Educe, Serialize, Deserialize, new)]
28#[educe(Clone)]
29#[serde(bound = "")]
30
31pub struct MeanFolder<Step> {
32 marker: PhantomData<fn() -> Step>,
33}
34
/// Marker type selecting the `FolderSync<f64>` implementation of `MeanFolder`.
pub struct StepA;

/// Marker type selecting the `FolderSync<State>` implementation of `MeanFolder`.
pub struct StepB;
37
38#[derive(Serialize, Deserialize, new)]
39pub struct State {
40 #[new(default)]
41 mean: f64,
42 #[new(default)]
43 correction: f64,
44 #[new(default)]
45 count: u64,
46}
47
48impl FolderSync<f64> for MeanFolder<StepA> {
49 type State = State;
50 type Done = f64;
51
52 #[inline(always)]
53 fn zero(&mut self) -> Self::State {
54 State::new()
55 }
56
57 #[inline(always)]
58 fn push(&mut self, state: &mut Self::State, item: f64) {
59 state.count += 1;
60 let f = (item - state.mean) / (u64_to_f64(state.count));
61 let y = f - state.correction;
62 let t = state.mean + y;
63 state.correction = (t - state.mean) - y;
64 state.mean = t;
65 }
66
67 #[inline(always)]
68 fn done(&mut self, state: Self::State) -> Self::Done {
69 state.mean
70 }
71}
72
73impl FolderSync<State> for MeanFolder<StepB> {
74 type State = State;
75 type Done = f64;
76
77 #[inline(always)]
78 fn zero(&mut self) -> Self::State {
79 State::new()
80 }
81
82 #[inline(always)]
83 fn push(&mut self, state: &mut Self::State, item: State) {
84 state.correction = (state.correction * u64_to_f64(state.count))
85 + (item.correction * u64_to_f64(item.count)) / u64_to_f64(state.count + item.count);
86 state.mean = ((state.mean * u64_to_f64(state.count))
87 + (item.mean * u64_to_f64(item.count)))
88 / u64_to_f64(state.count + item.count);
89 state.count += item.count;
90 }
91
92 #[inline(always)]
93 fn done(&mut self, state: Self::State) -> Self::Done {
94 state.mean
95 }
96}