pipebase/map/
add.rs

1use std::collections::{BTreeMap, HashMap};
2use std::hash::Hash;
3
4use super::Map;
5use crate::common::{
6    Aggregate, AggregateAs, ConfigInto, FromConfig, FromPath, GroupAggregate, GroupAs, Init, Pair,
7};
8use async_trait::async_trait;
9use serde::Deserialize;
10
11#[derive(Deserialize)]
12pub struct AddAggregatorConfig {}
13
14#[async_trait]
15impl FromPath for AddAggregatorConfig {
16    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
17    where
18        P: AsRef<std::path::Path> + Send,
19    {
20        Ok(AddAggregatorConfig {})
21    }
22}
23
24#[async_trait]
25impl ConfigInto<AddAggregator> for AddAggregatorConfig {}
26
27/// Aggregate items with `+=`
28pub struct AddAggregator {}
29
30#[async_trait]
31impl FromConfig<AddAggregatorConfig> for AddAggregator {
32    async fn from_config(_config: AddAggregatorConfig) -> anyhow::Result<Self> {
33        Ok(AddAggregator {})
34    }
35}
36
37impl<I, T, U> Aggregate<I, T, U> for AddAggregator
38where
39    I: AggregateAs<U>,
40    U: std::ops::AddAssign<U> + Init,
41    T: IntoIterator<Item = I>,
42{
43    /// Merge items of I as U using `+=`
44    fn merge(&self, u: &mut U, i: &I) {
45        *u += i.aggregate_value();
46    }
47}
48
49#[async_trait]
50impl<I, T, U> Map<T, U, AddAggregatorConfig> for AddAggregator
51where
52    I: AggregateAs<U>,
53    U: std::ops::AddAssign<U> + Init,
54    T: IntoIterator<Item = I> + Send + 'static,
55{
56    /// Input: T
57    /// Output: U
58    async fn map(&mut self, data: T) -> anyhow::Result<U> {
59        Ok(self.aggregate(data))
60    }
61}
62
#[cfg(test)]
mod sum_aggregator_tests {
    use crate::prelude::*;

    /// Sends two `Vec<u32>` batches through the pipe and expects one sum per batch.
    #[tokio::test]
    async fn test_sum_aggregator() {
        // Capacity aligned with the 1024 used by the other test channels in this file.
        let (tx0, rx0) = channel!(Vec<u32>, 1024);
        let (tx1, mut rx1) = channel!(u32, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("summation");
        let f0 = populate_records(tx0, vec![vec![1, 3, 5, 7], vec![2, 4, 6, 8]]);
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        // 1 + 3 + 5 + 7
        let odd = rx1.recv().await.unwrap();
        assert_eq!(16, odd);
        // 2 + 4 + 6 + 8
        let even = rx1.recv().await.unwrap();
        assert_eq!(20, even);
    }

    /// Record whose `value` field is aggregated via the `sum` attribute.
    #[derive(AggregateAs)]
    struct Record {
        #[agg(sum)]
        value: u32,
    }

    impl Record {
        /// Wraps `value` in a `Record`.
        pub fn new(value: u32) -> Self {
            Record { value }
        }
    }

    /// Sends one batch of three records and expects the sum of their values.
    #[tokio::test]
    async fn test_record_sum() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(u32, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("record_sum");
        let f0 = populate_records(
            tx0,
            vec![vec![Record::new(1), Record::new(2), Record::new(3)]],
        );
        f0.await;
        let run_pipe = run_pipe!(pipe, config, channels);
        let _ = run_pipe.await;
        // 1 + 2 + 3
        let sum = rx1.recv().await.unwrap();
        assert_eq!(6, sum)
    }
}
113
#[cfg(test)]
mod count32_tests {

    use crate::prelude::*;

    /// Field-less record aggregated via the `count32` attribute.
    #[derive(Debug, Clone, AggregateAs)]
    #[agg(count32)]
    struct Record {}

    /// Sends a single batch of four records and expects a count of 4.
    #[tokio::test]
    async fn test_count32() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Count32, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("counter");
        let pipe = run_pipe!(pipe, config, channels);
        // Four identical (empty) records in one batch.
        let batch = vec![Record {}; 4];
        let f0 = populate_records(tx0, vec![batch]);
        f0.await;
        join_pipes!([pipe]);
        let counted = rx1.recv().await.expect("count32 not found");
        assert_eq!(4, counted.get())
    }
}
138
#[cfg(test)]
mod test_avg {

