1use std::collections::{BTreeMap, HashMap};
2use std::hash::Hash;
3
4use super::Map;
5use crate::common::{
6 Aggregate, AggregateAs, ConfigInto, FromConfig, FromPath, GroupAggregate, GroupAs, Init, Pair,
7};
8use async_trait::async_trait;
9use serde::Deserialize;
10
11#[derive(Deserialize)]
12pub struct AddAggregatorConfig {}
13
14#[async_trait]
15impl FromPath for AddAggregatorConfig {
16 async fn from_path<P>(_path: P) -> anyhow::Result<Self>
17 where
18 P: AsRef<std::path::Path> + Send,
19 {
20 Ok(AddAggregatorConfig {})
21 }
22}
23
24#[async_trait]
25impl ConfigInto<AddAggregator> for AddAggregatorConfig {}
26
/// Field-less aggregator whose `Aggregate::merge` impl in this file combines
/// values with `+=`.
pub struct AddAggregator {}
29
30#[async_trait]
31impl FromConfig<AddAggregatorConfig> for AddAggregator {
32 async fn from_config(_config: AddAggregatorConfig) -> anyhow::Result<Self> {
33 Ok(AddAggregator {})
34 }
35}
36
37impl<I, T, U> Aggregate<I, T, U> for AddAggregator
38where
39 I: AggregateAs<U>,
40 U: std::ops::AddAssign<U> + Init,
41 T: IntoIterator<Item = I>,
42{
43 fn merge(&self, u: &mut U, i: &I) {
45 *u += i.aggregate_value();
46 }
47}
48
49#[async_trait]
50impl<I, T, U> Map<T, U, AddAggregatorConfig> for AddAggregator
51where
52 I: AggregateAs<U>,
53 U: std::ops::AddAssign<U> + Init,
54 T: IntoIterator<Item = I> + Send + 'static,
55{
56 async fn map(&mut self, data: T) -> anyhow::Result<U> {
59 Ok(self.aggregate(data))
60 }
61}
62
#[cfg(test)]
mod sum_aggregator_tests {
    use crate::prelude::*;

