1use serde::{Deserialize, Serialize};
2use std::{
3 cmp, collections::{hash_map::Entry, HashMap}, fmt::{self, Debug}, hash::Hash, iter, ops
4};
5use twox_hash::RandomXxHashBuilder;
6
7use crate::{
8 count_min::CountMinSketch, ordered_linked_list::{OrderedLinkedList, OrderedLinkedListIndex, OrderedLinkedListIter}, traits::{Intersect, New, UnionAssign}, IntersectPlusUnionIsPlus
9};
10
11#[derive(Clone, Serialize, Deserialize)]
36#[serde(bound(
37 serialize = "A: Hash + Eq + Serialize, C: Serialize, <C as New>::Config: Serialize",
38 deserialize = "A: Hash + Eq + Deserialize<'de>, C: Deserialize<'de>, <C as New>::Config: Deserialize<'de>"
39))]
40pub struct Top<A, C: New> {
41 map: HashMap<A, OrderedLinkedListIndex<'static>, RandomXxHashBuilder>,
42 list: OrderedLinkedList<Node<A, C>>,
43 count_min: CountMinSketch<A, C>,
44 config: <C as New>::Config,
45}
46impl<A: Hash + Eq + Clone, C: Ord + New + for<'a> UnionAssign<&'a C> + Intersect> Top<A, C> {
47 pub fn new(n: usize, probability: f64, tolerance: f64, config: <C as New>::Config) -> Self {
49 Self {
50 map: HashMap::with_capacity_and_hasher(n, RandomXxHashBuilder::default()),
51 list: OrderedLinkedList::new(n),
52 count_min: CountMinSketch::new(probability, tolerance, config.clone()),
53 config,
54 }
55 }
56 fn assert(&self) {
57 if !cfg!(feature = "assert") {
58 return;
59 }
60 for (k, &v) in &self.map {
61 assert!(&self.list[v].0 == k);
62 }
63 let mut cur = &self.list[self.list.head().unwrap()].1;
64 for &Node(_, ref count) in self.list.iter() {
65 assert!(cur >= count);
66 cur = count;
67 }
68 }
69 pub fn capacity(&self) -> usize {
71 self.list.capacity()
72 }
73 pub fn push<V: ?Sized>(&mut self, item: A, value: &V)
75 where
76 C: for<'a> ops::AddAssign<&'a V> + IntersectPlusUnionIsPlus,
77 {
78 match self.map.entry(item.clone()) {
79 Entry::Occupied(entry) => {
80 let offset = *entry.get();
81 self.list.mutate(offset, |Node(t, mut count)| {
82 count += value;
83 Node(t, count)
84 });
85 }
86 Entry::Vacant(entry) => {
87 if self.list.len() < self.list.capacity() {
88 let mut x = C::new(&self.config);
89 x += value;
90 let new = self.list.push_back(Node(item, x));
91 let new = unsafe { new.staticify() };
92 let _ = entry.insert(new);
93 } else {
94 let score = self.count_min.push(&item, value);
95 if score > self.list[self.list.tail().unwrap()].1 {
96 let old = self.list.pop_back();
97 let new = self.list.push_back(Node(item, score));
98 let new = unsafe { new.staticify() };
99 let _ = entry.insert(new);
100 let _ = self.map.remove(&old.0).unwrap();
101 self.count_min.union_assign(&old.0, &old.1);
102 }
103 }
104 }
105 }
106 self.assert();
107 }
108 pub fn clear(&mut self) {
110 self.map.clear();
111 self.list.clear();
112 self.count_min.clear();
113 }
114 pub fn iter(&self) -> TopIter<'_, A, C> {
116 TopIter {
117 list_iter: self.list.iter(),
118 }
119 }
120}
121impl<
122 A: Hash + Eq + Clone + Debug,
123 C: Ord + New + Clone + for<'a> UnionAssign<&'a C> + Intersect + Debug,
124 > Debug for Top<A, C>
125{
126 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127 f.debug_list().entries(self.iter()).finish()
128 }
129}
130
131pub struct TopIter<'a, A: Hash + Eq + Clone + 'a, C: Ord + 'a> {
135 list_iter: OrderedLinkedListIter<'a, Node<A, C>>,
136}
137impl<'a, A: Hash + Eq + Clone, C: Ord + 'a> Clone for TopIter<'a, A, C> {
138 fn clone(&self) -> Self {
139 Self {
140 list_iter: self.list_iter.clone(),
141 }
142 }
143}
144impl<'a, A: Hash + Eq + Clone, C: Ord + 'a> Iterator for TopIter<'a, A, C> {
145 type Item = (&'a A, &'a C);
146 fn next(&mut self) -> Option<(&'a A, &'a C)> {
147 self.list_iter.next().map(|x| (&x.0, &x.1))
148 }
149}
150impl<'a, A: Hash + Eq + Clone + Debug, C: Ord + Debug + 'a> Debug for TopIter<'a, A, C> {
151 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
152 f.debug_list().entries(self.clone()).finish()
153 }
154}
155
156impl<
157 A: Hash + Eq + Clone,
158 C: Ord
159 + New
160 + Clone
161 + for<'a> ops::AddAssign<&'a C>
162 + for<'a> UnionAssign<&'a C>
163 + Intersect
164 + IntersectPlusUnionIsPlus,
165 > iter::Sum<Top<A, C>> for Option<Top<A, C>>
166{
167 fn sum<I>(mut iter: I) -> Self
168 where
169 I: Iterator<Item = Top<A, C>>,
170 {
171 let mut total = iter.next()?;
172 for sample in iter {
173 total += sample;
174 }
175 Some(total)
176 }
177}
178impl<
179 A: Hash + Eq + Clone,
180 C: Ord
181 + New
182 + Clone
183 + for<'a> ops::AddAssign<&'a C>
184 + for<'a> UnionAssign<&'a C>
185 + Intersect
186 + IntersectPlusUnionIsPlus,
187 > ops::Add for Top<A, C>
188{
189 type Output = Self;
190 fn add(mut self, other: Self) -> Self {
191 self += other;
192 self
193 }
194}
195impl<
196 A: Hash + Eq + Clone,
197 C: Ord
198 + New
199 + Clone
200 + for<'a> ops::AddAssign<&'a C>
201 + for<'a> UnionAssign<&'a C>
202 + Intersect
203 + IntersectPlusUnionIsPlus,
204 > ops::AddAssign for Top<A, C>
205{
206 fn add_assign(&mut self, other: Self) {
207 assert_eq!(self.capacity(), other.capacity());
208
209 let mut scores = HashMap::<_, C>::new();
210 for (url, count) in self.iter() {
211 *scores
212 .entry(url.clone())
213 .or_insert_with(|| C::new(&self.config)) += count;
214 }
215 for (url, count) in other.iter() {
216 *scores
217 .entry(url.clone())
218 .or_insert_with(|| C::new(&self.config)) += count;
219 }
220 let mut top = self.clone();
221 top.clear();
222 for (url, count) in scores {
223 top.push(url.clone(), &count);
224 }
225 *self = top;
226 }
227}
228
229#[derive(Clone, Serialize, Deserialize)]
230struct Node<T, C>(T, C);
231impl<T, C: Ord> Ord for Node<T, C> {
232 #[inline(always)]
233 fn cmp(&self, other: &Self) -> cmp::Ordering {
234 self.1.cmp(&other.1)
235 }
236}
237impl<T, C: PartialOrd> PartialOrd for Node<T, C> {
238 #[inline(always)]
239 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
240 self.1.partial_cmp(&other.1)
241 }
242}
243impl<T, C: PartialEq> PartialEq for Node<T, C> {
244 #[inline(always)]
245 fn eq(&self, other: &Self) -> bool {
246 self.1.eq(&other.1)
247 }
248}
249impl<T, C: Eq> Eq for Node<T, C> {}
250
#[cfg(test)]
mod test {
    use super::*;
    use crate::{distinct::HyperLogLog, traits::IntersectPlusUnionIsPlus};
    use rand::{self, Rng, SeedableRng};
    use std::time;

