1use super::Map;
2use crate::common::{Aggregate, AggregateAs, ConfigInto, FromConfig, FromPath};
3use async_trait::async_trait;
4use serde::Deserialize;
5
6#[derive(Deserialize)]
7pub struct TopAggregatorConfig {
8 pub n: usize,
9 pub desc: bool,
10}
11
12impl FromPath for TopAggregatorConfig {}
13
14#[async_trait]
15impl ConfigInto<TopAggregator> for TopAggregatorConfig {}
16
/// Keeps the `n` best-ranked values of a collection, best first.
pub struct TopAggregator {
    /// Maximum number of values retained.
    n: usize,
    /// `true` ranks larger values first, `false` ranks smaller values first.
    desc: bool,
}

impl TopAggregator {
    /// Quickselect-style partial partitioning.
    ///
    /// Rearranges `array` in place so that its first `min(n, array.len())`
    /// elements are the best-ranked ones (largest when `desc` is `true`,
    /// smallest otherwise). Neither side of that boundary is sorted.
    ///
    /// An empty slice or `n == 0` is a no-op.
    fn qsort<U>(array: &mut [U], n: usize, desc: bool)
    where
        U: Ord,
    {
        let n = n.min(array.len());
        // Nothing to select. This also guarantees `array.len() - 1` and
        // `position - 1` below can never underflow.
        if n == 0 {
            return;
        }
        let mut left = 0;
        let mut right = array.len() - 1;
        while left < right {
            let position = Self::partition(array, left, right, desc);
            let rank = position + 1;
            if rank == n {
                // The pivot is the n-th best element: everything before it
                // already belongs to the top `n`.
                return;
            }
            if rank > n {
                // `rank > n >= 1` implies `position >= 1`.
                right = position - 1;
            } else {
                left = position + 1;
            }
        }
    }

    /// Partitions `v[left..=right]` around the pivot `v[left]`.
    ///
    /// On return the pivot sits at the returned index; elements before it
    /// (within the range) rank at or before the pivot and elements after it
    /// rank strictly after it, where rank is `>=` for `desc` and `<=`
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `left` or `right` is out of bounds for `v`.
    fn partition<U>(v: &mut [U], left: usize, mut right: usize, desc: bool) -> usize
    where
        U: Ord,
    {
        let mut j = left;
        let mut i = left + 1;
        while i <= right {
            // `v[left]` is never touched inside the loop (every swap index is
            // greater than `left`), so the pivot can be compared in place
            // instead of cloning it and each visited element.
            let keep_left = if desc {
                v[i] >= v[left]
            } else {
                v[i] <= v[left]
            };
            if keep_left {
                j += 1;
                v.swap(i, j);
                i += 1;
            } else {
                // `i >= left + 1 >= 1` and `i <= right`, so `right >= 1` here.
                v.swap(i, right);
                right -= 1;
            }
        }
        // Move the pivot between the two groups.
        v.swap(left, j);
        j
    }
}
66
67#[async_trait]
68impl FromConfig<TopAggregatorConfig> for TopAggregator {
69 async fn from_config(config: TopAggregatorConfig) -> anyhow::Result<Self> {
70 Ok(TopAggregator {
71 n: config.n,
72 desc: config.desc,
73 })
74 }
75}
76
77impl<I, T, U> Aggregate<I, T, Vec<U>> for TopAggregator
78where
79 I: AggregateAs<Vec<U>>,
80 U: Ord + Clone,
81 T: IntoIterator<Item = I>,
82{
83 fn merge(&self, u: &mut Vec<U>, i: &I) {
85 u.extend(i.aggregate_value());
86 }
87
88 fn operate(&self, u: &mut Vec<U>) {
90 Self::qsort(u, self.n, self.desc);
91 u.truncate(self.n);
92 u.sort_by(|a, b| match self.desc {
93 true => b.partial_cmp(a).unwrap(),
94 false => a.partial_cmp(b).unwrap(),
95 });
96 }
97}
98
99#[async_trait]
103impl<I, T, U> Map<T, Vec<U>, TopAggregatorConfig> for TopAggregator
104where
105 I: AggregateAs<Vec<U>>,
106 U: Ord + Clone,
107 T: IntoIterator<Item = I> + Send + 'static,
108{
109 async fn map(&mut self, data: T) -> anyhow::Result<Vec<U>> {
110 Ok(self.aggregate(data))
111 }
112}
113
#[cfg(test)]
mod top_aggregator_tests {

    use crate::prelude::*;

    /// Feeds three `Vec<u32>` batches through the `top` mapper configured by
    /// the descending catalog and checks the three values emitted per batch.
    #[tokio::test]
    async fn test_top_aggregator() {
        let (feed_tx, feed_rx) = channel!(Vec<u32>, 1023);
        let (out_tx, mut out_rx) = channel!(Vec<u32>, 1024);
        let channels = pipe_channels!(feed_rx, [out_tx]);
        let config = config!(
            TopAggregatorConfig,
            "resources/catalogs/top_aggregator_desc.yml"
        );
        let pipe = mapper!("top");
        let batches = vec![vec![1, 2, 2, 3], vec![1, 1, 2, 1], vec![2, 2, 2, 2]];
        // All input is queued before the pipe starts running.
        populate_records(feed_tx, batches).await;
        join_pipes!([run_pipe!(pipe, config, channels)]);
        // One output per input batch, in the order the batches were sent.
        for expected in vec![vec![3, 2, 2], vec![2, 1, 1], vec![2, 2, 2]] {
            let actual = out_rx.recv().await.unwrap();
            assert_eq!(expected, actual);
        }
    }

    /// Test payload carrying a label and a numeric value.
    // NOTE(review): `#[equal]` / `#[order]` presumably make the derived
    // `Equal` / `OrderedBy` impls compare on `value` only — confirm against
    // the derive macros.
    #[derive(AggregateAs, Clone, Debug, Equal, Eq, OrderedBy)]
    #[agg(top)]
    struct Record {
        pub id: String,
        #[equal]
        #[order]
        pub value: u32,
    }

    impl Record {
        /// Creates a record owning a copy of `id`.
        pub fn new(id: &str, value: u32) -> Self {
            Record {
                id: id.to_owned(),
                value,
            }
        }
    }

    /// Sends one batch of five records through the `top_record` mapper
    /// configured by the ascending catalog and checks that records one, two
    /// and three come back in that order.
    #[tokio::test]
    async fn test_top_record() {
        let (feed_tx, feed_rx) = channel!(Vec<Record>, 1024);
        let (out_tx, mut out_rx) = channel!(Vec<Record>, 1024);
        let channels = pipe_channels!(feed_rx, [out_tx]);
        let config = config!(
            TopAggregatorConfig,
            "resources/catalogs/top_aggregator_asc.yml"
        );
        let batch = vec![
            Record::new("five", 5),
            Record::new("four", 4),
            Record::new("two", 2),
            Record::new("one", 1),
            Record::new("three", 3),
        ];
        populate_records(feed_tx, vec![batch]).await;
        let pipe = mapper!("top_record");
        let pipe = run_pipe!(pipe, config, channels);
        let _ = pipe.await;
        let mut sorted_records = out_rx.recv().await.unwrap();
        assert_eq!(3, sorted_records.len());
        // Records are popped from the back, so expectations run largest first.
        for (id, value) in vec![("three", 3), ("two", 2), ("one", 1)] {
            let record = sorted_records.pop().unwrap();
            assert_eq!(id, &record.id);
            assert_eq!(value, record.value);
        }
    }
}