use serde::{Deserialize, Serialize};
use std::{
cmp, collections::{hash_map::Entry, HashMap}, fmt::{self, Debug}, hash::Hash, iter, ops
};
use twox_hash::RandomXxHashBuilder;
use crate::{
count_min::CountMinSketch, ordered_linked_list::{OrderedLinkedList, OrderedLinkedListIndex, OrderedLinkedListIter}, traits::{Intersect, New, UnionAssign}, IntersectPlusUnionIsPlus
};
/// Approximate "top N" tracker.
///
/// Keeps up to a fixed number of keys of type `A`, each paired with a count of
/// type `C`. Keys that do not fit are accounted for in a count-min sketch, whose
/// estimate decides whether a newcomer displaces the current tail entry (see
/// `push`).
#[derive(Clone, Serialize, Deserialize)]
#[serde(bound(
serialize = "A: Hash + Eq + Serialize, C: Serialize, <C as New>::Config: Serialize",
deserialize = "A: Hash + Eq + Deserialize<'de>, C: Deserialize<'de>, <C as New>::Config: Deserialize<'de>"
))]
pub struct Top<A, C: New> {
// Key -> index of that key's node inside `list`.
map: HashMap<A, OrderedLinkedListIndex<'static>, RandomXxHashBuilder>,
// The tracked `(key, count)` nodes; `assert` checks that iterating from the
// head yields non-increasing counts.
list: OrderedLinkedList<Node<A, C>>,
// Sketch consulted for keys that are not in `list` once `list` is full.
count_min: CountMinSketch<A, C>,
// Configuration used to build every fresh `C` via `New::new`.
config: <C as New>::Config,
}
impl<A: Hash + Eq + Clone, C: Ord + New + for<'a> UnionAssign<&'a C> + Intersect> Top<A, C> {
    /// Creates an empty tracker that retains at most `n` keys.
    ///
    /// `probability` and `tolerance` are forwarded unchanged to
    /// `CountMinSketch::new`. `config` is handed (as a clone) to the sketch and
    /// is also kept to construct every fresh counter `C`.
    pub fn new(n: usize, probability: f64, tolerance: f64, config: <C as New>::Config) -> Self {
        Self {
            map: HashMap::with_capacity_and_hasher(n, RandomXxHashBuilder::default()),
            list: OrderedLinkedList::new(n),
            count_min: CountMinSketch::new(probability, tolerance, config.clone()),
            config,
        }
    }

    /// Internal consistency check; a no-op unless the `assert` feature is enabled.
    ///
    /// Verifies that every `map` entry points at the list node holding the same
    /// key, and that counts are non-increasing when walking the list from its
    /// head. An empty list trivially satisfies both.
    fn assert(&self) {
        if !cfg!(feature = "assert") {
            return;
        }
        for (k, &v) in &self.map {
            assert!(&self.list[v].0 == k);
        }
        // An empty list has no head; there is nothing to order-check in that
        // case, so do not unwrap.
        if let Some(head) = self.list.head() {
            let mut cur = &self.list[head].1;
            for &Node(_, ref count) in self.list.iter() {
                assert!(cur >= count);
                cur = count;
            }
        }
    }

    /// Maximum number of keys this tracker retains (the capacity of the
    /// underlying list).
    pub fn capacity(&self) -> usize {
        self.list.capacity()
    }