    use crate::prelude::*;

    /// Record whose `value` field is aggregated via the `avgf32` attribute.
    #[derive(Clone, Debug, AggregateAs)]
    struct Record {
        id: String,
        #[agg(avgf32)]
        value: i32,
    }

    /// Sends one batch with values 1, 2, 3 and expects an average of 2.0.
    #[tokio::test]
    async fn test_averagef32() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Averagef32, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("average");
        let pipe = run_pipe!(pipe, config, channels);
        let f0 = populate_records(
            tx0,
            vec![vec![
                Record {
                    id: "a".to_owned(),
                    value: 1,
                },
                Record {
                    id: "a".to_owned(),
                    value: 2,
                },
                Record {
                    id: "a".to_owned(),
                    value: 3,
                },
            ]],
        );
        f0.await;
        join_pipes!([pipe]);
        // Message worded like the sibling tests' "... not found" failures.
        let avg = rx1.recv().await.expect("average not found");
        // (1 + 2 + 3) / 3
        assert_eq!(2.0, avg.average())
    }
}
182
183#[derive(Deserialize)]
184pub struct UnorderedGroupAddAggregatorConfig {}
185
186#[async_trait]
187impl FromPath for UnorderedGroupAddAggregatorConfig {
188    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
189    where
190        P: AsRef<std::path::Path> + Send,
191    {
192        Ok(UnorderedGroupAddAggregatorConfig {})
193    }
194}
195
196#[async_trait]
197impl ConfigInto<UnorderedGroupAddAggregator> for UnorderedGroupAddAggregatorConfig {}
198
199#[async_trait]
200impl FromConfig<UnorderedGroupAddAggregatorConfig> for UnorderedGroupAddAggregator {
201    async fn from_config(_config: UnorderedGroupAddAggregatorConfig) -> anyhow::Result<Self> {
202        Ok(UnorderedGroupAddAggregator {})
203    }
204}
205
206/// Group added result by key
207pub struct UnorderedGroupAddAggregator {}
208
209impl<I, T, K, V> GroupAggregate<I, T, K, V, Vec<Pair<K, V>>, HashMap<K, V>>
210    for UnorderedGroupAddAggregator
211where
212    I: GroupAs<K> + AggregateAs<V>,
213    T: IntoIterator<Item = I>,
214    K: Hash + Eq + PartialEq,
215    V: std::ops::AddAssign<V> + Init + Clone,
216{
217    /// Merge items per group using `+=`
218    fn merge(&self, v: &mut V, i: &I) {
219        *v += i.aggregate_value();
220    }
221
222    fn group_table(&self) -> anyhow::Result<HashMap<K, V>> {
223        Ok(HashMap::new())
224    }
225}
226
227#[async_trait]
228impl<I, T, K, V> Map<T, Vec<Pair<K, V>>, UnorderedGroupAddAggregatorConfig>
229    for UnorderedGroupAddAggregator
230where
231    I: GroupAs<K> + AggregateAs<V>,
232    K: Hash + Eq + PartialEq,
233    V: std::ops::AddAssign<V> + Init + Clone,
234    T: IntoIterator<Item = I> + Send + 'static,
235{
236    /// Input: T
237    /// Output: Vec<Pair<K, V>>
238    async fn map(&mut self, data: T) -> anyhow::Result<Vec<Pair<K, V>>> {
239        Ok(self.group_aggregate(data)?)
240    }
241}
242
#[cfg(test)]
mod test_group_sum_aggregator {
    use crate::prelude::*;