    /// Smoke test with `usize` counters. It pushes 10,000 records drawn from 150
    /// keys (`a0`..`a49`, `b0`..`b49`, `c0`..`c49`; `a*` keys come up about twice
    /// as often as `b*` or `c*`) into a `Top` of capacity 100, then prints it
    /// next to the exact counts. Nothing is asserted beyond the internal checks
    /// `push` runs when the `assert` feature is enabled.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn abc() {
        // Fixed seed: the output depends on the exact sequence of draws below.
        let mut rng =
            rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
        let mut top = Top::<String, usize>::new(100, 0.99, 2.0 / 1000.0, ());
        // Exact reference counts.
        let mut x = HashMap::new();
        for _ in 0..10_000 {
            let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0);
            let c = rng.gen_range(0, 50);
            let record = match (a, b) {
                (true, _) => format!("a{}", c),
                (false, true) => format!("b{}", c),
                (false, false) => format!("c{}", c),
            };
            top.push(record.clone(), &1);
            *x.entry(record).or_insert(0) += 1;
        }
        println!("{:#?}", top);
        // Exact counts, largest first, for eyeballing against `top`.
        let mut x = x.into_iter().collect::<Vec<_>>();
        x.sort_by_key(|x| cmp::Reverse(x.1));
        println!("{:#?}", x);
    }

    /// Test-only newtype over `HyperLogLog`, ordered by its estimate (`len()`),
    /// so that it can serve as the `C` counter type of `Top`.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "")]
    struct Hll<V>(HyperLogLog<V>);
    impl<V: Hash> Ord for Hll<V> {
        // Panics if the two estimates are not comparable by `partial_cmp`
        // (e.g. a NaN, if `len` returns a float - NOTE(review): confirm).
        #[inline(always)]
        fn cmp(&self, other: &Self) -> cmp::Ordering {
            self.0.len().partial_cmp(&other.0.len()).unwrap()
        }
    }
    impl<V: Hash> PartialOrd for Hll<V> {
        #[inline(always)]
        fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
            self.0.len().partial_cmp(&other.0.len())
        }
    }
    impl<V: Hash> PartialEq for Hll<V> {
        #[inline(always)]
        fn eq(&self, other: &Self) -> bool {
            self.0.len().eq(&other.0.len())
        }
    }
    impl<V: Hash> Eq for Hll<V> {}
    impl<V: Hash> Clone for Hll<V> {
        fn clone(&self) -> Self {
            Self(self.0.clone())
        }
    }
    // The `f64` config is forwarded to `HyperLogLog`'s own `New` impl
    // (presumably its error rate - NOTE(review): confirm).
    impl<V: Hash> New for Hll<V> {
        type Config = f64;
        fn new(config: &Self::Config) -> Self {
            Self(New::new(config))
        }
    }
    // The remaining trait impls simply delegate to the wrapped `HyperLogLog`.
    impl<V: Hash> Intersect for Hll<V> {
        fn intersect<'a>(iter: impl Iterator<Item = &'a Self>) -> Option<Self>
        where
            Self: Sized + 'a,
        {
            Intersect::intersect(iter.map(|x| &x.0)).map(Self)
        }
    }
    impl<'a, V: Hash> UnionAssign<&'a Hll<V>> for Hll<V> {
        fn union_assign(&mut self, rhs: &'a Self) {
            self.0.union_assign(&rhs.0)
        }
    }
    impl<'a, V: Hash> ops::AddAssign<&'a V> for Hll<V> {
        fn add_assign(&mut self, rhs: &'a V) {
            self.0.add_assign(rhs)
        }
    }
    impl<V: Hash> Debug for Hll<V> {
        fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
            self.0.fmt(fmt)
        }
    }
    impl<V> IntersectPlusUnionIsPlus for Hll<V> {
        const VAL: bool = <HyperLogLog<V> as IntersectPlusUnionIsPlus>::VAL;
    }

    /// Smoke test with HyperLogLog counters. It pushes 5,000 `(key, value)`
    /// records into a `Top` of capacity 1000. Keys come from 2,400 possibilities
    /// (`a`/`b`/`c` x 0..800). Values are drawn from 0..500 for `a*` keys and
    /// 0..200 otherwise. Each key's counter therefore estimates the number of
    /// distinct values seen for it.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn top_hll() {
        let mut rng =
            rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
        let mut top = Top::<String, Hll<String>>::new(1000, 0.99, 2.0 / 1000.0, 0.00408);
        for _ in 0..5_000 {
            let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0);
            let c = rng.gen_range(0, 800);
            let record = match (a, b) {
                (true, _) => (format!("a{}", c), format!("{}", rng.gen_range(0, 500))),
                (false, true) => (format!("b{}", c), format!("{}", rng.gen_range(0, 200))),
                (false, false) => (format!("c{}", c), format!("{}", rng.gen_range(0, 200))),
            };
            top.push(record.0, &record.1);
        }
        println!("{:#?}", top);
    }

    /// Ignored throughput check: the same workload as `top_hll`, scaled to
    /// 5,000,000 pushes with config `0.05`. It prints the elapsed wall-clock
    /// time.
    #[ignore] #[test]
    fn many() {
        let start = time::Instant::now();

        let mut rng =
            rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
        let mut top = Top::<String, Hll<String>>::new(1000, 0.99, 2.0 / 1000.0, 0.05);
        for _ in 0..5_000_000 {
            let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0);
            let c = rng.gen_range(0, 800);
            let record = match (a, b) {
                (true, _) => (format!("a{}", c), format!("{}", rng.gen_range(0, 500))),
                (false, true) => (format!("b{}", c), format!("{}", rng.gen_range(0, 200))),
                (false, false) => (format!("c{}", c), format!("{}", rng.gen_range(0, 200))),
            };
            top.push(record.0, &record.1);
        }

        println!("{:?}", start.elapsed());
    }
}
400
401