amadeus_core/par_sink/
mean.rs

1use derive_new::new;
2use educe::Educe;
3use serde::{Deserialize, Serialize};
4use std::marker::PhantomData;
5
6use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};
7use crate::util::u64_to_f64;
8
9#[derive(new)]
10#[must_use]
11pub struct Mean<P> {
12	pipe: P,
13}
14
15impl_par_dist! {
16	impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P> {
17		folder_par_sink!(
18			MeanFolder<StepA>,
19			MeanFolder<StepB>,
20			self,
21			MeanFolder::new(),
22			MeanFolder::new()
23		);
24	}
25}
26
27#[derive(Educe, Serialize, Deserialize, new)]
28#[educe(Clone)]
29#[serde(bound = "")]
30
31pub struct MeanFolder<Step> {
32	marker: PhantomData<fn() -> Step>,
33}
34
35pub struct StepA;
36pub struct StepB;
37
38#[derive(Serialize, Deserialize, new)]
39pub struct State {
40	#[new(default)]
41	mean: f64,
42	#[new(default)]
43	correction: f64,
44	#[new(default)]
45	count: u64,
46}
47
48impl FolderSync<f64> for MeanFolder<StepA> {
49	type State = State;
50	type Done = f64;
51
52	#[inline(always)]
53	fn zero(&mut self) -> Self::State {
54		State::new()
55	}
56
57	#[inline(always)]
58	fn push(&mut self, state: &mut Self::State, item: f64) {
59		state.count += 1;
60		let f = (item - state.mean) / (u64_to_f64(state.count));
61		let y = f - state.correction;
62		let t = state.mean + y;
63		state.correction = (t - state.mean) - y;
64		state.mean = t;
65	}
66
67	#[inline(always)]
68	fn done(&mut self, state: Self::State) -> Self::Done {
69		state.mean
70	}
71}
72
73impl FolderSync<State> for MeanFolder<StepB> {
74	type State = State;
75	type Done = f64;
76
77	#[inline(always)]
78	fn zero(&mut self) -> Self::State {
79		State::new()
80	}
81
82	#[inline(always)]
83	fn push(&mut self, state: &mut Self::State, item: State) {
84		state.correction = (state.correction * u64_to_f64(state.count))
85			+ (item.correction * u64_to_f64(item.count)) / u64_to_f64(state.count + item.count);
86		state.mean = ((state.mean * u64_to_f64(state.count))
87			+ (item.mean * u64_to_f64(item.count)))
88			/ u64_to_f64(state.count + item.count);
89		state.count += item.count;
90	}
91
92	#[inline(always)]
93	fn done(&mut self, state: Self::State) -> Self::Done {
94		state.mean
95	}
96}