    /// Sends one batch of `u32` values and expects one sum per distinct value.
    #[tokio::test]
    async fn test_u32_group_sum_aggregator() {
        let (tx0, rx0) = channel!(Vec<u32>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<u32, u32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("group_summation");
        let f0 = populate_records(tx0, vec![vec![2, 3, 2, 3, 2, 3]]);
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let gs = rx1.recv().await.unwrap();
        // Without this check an empty result would skip the loop and pass vacuously.
        assert_eq!(2, gs.len());
        for p in gs {
            match p.left() {
                // 2 + 2 + 2
                &2 => assert_eq!(&6, p.right()),
                // 3 + 3 + 3
                &3 => assert_eq!(&9, p.right()),
                _ => unreachable!(),
            }
        }
    }

    /// Record grouped by `id` and summed over `value`.
    #[derive(AggregateAs, GroupAs)]
    struct Record {
        #[group]
        id: String,
        #[agg(sum)]
        value: u32,
    }

    impl Record {
        /// Builds a record from a borrowed id and a value.
        pub fn new(id: &str, value: u32) -> Self {
            Record {
                id: id.to_owned(),
                value,
            }
        }
    }

    /// Sends one batch of three records and expects one sum per `id`.
    #[tokio::test]
    async fn test_record_group_sum() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, u32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("record_sum");
        let f0 = populate_records(
            tx0,
            vec![vec![
                Record::new("foo", 1),
                Record::new("foo", 2),
                Record::new("bar", 3),
            ]],
        );
        f0.await;
        let pipe_run = run_pipe!(pipe, config, channels);
        let _ = pipe_run.await;
        let gs = rx1.recv().await.unwrap();
        assert_eq!(2, gs.len());
        for sum in gs {
            match sum.left().as_str() {
                // 1 + 2
                "foo" => assert_eq!(&3, sum.right()),
                "bar" => assert_eq!(&3, sum.right()),
                _ => unreachable!(),
            }
        }
    }
}
313
#[cfg(test)]
mod unordered_group_avg_f32_tests {

    use crate::prelude::*;

    /// Record grouped by `id` and averaged over `value` via the `avgf32` attribute.
    #[derive(Clone, Debug, AggregateAs, GroupAs)]
    struct Record {
        #[group]
        id: String,
        #[agg(avgf32)]
        value: i32,
    }

    /// Sends one batch with two records per id and expects one average per id.
    #[tokio::test]
    async fn test_unordered_group_avg_f32() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Averagef32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("group_avg_f32");
        let pipe = run_pipe!(pipe, config, channels);
        let f0 = populate_records(
            tx0,
            vec![vec![
                Record {
                    id: "foo".to_owned(),
                    value: 1,
                },
                Record {
                    id: "foo".to_owned(),
                    value: 2,
                },
                Record {
                    id: "bar".to_owned(),
                    value: 2,
                },
                Record {
                    id: "bar".to_owned(),
                    value: 3,
                },
            ]],
        );
        f0.await;
        join_pipes!([pipe]);
        let group_avgs = rx1.recv().await.expect("group average not found");
        // Without this check an empty or partial result would still pass the loop below.
        assert_eq!(2, group_avgs.len());
        for avg in group_avgs {
            match &avg.left()[..] {
                "foo" => {
                    // (1 + 2) / 2
                    assert_eq!(1.5, avg.right().average())
                }
                "bar" => {
                    // (2 + 3) / 2
                    assert_eq!(2.5, avg.right().average())
                }
                _ => unreachable!("unexpected group {}", avg.left()),
            }
        }
    }
}
372
#[cfg(test)]
mod group_count32_tests {

    use crate::prelude::*;

    /// Record grouped by `key` and aggregated via the `count32` attribute.
    #[derive(Debug, Clone, GroupAs, AggregateAs)]
    #[agg(count32)]
    struct Record {
        #[group]
        key: String,
    }

    /// Sends one batch of words and expects one count per distinct word.
    #[tokio::test]
    async fn test_word_group_count_aggregate() {
        let (tx0, rx0) = channel!(Vec<String>, 1024);
        let (tx1, mut rx2) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("word_count");
        let f0 = populate_records(
            tx0,
            vec![vec![
                "foo".to_owned(),
                "foo".to_owned(),
                "bar".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
            ]],
        );
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let wcs = rx2.recv().await.unwrap();
        // Without this check an empty or partial result would still pass the loop below.
        assert_eq!(3, wcs.len());
        for wc in wcs {
            match wc.left().as_str() {
                "foo" => assert_eq!(2, wc.right().get()),
                "bar" => assert_eq!(1, wc.right().get()),
                "buz" => assert_eq!(3, wc.right().get()),
                _ => unreachable!(),
            }
        }
    }