    /// Records the observation `value` for the key `item`.
    ///
    /// * If `item` is already tracked, `value` is added to its count in place.
    /// * Otherwise, if the list still has room, `item` starts being tracked
    ///   with a fresh counter (`C::new(config)`) to which `value` is added.
    /// * Otherwise `value` is pushed into the count-min sketch. If the
    ///   resulting estimate is strictly greater than the count of the list's
    ///   tail entry, that entry is evicted, `item` takes its place with the
    ///   estimate as its count, and the evicted count is merged back into the
    ///   sketch under the evicted key.
    ///
    /// A tracker with capacity zero never tracks any key; observations are
    /// only recorded in the sketch.
    ///
    /// # Panics
    ///
    /// Panics if the internal key map and list are out of sync (an evicted key
    /// missing from the map), which indicates a bug in this type.
    pub fn push<V: ?Sized>(&mut self, item: A, value: &V)
    where
        C: for<'a> ops::AddAssign<&'a V> + IntersectPlusUnionIsPlus,
    {
        match self.map.entry(item.clone()) {
            Entry::Occupied(entry) => {
                let offset = *entry.get();
                self.list.mutate(offset, |Node(t, mut count)| {
                    count += value;
                    Node(t, count)
                });
            }
            Entry::Vacant(entry) => {
                if self.list.len() < self.list.capacity() {
                    let mut x = C::new(&self.config);
                    x += value;
                    let new = self.list.push_back(Node(item, x));
                    // SAFETY: NOTE(review) — `staticify` presumably erases the
                    // borrow lifetime of the index so it can be stored in
                    // `map`. This relies on the index being dropped from `map`
                    // whenever its node leaves `list` (see the eviction path
                    // and `clear`); confirm against `OrderedLinkedList`.
                    let new = unsafe { new.staticify() };
                    let _ = entry.insert(new);
                } else {
                    let score = self.count_min.push(&item, value);
                    // The list is "full" but may still be empty when the
                    // capacity is zero; with no tail there is nothing to evict.
                    let evict = match self.list.tail() {
                        Some(tail) => score > self.list[tail].1,
                        None => false,
                    };
                    if evict {
                        let old = self.list.pop_back();
                        let new = self.list.push_back(Node(item, score));
                        // SAFETY: NOTE(review) — same assumption as above; the
                        // evicted node's index is removed from `map` right
                        // below.
                        let new = unsafe { new.staticify() };
                        let _ = entry.insert(new);
                        let _ = self.map.remove(&old.0).unwrap();
                        // Keep the evicted key's count available to future
                        // estimates.
                        self.count_min.union_assign(&old.0, &old.1);
                    }
                }
            }
        }
        self.assert();
    }

    /// Removes every tracked key and resets the count-min sketch, keeping the
    /// capacity and configuration.
    pub fn clear(&mut self) {
        self.map.clear();
        self.list.clear();
        self.count_min.clear();
    }

