Skip to main content

pipebase/map/
sort.rs

1use super::Map;
2use crate::common::{Aggregate, AggregateAs, ConfigInto, FromConfig, FromPath};
3use async_trait::async_trait;
4use serde::Deserialize;
5
6#[derive(Deserialize)]
7pub struct TopAggregatorConfig {
8    pub n: usize,
9    pub desc: bool,
10}
11
12impl FromPath for TopAggregatorConfig {}
13
14#[async_trait]
15impl ConfigInto<TopAggregator> for TopAggregatorConfig {}
16
17/// Find topN
18pub struct TopAggregator {
19    n: usize,
20    desc: bool,
21}
22
23impl TopAggregator {
24    fn qsort<U>(array: &mut [U], n: usize, desc: bool)
25    where
26        U: Ord + Clone,
27    {
28        let n = n.min(array.len());
29        let mut left = 0;
30        let mut right = array.len() - 1;
31        while left < right {
32            let position = Self::partition(array, left, right, desc);
33            if position + 1 == n {
34                return;
35            }
36            if position + 1 > n {
37                right = position - 1;
38                continue;
39            }
40            left = position + 1
41        }
42    }
43
44    fn partition<U>(v: &mut [U], left: usize, mut right: usize, desc: bool) -> usize
45    where
46        U: Ord + Clone,
47    {
48        let pivot = v[left].to_owned();
49        let mut j = left;
50        let mut i = left + 1;
51        while i <= right {
52            let u = v[i].to_owned();
53            if (u >= pivot && desc) || (u <= pivot && !desc) {
54                j += 1;
55                v.swap(i, j);
56                i += 1;
57                continue;
58            }
59            v.swap(i, right);
60            right -= 1;
61        }
62        v.swap(left, j);
63        j
64    }
65}
66
67#[async_trait]
68impl FromConfig<TopAggregatorConfig> for TopAggregator {
69    async fn from_config(config: TopAggregatorConfig) -> anyhow::Result<Self> {
70        Ok(TopAggregator {
71            n: config.n,
72            desc: config.desc,
73        })
74    }
75}
76
77impl<I, T, U> Aggregate<I, T, Vec<U>> for TopAggregator
78where
79    I: AggregateAs<Vec<U>>,
80    U: Ord + Clone,
81    T: IntoIterator<Item = I>,
82{
83    /// Merge comparable items into vectors
84    fn merge(&self, u: &mut Vec<U>, i: &I) {
85        u.extend(i.aggregate_value());
86    }
87
88    /// Sort merged items
89    fn operate(&self, u: &mut Vec<U>) {
90        Self::qsort(u, self.n, self.desc);
91        u.truncate(self.n);
92        u.sort_by(|a, b| match self.desc {
93            true => b.partial_cmp(a).unwrap(),
94            false => a.partial_cmp(b).unwrap(),
95        });
96    }
97}
98
99/// # Parameters
100/// * T: input
101/// * Vec<U>: output
102#[async_trait]
103impl<I, T, U> Map<T, Vec<U>, TopAggregatorConfig> for TopAggregator
104where
105    I: AggregateAs<Vec<U>>,
106    U: Ord + Clone,
107    T: IntoIterator<Item = I> + Send + 'static,
108{
109    async fn map(&mut self, data: T) -> anyhow::Result<Vec<U>> {
110        Ok(self.aggregate(data))
111    }
112}
113
114#[cfg(test)]
115mod top_aggregator_tests {
116
117    use crate::prelude::*;
118
119    #[tokio::test]
120    async fn test_top_aggregator() {
121        let (tx0, rx0) = channel!(Vec<u32>, 1023);
122        let (tx1, mut rx1) = channel!(Vec<u32>, 1024);
123        let channels = pipe_channels!(rx0, [tx1]);
124        let config = config!(
125            TopAggregatorConfig,
126            "resources/catalogs/top_aggregator_desc.yml"
127        );
128        let pipe = mapper!("top");
129        let f0 = populate_records(
130            tx0,
131            vec![vec![1, 2, 2, 3], vec![1, 1, 2, 1], vec![2, 2, 2, 2]],
132        );
133        f0.await;
134        join_pipes!([run_pipe!(pipe, config, channels)]);
135        let r = rx1.recv().await.unwrap();
136        assert_eq!(vec![3, 2, 2], r);
137        let r = rx1.recv().await.unwrap();
138        assert_eq!(vec![2, 1, 1], r);
139        let r = rx1.recv().await.unwrap();
140        assert_eq!(vec![2, 2, 2], r);
141    }
142
143    #[derive(AggregateAs, Clone, Debug, Equal, Eq, OrderedBy)]
144    #[agg(top)]
145    struct Record {
146        pub id: String,
147        #[equal]
148        #[order]
149        pub value: u32,
150    }
151
152    impl Record {
153        pub fn new(id: &str, value: u32) -> Self {
154            Record {
155                id: id.to_owned(),
156                value: value,
157            }
158        }
159    }
160
161    #[tokio::test]
162    async fn test_top_record() {
163        let (tx0, rx0) = channel!(Vec<Record>, 1024);
164        let (tx1, mut rx1) = channel!(Vec<Record>, 1024);
165        let channels = pipe_channels!(rx0, [tx1]);
166        let config = config!(
167            TopAggregatorConfig,
168            "resources/catalogs/top_aggregator_asc.yml"
169        );
170        let f0 = populate_records(
171            tx0,
172            vec![vec![
173                Record::new("five", 5),
174                Record::new("four", 4),
175                Record::new("two", 2),
176                Record::new("one", 1),
177                Record::new("three", 3),
178            ]],
179        );
180        f0.await;
181        let pipe = mapper!("top_record");
182        let pipe = run_pipe!(pipe, config, channels);
183        let _ = pipe.await;
184        let mut sorted_records = rx1.recv().await.unwrap();
185        assert_eq!(3, sorted_records.len());
186        let record = sorted_records.pop().unwrap();
187        assert_eq!("three", &record.id);
188        assert_eq!(3, record.value);
189        let record = sorted_records.pop().unwrap();
190        assert_eq!("two", &record.id);
191        assert_eq!(2, record.value);
192        let record = sorted_records.pop().unwrap();
193        assert_eq!("one", &record.id);
194        assert_eq!(1, record.value);
195    }
196}