    /// Sends one batch of keyed records and expects one count per distinct key.
    #[tokio::test]
    async fn test_record_group_count32() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("group_count32");
        let pipe = run_pipe!(pipe, config, channels);
        let f0 = populate_records(
            tx0,
            vec![vec![
                Record {
                    key: "foo".to_owned(),
                },
                Record {
                    key: "foo".to_owned(),
                },
                Record {
                    key: "bar".to_owned(),
                },
            ]],
        );
        f0.await;
        join_pipes!([pipe]);
        let group_counts = rx1.recv().await.expect("group count32 not found");
        // Without this check an empty or partial result would still pass the loop below.
        assert_eq!(2, group_counts.len());
        for count in group_counts {
            match &count.left()[..] {
                "foo" => {
                    assert_eq!(2, count.right().get())
                }
                "bar" => {
                    assert_eq!(1, count.right().get())
                }
                _ => unreachable!("unexpected group {}", count.left()),
            }
        }
    }
}
454
455#[derive(Deserialize)]
456pub struct OrderedGroupAddAggregatorConfig {}
457
458#[async_trait]
459impl FromPath for OrderedGroupAddAggregatorConfig {
460    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
461    where
462        P: AsRef<std::path::Path> + Send,
463    {
464        Ok(OrderedGroupAddAggregatorConfig {})
465    }
466}
467
468#[async_trait]
469impl ConfigInto<OrderedGroupAddAggregator> for OrderedGroupAddAggregatorConfig {}
470
471#[async_trait]
472impl FromConfig<OrderedGroupAddAggregatorConfig> for OrderedGroupAddAggregator {
473    async fn from_config(_config: OrderedGroupAddAggregatorConfig) -> anyhow::Result<Self> {
474        Ok(OrderedGroupAddAggregator {})
475    }
476}
477
478/// Group added result by ordered key
479pub struct OrderedGroupAddAggregator {}
480
481impl<I, T, K, V> GroupAggregate<I, T, K, V, Vec<Pair<K, V>>, BTreeMap<K, V>>
482    for OrderedGroupAddAggregator
483where
484    I: GroupAs<K> + AggregateAs<V>,
485    T: IntoIterator<Item = I>,
486    K: Ord,
487    V: std::ops::AddAssign<V> + Init + Clone,
488{
489    /// Merge items per group using `+=`
490    fn merge(&self, v: &mut V, i: &I) {
491        *v += i.aggregate_value();
492    }
493
494    fn group_table(&self) -> anyhow::Result<BTreeMap<K, V>> {
495        Ok(BTreeMap::new())
496    }
497}
498
499/// # Parameters
500/// * T: input
501/// * Vec<Pair<K, V>>: output
502/// * K: group value
503/// * V: aggregate value
504#[async_trait]
505impl<I, T, K, V> Map<T, Vec<Pair<K, V>>, OrderedGroupAddAggregatorConfig>
506    for OrderedGroupAddAggregator
507where
508    I: GroupAs<K> + AggregateAs<V>,
509    K: Ord,
510    V: std::ops::AddAssign<V> + Init + Clone,
511    T: IntoIterator<Item = I> + Send + 'static,
512{
513    async fn map(&mut self, data: T) -> anyhow::Result<Vec<Pair<K, V>>> {
514        Ok(self.group_aggregate(data)?)
515    }
516}
517
#[cfg(test)]
mod test_ordered_group_aggregator {
    use crate::prelude::*;

    /// Sends one batch of words and expects the counts in ascending word order.
    #[tokio::test]
    async fn test_word_group_count_aggregate() {
        let (tx0, rx0) = channel!(Vec<String>, 1024);
        let (tx1, mut rx2) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(OrderedGroupAddAggregatorConfig);
        let pipe = mapper!("ordered_word_count");
        let f0 = populate_records(
            tx0,
            vec![vec![
                "foo".to_owned(),
                "foo".to_owned(),
                "bar".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
            ]],
        );
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let wcs = rx2.recv().await.unwrap();
        // Exactly three groups: the positional checks below would not notice extras.
        assert_eq!(3, wcs.len());
        let mut wcs_iter = wcs.into_iter();
        let bar = wcs_iter.next().unwrap();
        assert_eq!("bar", bar.left());
        assert_eq!(1, bar.right().get());
        let buz = wcs_iter.next().unwrap();
        assert_eq!("buz", buz.left());
        assert_eq!(3, buz.right().get());
        let foo = wcs_iter.next().unwrap();
        assert_eq!("foo", foo.left());
        assert_eq!(2, foo.right().get());
    }
}
554}