amadeus_core/par_sink/
histogram.rs1#![allow(clippy::type_complexity)]
2
3use derive_new::new;
4use educe::Educe;
5use itertools::Itertools;
6use replace_with::replace_with_or_default;
7use serde::{Deserialize, Serialize};
8use std::{collections::HashMap, hash::Hash, marker::PhantomData};
9
10use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};
11
12#[derive(new)]
13#[must_use]
14pub struct Histogram<P> {
15 pipe: P,
16}
17
18impl_par_dist! {
19 impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for Histogram<P>
20 where
21 P::Output: Hash + Ord + Send + 'static,
22 {
23 folder_par_sink!(HistogramFolder<P::Output, StepA>, HistogramFolder<P::Output, StepB>, self, HistogramFolder::new(), HistogramFolder::new());
24 }
25}
26
27#[derive(Educe, Serialize, Deserialize, new)]
28#[educe(Clone)]
29#[serde(bound = "")]
30pub struct HistogramFolder<B, Step> {
31 marker: PhantomData<fn() -> (B, Step)>,
32}
33
/// Marker for the first histogram stage: counting individual items into a
/// `HashMap`.
pub struct StepA;

/// Marker for the second histogram stage: merging partial counts into one
/// `Vec` of `(value, count)` pairs.
pub struct StepB;
36
37impl<Item> FolderSync<Item> for HistogramFolder<Item, StepA>
38where
39 Item: Hash + Ord,
40{
41 type State = HashMap<Item, usize>;
42 type Done = Self::State;
43
44 fn zero(&mut self) -> Self::State {
45 HashMap::new()
46 }
47 fn push(&mut self, state: &mut Self::Done, item: Item) {
48 *state.entry(item).or_insert(0) += 1;
49 }
50 fn done(&mut self, state: Self::State) -> Self::Done {
51 state
52 }
53}
54impl<B> FolderSync<HashMap<B, usize>> for HistogramFolder<B, StepB>
55where
56 B: Hash + Ord,
57{
58 type State = Vec<(B, usize)>;
59 type Done = Self::State;
60
61 fn zero(&mut self) -> Self::State {
62 Vec::new()
63 }
64 fn push(&mut self, state: &mut Self::State, b: HashMap<B, usize>) {
65 let mut b = b.into_iter().collect::<Vec<_>>();
66 b.sort_by(|a, b| a.0.cmp(&b.0));
67 replace_with_or_default(state, |state| {
68 state
69 .into_iter()
70 .merge(b)
71 .coalesce(|a, b| {
72 if a.0 == b.0 {
73 Ok((a.0, a.1 + b.1))
74 } else {
75 Err((a, b))
76 }
77 })
78 .collect()
79 })
80 }
81 fn done(&mut self, state: Self::State) -> Self::Done {
82 state
83 }
84}
85impl<B> FolderSync<Vec<(B, usize)>> for HistogramFolder<B, StepB>
86where
87 B: Hash + Ord,
88{
89 type State = Vec<(B, usize)>;
90 type Done = Self::State;
91
92 fn zero(&mut self) -> Self::State {
93 Vec::new()
94 }
95 fn push(&mut self, state: &mut Self::State, b: Vec<(B, usize)>) {
96 replace_with_or_default(state, |state| {
97 state
98 .into_iter()
99 .merge(b)
100 .coalesce(|a, b| {
101 if a.0 == b.0 {
102 Ok((a.0, a.1 + b.1))
103 } else {
104 Err((a, b))
105 }
106 })
107 .collect()
108 })
109 }
110 #[inline(always)]
111 fn done(&mut self, state: Self::State) -> Self::Done {
112 state
113 }
114}