    // Two batches of plain `u32`s go in; one sum per batch is expected out,
    // in the same order.
    #[tokio::test]
    async fn test_sum_aggregator() {
        // NOTE(review): capacity 1023 differs from the 1024 used by the other
        // channels in this file — confirm whether that is intentional.
        let (batch_tx, batch_rx) = channel!(Vec<u32>, 1023);
        let (sum_tx, mut sum_rx) = channel!(u32, 1024);
        let channels = pipe_channels!(batch_rx, [sum_tx]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("summation");
        // Both batches are sent before `run_pipe!` is invoked.
        populate_records(batch_tx, vec![vec![1, 3, 5, 7], vec![2, 4, 6, 8]]).await;
        let running = run_pipe!(pipe, config, channels);
        join_pipes!([running]);
        let odd_total = sum_rx.recv().await.unwrap();
        assert_eq!(16, odd_total);
        let even_total = sum_rx.recv().await.unwrap();
        assert_eq!(20, even_total);
    }

    // Record whose `value` field is marked as the summed quantity.
    #[derive(AggregateAs)]
    struct Record {
        #[agg(sum)]
        value: u32,
    }

    impl Record {
        pub fn new(value: u32) -> Self {
            Record { value }
        }
    }

    // A single batch of three records (1, 2, 3) must be summed to 6.
    #[tokio::test]
    async fn test_record_sum() {
        let (record_tx, record_rx) = channel!(Vec<Record>, 1024);
        let (sum_tx, mut sum_rx) = channel!(u32, 1024);
        let channels = pipe_channels!(record_rx, [sum_tx]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("record_sum");
        let batch: Vec<Record> = (1..=3).map(Record::new).collect();
        populate_records(record_tx, vec![batch]).await;
        let _ = run_pipe!(pipe, config, channels).await;
        let total = sum_rx.recv().await.unwrap();
        assert_eq!(6, total)
    }
}
113
#[cfg(test)]
mod count32_tests {
    use crate::prelude::*;

    // Field-less record aggregated as a `Count32`; the test below expects
    // four of them to produce a count of 4.
    #[derive(Debug, Clone, AggregateAs)]
    #[agg(count32)]
    struct Record {}

    #[tokio::test]
    async fn test_count32() {
        let (record_tx, record_rx) = channel!(Vec<Record>, 1024);
        let (count_tx, mut count_rx) = channel!(Count32, 1024);
        let channels = pipe_channels!(record_rx, [count_tx]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("counter");
        // `run_pipe!` is invoked before the records are sent in this test.
        let pipe = run_pipe!(pipe, config, channels);
        let batch = vec![Record {}; 4];
        populate_records(record_tx, vec![batch]).await;
        join_pipes!([pipe]);
        let count = count_rx.recv().await.expect("count32 not found");
        assert_eq!(4, count.get())
    }
}
138
#[cfg(test)]
mod test_avg {
    use crate::prelude::*;

    // Record whose `value` is aggregated as an `Averagef32`; `id` takes no
    // part in the aggregation.
    #[derive(Clone, Debug, AggregateAs)]
    struct Record {
        id: String,
        #[agg(avgf32)]
        value: i32,
    }

    // Values 1, 2 and 3 in one batch must average to 2.0.
    #[tokio::test]
    async fn test_averagef32() {
        let (record_tx, record_rx) = channel!(Vec<Record>, 1024);
        let (avg_tx, mut avg_rx) = channel!(Averagef32, 1024);
        let channels = pipe_channels!(record_rx, [avg_tx]);
        let config = config!(AddAggregatorConfig);
        let pipe = mapper!("average");
        let pipe = run_pipe!(pipe, config, channels);
        let batch: Vec<Record> = (1..=3)
            .map(|value| Record {
                id: "a".to_owned(),
                value,
            })
            .collect();
        populate_records(record_tx, vec![batch]).await;
        join_pipes!([pipe]);
        let average = avg_rx.recv().await.expect("not average received");
        assert_eq!(2.0, average.average())
    }
}
182
183#[derive(Deserialize)]
184pub struct UnorderedGroupAddAggregatorConfig {}
185
186#[async_trait]
187impl FromPath for UnorderedGroupAddAggregatorConfig {
188 async fn from_path<P>(_path: P) -> anyhow::Result<Self>
189 where
190 P: AsRef<std::path::Path> + Send,
191 {
192 Ok(UnorderedGroupAddAggregatorConfig {})
193 }
194}
195
196#[async_trait]
197impl ConfigInto<UnorderedGroupAddAggregator> for UnorderedGroupAddAggregatorConfig {}
198
199#[async_trait]
200impl FromConfig<UnorderedGroupAddAggregatorConfig> for UnorderedGroupAddAggregator {
201 async fn from_config(_config: UnorderedGroupAddAggregatorConfig) -> anyhow::Result<Self> {
202 Ok(UnorderedGroupAddAggregator {})
203 }
204}
205
/// Field-less aggregator that combines values per group key with `+=`, using
/// a `HashMap` as its group table (see its `GroupAggregate` impl).
pub struct UnorderedGroupAddAggregator {}
208
209impl<I, T, K, V> GroupAggregate<I, T, K, V, Vec<Pair<K, V>>, HashMap<K, V>>
210 for UnorderedGroupAddAggregator
211where
212 I: GroupAs<K> + AggregateAs<V>,
213 T: IntoIterator<Item = I>,
214 K: Hash + Eq + PartialEq,
215 V: std::ops::AddAssign<V> + Init + Clone,
216{
217 fn merge(&self, v: &mut V, i: &I) {
219 *v += i.aggregate_value();
220 }
221
222 fn group_table(&self) -> anyhow::Result<HashMap<K, V>> {
223 Ok(HashMap::new())
224 }
225}
226
227#[async_trait]
228impl<I, T, K, V> Map<T, Vec<Pair<K, V>>, UnorderedGroupAddAggregatorConfig>
229 for UnorderedGroupAddAggregator
230where
231 I: GroupAs<K> + AggregateAs<V>,
232 K: Hash + Eq + PartialEq,
233 V: std::ops::AddAssign<V> + Init + Clone,
234 T: IntoIterator<Item = I> + Send + 'static,
235{
236 async fn map(&mut self, data: T) -> anyhow::Result<Vec<Pair<K, V>>> {
239 Ok(self.group_aggregate(data)?)
240 }
241}
242
#[cfg(test)]
mod test_group_sum_aggregator {
    use crate::prelude::*;

    // Plain `u32`s: the test expects group 2 to sum to 6 and group 3 to 9.
    #[tokio::test]
    async fn test_u32_group_sum_aggregator() {
        let (tx0, rx0) = channel!(Vec<u32>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<u32, u32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("group_summation");
        let f0 = populate_records(tx0, vec![vec![2, 3, 2, 3, 2, 3]]);
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let gs = rx1.recv().await.unwrap();
        // Guard against a vacuous pass: with no groups the loop below would
        // not execute a single assertion.
        assert_eq!(2, gs.len());
        for p in gs {
            match p.left() {
                &2 => assert_eq!(&6, p.right()),
                &3 => assert_eq!(&9, p.right()),
                _ => unreachable!("unexpected group {}", p.left()),
            }
        }
    }

    // Record grouped by `id` and summed over `value`.
    #[derive(AggregateAs, GroupAs)]
    struct Record {
        #[group]
        id: String,
        #[agg(sum)]
        value: u32,
    }

    impl Record {
        pub fn new(id: &str, value: u32) -> Self {
            Record {
                id: id.to_owned(),
                value,
            }
        }
    }

    // "foo" (1 + 2) and "bar" (3) must each sum to 3.
    #[tokio::test]
    async fn test_record_group_sum() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, u32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("record_sum");
        let f0 = populate_records(
            tx0,
            vec![vec![
                Record::new("foo", 1),
                Record::new("foo", 2),
                Record::new("bar", 3),
            ]],
        );
        f0.await;
        let pipe_run = run_pipe!(pipe, config, channels);
        let _ = pipe_run.await;
        let gs = rx1.recv().await.unwrap();
        assert_eq!(2, gs.len());
        for sum in gs {
            match sum.left().as_str() {
                "foo" => assert_eq!(&3, sum.right()),
                "bar" => assert_eq!(&3, sum.right()),
                _ => unreachable!("unexpected group {}", sum.left()),
            }
        }
    }
}
313
#[cfg(test)]
mod unordered_group_avg_f32_tests {
    use crate::prelude::*;

    // Record grouped by `id`, with `value` aggregated as an `Averagef32`.
    #[derive(Clone, Debug, AggregateAs, GroupAs)]
    struct Record {
        #[group]
        id: String,
        #[agg(avgf32)]
        value: i32,
    }

    // "foo" (1, 2) must average to 1.5 and "bar" (2, 3) to 2.5.
    #[tokio::test]
    async fn test_unordered_group_avg_f32() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Averagef32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("group_avg_f32");
        let pipe = run_pipe!(pipe, config, channels);
        let f0 = populate_records(
            tx0,
            vec![vec![
                Record {
                    id: "foo".to_owned(),
                    value: 1,
                },
                Record {
                    id: "foo".to_owned(),
                    value: 2,
                },
                Record {
                    id: "bar".to_owned(),
                    value: 2,
                },
                Record {
                    id: "bar".to_owned(),
                    value: 3,
                },
            ]],
        );
        f0.await;
        join_pipes!([pipe]);
        let group_avgs = rx1.recv().await.expect("group average not found");
        // Guard against a vacuous pass: with no groups the loop below would
        // not execute a single assertion.
        assert_eq!(2, group_avgs.len());
        for avg in group_avgs {
            match avg.left().as_str() {
                "foo" => assert_eq!(1.5, avg.right().average()),
                "bar" => assert_eq!(2.5, avg.right().average()),
                _ => unreachable!("unexpected group {}", avg.left()),
            }
        }
    }
}
372
#[cfg(test)]
mod group_count32_tests {
    use crate::prelude::*;

    // Record grouped by `key` and aggregated as a `Count32`.
    #[derive(Debug, Clone, GroupAs, AggregateAs)]
    #[agg(count32)]
    struct Record {
        #[group]
        key: String,
    }

    // Plain strings as input: expects foo -> 2, bar -> 1, buz -> 3.
    #[tokio::test]
    async fn test_word_group_count_aggregate() {
        let (tx0, rx0) = channel!(Vec<String>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("word_count");
        let f0 = populate_records(
            tx0,
            vec![vec![
                "foo".to_owned(),
                "foo".to_owned(),
                "bar".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
            ]],
        );
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let wcs = rx1.recv().await.unwrap();
        // Guard against a vacuous pass: with no groups the loop below would
        // not execute a single assertion.
        assert_eq!(3, wcs.len());
        for wc in wcs {
            match wc.left().as_str() {
                "foo" => assert_eq!(2, wc.right().get()),
                "bar" => assert_eq!(1, wc.right().get()),
                "buz" => assert_eq!(3, wc.right().get()),
                _ => unreachable!("unexpected group {}", wc.left()),
            }
        }
    }

    // `Record` values as input: expects foo -> 2 and bar -> 1.
    #[tokio::test]
    async fn test_record_group_count32() {
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(UnorderedGroupAddAggregatorConfig);
        let pipe = mapper!("group_count32");
        let pipe = run_pipe!(pipe, config, channels);
        let f0 = populate_records(
            tx0,
            vec![vec![
                Record {
                    key: "foo".to_owned(),
                },
                Record {
                    key: "foo".to_owned(),
                },
                Record {
                    key: "bar".to_owned(),
                },
            ]],
        );
        f0.await;
        join_pipes!([pipe]);
        let group_counts = rx1.recv().await.expect("group count32 not found");
        // Same guard as above: an empty result must fail, not pass silently.
        assert_eq!(2, group_counts.len());
        for count in group_counts {
            match count.left().as_str() {
                "foo" => assert_eq!(2, count.right().get()),
                "bar" => assert_eq!(1, count.right().get()),
                _ => unreachable!("unexpected group {}", count.left()),
            }
        }
    }
}
454
455#[derive(Deserialize)]
456pub struct OrderedGroupAddAggregatorConfig {}
457
458#[async_trait]
459impl FromPath for OrderedGroupAddAggregatorConfig {
460 async fn from_path<P>(_path: P) -> anyhow::Result<Self>
461 where
462 P: AsRef<std::path::Path> + Send,
463 {
464 Ok(OrderedGroupAddAggregatorConfig {})
465 }
466}
467
468#[async_trait]
469impl ConfigInto<OrderedGroupAddAggregator> for OrderedGroupAddAggregatorConfig {}
470
471#[async_trait]
472impl FromConfig<OrderedGroupAddAggregatorConfig> for OrderedGroupAddAggregator {
473 async fn from_config(_config: OrderedGroupAddAggregatorConfig) -> anyhow::Result<Self> {
474 Ok(OrderedGroupAddAggregator {})
475 }
476}
477
/// Field-less aggregator that combines values per group key with `+=`, using
/// a `BTreeMap` as its group table (see its `GroupAggregate` impl).
pub struct OrderedGroupAddAggregator {}
480
481impl<I, T, K, V> GroupAggregate<I, T, K, V, Vec<Pair<K, V>>, BTreeMap<K, V>>
482 for OrderedGroupAddAggregator
483where
484 I: GroupAs<K> + AggregateAs<V>,
485 T: IntoIterator<Item = I>,
486 K: Ord,
487 V: std::ops::AddAssign<V> + Init + Clone,
488{
489 fn merge(&self, v: &mut V, i: &I) {
491 *v += i.aggregate_value();
492 }
493
494 fn group_table(&self) -> anyhow::Result<BTreeMap<K, V>> {
495 Ok(BTreeMap::new())
496 }
497}
498
499#[async_trait]
505impl<I, T, K, V> Map<T, Vec<Pair<K, V>>, OrderedGroupAddAggregatorConfig>
506 for OrderedGroupAddAggregator
507where
508 I: GroupAs<K> + AggregateAs<V>,
509 K: Ord,
510 V: std::ops::AddAssign<V> + Init + Clone,
511 T: IntoIterator<Item = I> + Send + 'static,
512{
513 async fn map(&mut self, data: T) -> anyhow::Result<Vec<Pair<K, V>>> {
514 Ok(self.group_aggregate(data)?)
515 }
516}
517
#[cfg(test)]
mod test_ordered_group_aggregator {
    use crate::prelude::*;

    // Word counts must arrive sorted by key: bar (1), buz (3), foo (2).
    #[tokio::test]
    async fn test_word_group_count_aggregate() {
        let (tx0, rx0) = channel!(Vec<String>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(OrderedGroupAddAggregatorConfig);
        let pipe = mapper!("ordered_word_count");
        let f0 = populate_records(
            tx0,
            vec![vec![
                "foo".to_owned(),
                "foo".to_owned(),
                "bar".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
                "buz".to_owned(),
            ]],
        );
        f0.await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        let wcs = rx1.recv().await.unwrap();
        let mut wcs_iter = wcs.into_iter();
        let bar = wcs_iter.next().unwrap();
        assert_eq!("bar", bar.left());
        assert_eq!(1, bar.right().get());
        let buz = wcs_iter.next().unwrap();
        assert_eq!("buz", buz.left());
        assert_eq!(3, buz.right().get());
        let foo = wcs_iter.next().unwrap();
        assert_eq!("foo", foo.left());
        assert_eq!(2, foo.right().get());
        // Exactly three groups are expected; any extra one is a failure.
        assert!(wcs_iter.next().is_none());
    }
}
554}