    /// Iterates over the tracked `(key, count)` pairs in list order.
    pub fn iter(&self) -> TopIter<'_, A, C> {
        TopIter {
            list_iter: self.list.iter(),
        }
    }
}
/// Debug output is the list of tracked `(key, count)` pairs, in iteration order.
impl<A, C> Debug for Top<A, C>
where
    A: Hash + Eq + Clone + Debug,
    C: Ord + New + Clone + for<'a> UnionAssign<&'a C> + Intersect + Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut list = f.debug_list();
        let _ = list.entries(self.iter());
        list.finish()
    }
}
/// Borrowing iterator over the entries of a `Top`, yielding `(&key, &count)`
/// pairs in the order of the underlying list.
pub struct TopIter<'a, A, C>
where
    A: Hash + Eq + Clone + 'a,
    C: Ord + 'a,
{
    list_iter: OrderedLinkedListIter<'a, Node<A, C>>,
}
impl<'a, A: Hash + Eq + Clone, C: Ord + 'a> Clone for TopIter<'a, A, C> {
fn clone(&self) -> Self {
Self {
list_iter: self.list_iter.clone(),
}
}
}
/// Yields each list node split into a `(&key, &count)` pair.
impl<'a, A, C> Iterator for TopIter<'a, A, C>
where
    A: Hash + Eq + Clone,
    C: Ord + 'a,
{
    type Item = (&'a A, &'a C);

    fn next(&mut self) -> Option<(&'a A, &'a C)> {
        let node = self.list_iter.next()?;
        Some((&node.0, &node.1))
    }
}
/// Debug output lists the entries still to be yielded; the iterator itself is
/// not advanced because a clone is consumed instead.
impl<'a, A, C> Debug for TopIter<'a, A, C>
where
    A: Hash + Eq + Clone + Debug,
    C: Ord + Debug + 'a,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let remaining = self.clone();
        let mut list = f.debug_list();
        let _ = list.entries(remaining);
        list.finish()
    }
}
/// Sums an iterator of trackers by merging each one into the first with `+=`.
/// An empty iterator produces `None`.
impl<A, C> iter::Sum<Top<A, C>> for Option<Top<A, C>>
where
    A: Hash + Eq + Clone,
    C: Ord
        + New
        + Clone
        + for<'a> ops::AddAssign<&'a C>
        + for<'a> UnionAssign<&'a C>
        + Intersect
        + IntersectPlusUnionIsPlus,
{
    fn sum<I>(mut iter: I) -> Self
    where
        I: Iterator<Item = Top<A, C>>,
    {
        // The first element seeds the accumulator; with no elements there is
        // nothing to return.
        let first = iter.next()?;
        let merged = iter.fold(first, |mut acc, sample| {
            acc += sample;
            acc
        });
        Some(merged)
    }
}
/// `a + b` merges `b` into `a` via `AddAssign` and returns the result.
impl<A, C> ops::Add for Top<A, C>
where
    A: Hash + Eq + Clone,
    C: Ord
        + New
        + Clone
        + for<'a> ops::AddAssign<&'a C>
        + for<'a> UnionAssign<&'a C>
        + Intersect
        + IntersectPlusUnionIsPlus,
{
    type Output = Self;

    fn add(mut self, other: Self) -> Self {
        ops::AddAssign::add_assign(&mut self, other);
        self
    }
}
/// In-place merge of two trackers (`a += b`).
impl<
        A: Hash + Eq + Clone,
        C: Ord
            + New
            + Clone
            + for<'a> ops::AddAssign<&'a C>
            + for<'a> UnionAssign<&'a C>
            + Intersect
            + IntersectPlusUnionIsPlus,
    > ops::AddAssign for Top<A, C>
{
    /// Replaces `self` with the merge of `self` and `other`.
    ///
    /// The counts of every key tracked by either side are summed per key
    /// (starting from `C::new(config)`), `self` is cleared, and the summed
    /// entries are pushed back into it one by one.
    ///
    /// NOTE(review): only the tracked entries of both sides are carried over;
    /// whatever the two count-min sketches had accumulated for untracked keys
    /// is discarded by the `clear` — confirm this is intended.
    ///
    /// # Panics
    ///
    /// Panics if `self` and `other` have different capacities.
    fn add_assign(&mut self, other: Self) {
        assert_eq!(self.capacity(), other.capacity());
        let mut scores = HashMap::<_, C>::new();
        for (url, count) in self.iter().chain(other.iter()) {
            *scores
                .entry(url.clone())
                .or_insert_with(|| C::new(&self.config)) += count;
        }
        // `scores` owns its keys and counts, so `self` can be reset and
        // refilled in place; there is no need to deep-clone the whole tracker
        // just to clear the copy. (The `Clone` bound on `C` is retained so the
        // impl's public bounds stay unchanged.)
        self.clear();
        for (url, count) in scores {
            // `url` is already owned here: move it instead of cloning.
            self.push(url, &count);
        }
    }
}
/// A list entry: the key (`.0`) paired with its count (`.1`).
///
/// The comparison impls for this type (`Ord`, `PartialOrd`, `PartialEq`, `Eq`)
/// look only at the count, never at the key.
#[derive(Clone, Serialize, Deserialize)]
struct Node<T, C>(T, C);
impl<T, C: Ord> Ord for Node<T, C> {
#[inline(always)]
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.1.cmp(&other.1)
}
}
impl<T, C: PartialOrd> PartialOrd for Node<T, C> {
#[inline(always)]
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
self.1.partial_cmp(&other.1)
}
}
/// Two nodes are equal when their counts (`.1`) are equal, regardless of key.
impl<T, C> PartialEq for Node<T, C>
where
    C: PartialEq,
{
    #[inline(always)]
    fn eq(&self, other: &Self) -> bool {
        self.1 == other.1
    }
}
/// Count equality is a full equivalence relation whenever `C: Eq`.
impl<T, C> Eq for Node<T, C> where C: Eq {}
// Smoke tests for `Top`. They use a fixed RNG seed, so the generated data
// depends on the exact order of the `rng` calls below. None of the tests
// assert on the result; they print it for manual inspection.
#[cfg(test)]
mod test {
use super::*;
use crate::{distinct::HyperLogLog, traits::IntersectPlusUnionIsPlus};
use rand::{self, Rng, SeedableRng};
use std::time;
// Pushes 10,000 keys of the form `a<c>`, `b<c>` or `c<c>` (c drawn from
// `gen_range(0, 50)`) with weight 1 into a `Top<String, usize>` of capacity
// 100, keeps exact per-key counts in a `HashMap`, and prints both (the exact
// counts sorted descending).
#[test]
fn abc() {
let mut rng =
rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
let mut top = Top::<String, usize>::new(100, 0.99, 2.0 / 1000.0, ());
let mut x = HashMap::new();
for _ in 0..10_000 {
// Two coin flips pick the key prefix: `a` if the first is true, otherwise
// `b`/`c` depending on the second.
let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0);
let c = rng.gen_range(0, 50);
let record = match (a, b) {
(true, _) => format!("a{}", c),
(false, true) => format!("b{}", c),
(false, false) => format!("c{}", c),
};
top.push(record.clone(), &1);
*x.entry(record).or_insert(0) += 1;
}
println!("{:#?}", top);
let mut x = x.into_iter().collect::<Vec<_>>();
x.sort_by_key(|x| cmp::Reverse(x.1));
println!("{:#?}", x);
}
// Newtype around `HyperLogLog<V>` that supplies the traits `Top` needs from
// its count type. All comparisons are made on `len()` of the wrapped sketch.
#[derive(Serialize, Deserialize)]
#[serde(bound = "")]
struct HLL<V>(HyperLogLog<V>);
impl<V: Hash> Ord for HLL<V> {
// Panics if `partial_cmp` on the two `len()` values returns `None`.
#[inline(always)]
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.0.len().partial_cmp(&other.0.len()).unwrap()
}
}
impl<V: Hash> PartialOrd for HLL<V> {
#[inline(always)]
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
self.0.len().partial_cmp(&other.0.len())
}
}
impl<V: Hash> PartialEq for HLL<V> {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
self.0.len().eq(&other.0.len())
}
}
impl<V: Hash> Eq for HLL<V> {}
impl<V: Hash> Clone for HLL<V> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
// The remaining impls forward to the wrapped `HyperLogLog`.
impl<V: Hash> New for HLL<V> {
// NOTE(review): presumably the HyperLogLog error rate — confirm against
// `HyperLogLog`'s `New` impl.
type Config = f64;
fn new(config: &Self::Config) -> Self {
Self(New::new(config))
}
}
impl<V: Hash> Intersect for HLL<V> {
fn intersect<'a>(iter: impl Iterator<Item = &'a Self>) -> Option<Self>
where
Self: Sized + 'a,
{
Intersect::intersect(iter.map(|x| &x.0)).map(Self)
}
}
impl<'a, V: Hash> UnionAssign<&'a HLL<V>> for HLL<V> {
fn union_assign(&mut self, rhs: &'a Self) {
self.0.union_assign(&rhs.0)
}
}
impl<'a, V: Hash> ops::AddAssign<&'a V> for HLL<V> {
fn add_assign(&mut self, rhs: &'a V) {
self.0.add_assign(rhs)
}
}
impl<V: Hash> Debug for HLL<V> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(fmt)
}
}
impl<V> IntersectPlusUnionIsPlus for HLL<V> {
const VAL: bool = <HyperLogLog<V> as IntersectPlusUnionIsPlus>::VAL;
}
// Pushes 5,000 `(key, value)` pairs into a `Top<String, HLL<String>>` of
// capacity 1000 and prints the result. Each key's count is an `HLL` that
// receives the pushed value strings.
#[test]
fn top_hll() {
let mut rng =
rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
let mut top = Top::<String, HLL<String>>::new(1000, 0.99, 2.0 / 1000.0, 0.00408);
for _ in 0..5_000 {
let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0);
let c = rng.gen_range(0, 800);
// `a*` keys draw their value from `gen_range(0, 500)`, `b*`/`c*` keys from
// `gen_range(0, 200)`.
let record = match (a, b) {
(true, _) => (format!("a{}", c), format!("{}", rng.gen_range(0, 500))),
(false, true) => (format!("b{}", c), format!("{}", rng.gen_range(0, 200))),
(false, false) => (format!("c{}", c), format!("{}", rng.gen_range(0, 200))),
};
top.push(record.0, &record.1);
}
println!("{:#?}", top);
}
// Timing run (ignored by default): same data shape as `top_hll` but with
// 5,000,000 pushes; prints the elapsed wall-clock time.
#[ignore]
#[test]
fn many() {
let start = time::Instant::now();
let mut rng =
rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
let mut top = Top::<String, HLL<String>>::new(1000, 0.99, 2.0 / 1000.0, 0.05);
for _ in 0..5_000_000 {
let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0);
let c = rng.gen_range(0, 800);
let record = match (a, b) {
(true, _) => (format!("a{}", c), format!("{}", rng.gen_range(0, 500))),
(false, true) => (format!("b{}", c), format!("{}", rng.gen_range(0, 200))),
(false, false) => (format!("c{}", c), format!("{}", rng.gen_range(0, 200))),
};
top.push(record.0, &record.1);
}
println!("{:?}", start.elapsed());
}
}