1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
use derive_new::new; use educe::Educe; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink}; use crate::util::u64_to_f64; #[derive(new)] #[must_use] pub struct Mean<P> { pipe: P, } impl_par_dist! { impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P> { folder_par_sink!( MeanFolder<StepA>, MeanFolder<StepB>, self, MeanFolder::new(), MeanFolder::new() ); } } #[derive(Educe, Serialize, Deserialize, new)] #[educe(Clone)] #[serde(bound = "")] pub struct MeanFolder<Step> { marker: PhantomData<fn() -> Step>, } pub struct StepA; pub struct StepB; #[derive(Serialize, Deserialize, new)] pub struct State { #[new(default)] mean: f64, #[new(default)] correction: f64, #[new(default)] count: u64, } impl FolderSync<f64> for MeanFolder<StepA> { type State = State; type Done = f64; #[inline(always)] fn zero(&mut self) -> Self::State { State::new() } #[inline(always)] fn push(&mut self, state: &mut Self::State, item: f64) { state.count += 1; let f = (item - state.mean) / (u64_to_f64(state.count)); let y = f - state.correction; let t = state.mean + y; state.correction = (t - state.mean) - y; state.mean = t; } #[inline(always)] fn done(&mut self, state: Self::State) -> Self::Done { state.mean } } impl FolderSync<State> for MeanFolder<StepB> { type State = State; type Done = f64; #[inline(always)] fn zero(&mut self) -> Self::State { State::new() } #[inline(always)] fn push(&mut self, state: &mut Self::State, item: State) { state.correction = (state.correction * u64_to_f64(state.count)) + (item.correction * u64_to_f64(item.count)) / u64_to_f64(state.count + item.count); state.mean = ((state.mean * u64_to_f64(state.count)) + (item.mean * u64_to_f64(item.count))) / u64_to_f64(state.count + item.count); state.count += item.count; } #[inline(always)] fn done(&mut self, state: Self::State) -> Self::Done { state.mean } }