#![allow(clippy::type_complexity)]

use derive_new::new;
use educe::Educe;
use itertools::Itertools;
use replace_with::replace_with_or_default;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, hash::Hash, marker::PhantomData};

use super::{
	folder_par_sink, FolderSync, FolderSyncReducer, FolderSyncReducerFactory, ParallelPipe, ParallelSink
};

/// Parallel sink that counts the occurrences of each distinct item.
#[derive(new)]
#[must_use]
pub struct Histogram<I> {
	i: I,
}

impl_par_dist! {
	impl<I: ParallelPipe<Source>, Source> ParallelSink<Source> for Histogram<I>
	where
		I::Item: Hash + Ord + Send + 'static,
	{
		folder_par_sink!(HistogramFolder<I::Item, StepA>, HistogramFolder<I::Item, StepB>, self, HistogramFolder::new(), HistogramFolder::new());
	}
}

/// Folder used for both stages of `Histogram`; the zero-sized `Step` marker
/// (`StepA` or `StepB`) selects which `FolderSync` impls apply.
#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone)]
#[serde(bound = "")]
pub struct HistogramFolder<B, Step> {
	marker: PhantomData<fn() -> (B, Step)>,
}

/// First stage: count items into a `HashMap<B, usize>`.
pub struct StepA;
/// Second stage: merge partial counts into a `Vec<(B, usize)>` sorted by key.
pub struct StepB;

impl<B> FolderSync<B> for HistogramFolder<B, StepA>
where
	B: Hash + Ord,
{
	type Output = HashMap<B, usize>;

	fn zero(&mut self) -> Self::Output {
		HashMap::new()
	}
	fn push(&mut self, state: &mut Self::Output, item: B) {
		*state.entry(item).or_insert(0) += 1;
	}
}

impl<B> FolderSync<HashMap<B, usize>> for HistogramFolder<B, StepB>
where
	B: Hash + Ord,
{
	type Output = Vec<(B, usize)>;

	fn zero(&mut self) -> Self::Output {
		Vec::new()
	}
	fn push(&mut self, state: &mut Self::Output, b: HashMap<B, usize>) {
		// Sort the incoming partial counts by key so they can be merged with
		// `state`, which is kept sorted by key.
		let mut b = b.into_iter().collect::<Vec<_>>();
		b.sort_by(|a, b| a.0.cmp(&b.0));
		// `replace_with_or_default` lets the closure take `state` by value;
		// if the closure panics, `state` is left as `Vec::default()`.
		replace_with_or_default(state, |state| {
			state
				.into_iter()
				.merge(b)
				// Adjacent entries with the same key are summed into one.
				.coalesce(|a, b| {
					if a.0 == b.0 {
						Ok((a.0, a.1 + b.1))
					} else {
						Err((a, b))
					}
				})
				.collect()
		})
	}
}

impl<B> FolderSync<Vec<(B, usize)>> for HistogramFolder<B, StepB>
where
	B: Hash + Ord,
{
	type Output = Vec<(B, usize)>;

	fn zero(&mut self) -> Self::Output {
		Vec::new()
	}
	fn push(&mut self, state: &mut Self::Output, b: Vec<(B, usize)>) {
		// `b` is expected to be the key-sorted output of another `StepB` fold,
		// so it is merged directly without re-sorting.
		replace_with_or_default(state, |state| {
			state
				.into_iter()
				.merge(b)
				.coalesce(|a, b| {
					if a.0 == b.0 {
						Ok((a.0, a.1 + b.1))
					} else {
						Err((a, b))
					}
				})
				.collect()
		})
	}
}
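// Illustrative sketch (not part of amadeus; the module and function names are
// hypothetical): what `HistogramFolder<B, StepA>` computes for a single
// partition, written with std only. Each item increments its entry in a
// `HashMap`, mirroring `*state.entry(item).or_insert(0) += 1` above.
#[allow(dead_code)]
mod histogram_step_a_sketch {
	use std::collections::HashMap;
	use std::hash::Hash;

	/// Counts how many times each distinct item occurs in `items`.
	pub fn count_partition<B, I>(items: I) -> HashMap<B, usize>
	where
		B: Hash + Eq,
		I: IntoIterator<Item = B>,
	{
		let mut state = HashMap::new();
		for item in items {
			// Same update rule as `HistogramFolder<B, StepA>::push`.
			*state.entry(item).or_insert(0) += 1;
		}
		state
	}
}

// Usage: `count_partition(vec!["a", "b", "a"])` yields a map with "a" -> 2 and
// "b" -> 1. A `HashMap` has no defined iteration order, which is why the
// `StepB` impls sort each partial result by key before merging.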
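// Illustrative sketch (not part of amadeus; names are hypothetical) of the
// merge done by `HistogramFolder<B, StepB>`. Partial histograms are kept as
// `Vec<(B, usize)>` sorted by key, so two of them combine in one linear pass,
// summing counts when keys match. The real impls use `Itertools::merge`
// followed by `coalesce`; this std-only version swaps in a hand-written
// two-pointer merge over `Peekable` iterators. It assumes each input is sorted
// by key with no duplicate keys, which the `StepB` impls maintain.
#[allow(dead_code)]
mod histogram_step_b_sketch {
	use std::cmp::Ordering;
	use std::collections::HashMap;

	/// Converts one partition's counts into a vector sorted by key.
	pub fn to_sorted<B: Ord>(counts: HashMap<B, usize>) -> Vec<(B, usize)> {
		let mut v: Vec<(B, usize)> = counts.into_iter().collect();
		v.sort_by(|a, b| a.0.cmp(&b.0));
		v
	}

	/// Merges two key-sorted histograms, summing counts for keys in both.
	pub fn merge_sorted<B: Ord>(left: Vec<(B, usize)>, right: Vec<(B, usize)>) -> Vec<(B, usize)> {
		let mut out = Vec::with_capacity(left.len() + right.len());
		let mut l = left.into_iter().peekable();
		let mut r = right.into_iter().peekable();
		loop {
			// Decide which side supplies the next entry.
			let ord = match (l.peek(), r.peek()) {
				(Some(a), Some(b)) => a.0.cmp(&b.0),
				(Some(_), None) => Ordering::Less,
				(None, Some(_)) => Ordering::Greater,
				(None, None) => break,
			};
			match ord {
				Ordering::Less => out.extend(l.next()),
				Ordering::Greater => out.extend(r.next()),
				Ordering::Equal => {
					// Same key on both sides: emit it once with the summed count.
					if let (Some((key, a)), Some((_, b))) = (l.next(), r.next()) {
						out.push((key, a + b));
					}
				}
			}
		}
		out
	}
}

// Usage: merging [("a", 2), ("b", 1)] with [("a", 1), ("c", 4)] gives
// [("a", 3), ("b", 1), ("c", 4)], still sorted by key, so the result can be
// merged again with further partial histograms.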
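// Illustrative sketch (not part of amadeus; `Folder`, `Count`, `Merge` and
// `Describe` are hypothetical stand-ins) of the marker-type pattern behind
// `HistogramFolder<B, Step>`. `StepA`/`StepB` are zero-sized types that only
// select which trait impls apply, and `PhantomData<fn() -> (B, Step)>` records
// both parameters without storing values of either. Wrapping them in
// `fn() -> _` keeps the folder zero-sized and `Send + Sync` whatever `B` is;
// whether that was the authors' motivation is an assumption.
#[allow(dead_code)]
mod histogram_step_marker_sketch {
	use std::marker::PhantomData;

	/// Marker for the counting stage (analogous to `StepA`).
	pub struct Count;
	/// Marker for the merging stage (analogous to `StepB`).
	pub struct Merge;

	/// Zero-sized folder whose behavior is chosen by the `Step` marker.
	pub struct Folder<B, Step> {
		marker: PhantomData<fn() -> (B, Step)>,
	}

	impl<B, Step> Folder<B, Step> {
		pub fn new() -> Self {
			Folder { marker: PhantomData }
		}
	}

	/// Stand-in for a trait such as `FolderSync`, implemented once per step.
	pub trait Describe {
		fn describe(&self) -> &'static str;
	}

	impl<B> Describe for Folder<B, Count> {
		fn describe(&self) -> &'static str {
			"count items into a HashMap"
		}
	}

	impl<B> Describe for Folder<B, Merge> {
		fn describe(&self) -> &'static str {
			"merge key-sorted partial counts"
		}
	}

	/// Compiles only if `T: Send + Sync`; used to check the marker's effect.
	pub fn assert_send_sync<T: Send + Sync>() {}
}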