1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use derive_new::new;
use educe::Educe;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;

use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};
use crate::util::u64_to_f64;

#[derive(new)]
#[must_use]
pub struct Mean<P> {
	pipe: P,
}

impl_par_dist! {
	impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P> {
		folder_par_sink!(
			MeanFolder<StepA>,
			MeanFolder<StepB>,
			self,
			MeanFolder::new(),
			MeanFolder::new()
		);
	}
}

#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone)]
#[serde(bound = "")]

pub struct MeanFolder<Step> {
	marker: PhantomData<fn() -> Step>,
}

pub struct StepA;
pub struct StepB;

#[derive(Serialize, Deserialize, new)]
pub struct State {
	#[new(default)]
	mean: f64,
	#[new(default)]
	correction: f64,
	#[new(default)]
	count: u64,
}

impl FolderSync<f64> for MeanFolder<StepA> {
	type State = State;
	type Done = f64;

	#[inline(always)]
	fn zero(&mut self) -> Self::State {
		State::new()
	}

	#[inline(always)]
	fn push(&mut self, state: &mut Self::State, item: f64) {
		state.count += 1;
		let f = (item - state.mean) / (u64_to_f64(state.count));
		let y = f - state.correction;
		let t = state.mean + y;
		state.correction = (t - state.mean) - y;
		state.mean = t;
	}

	#[inline(always)]
	fn done(&mut self, state: Self::State) -> Self::Done {
		state.mean
	}
}

impl FolderSync<State> for MeanFolder<StepB> {
	type State = State;
	type Done = f64;

	#[inline(always)]
	fn zero(&mut self) -> Self::State {
		State::new()
	}

	#[inline(always)]
	fn push(&mut self, state: &mut Self::State, item: State) {
		state.correction = (state.correction * u64_to_f64(state.count))
			+ (item.correction * u64_to_f64(item.count)) / u64_to_f64(state.count + item.count);
		state.mean = ((state.mean * u64_to_f64(state.count))
			+ (item.mean * u64_to_f64(item.count)))
			/ u64_to_f64(state.count + item.count);
		state.count += item.count;
	}

	#[inline(always)]
	fn done(&mut self, state: Self::State) -> Self::Done {
		state.mean
	}
}