use ahash::RandomState;
use std::hash::Hash;
use std::sync::atomic::{AtomicU8, Ordering};
/// One row of the sketch: a fixed-size array of 8-bit counters together with the
/// hasher that maps keys onto positions in that array.
type Row = (Box<[AtomicU8]>, RandomState);

/// A count-min sketch of `u8` counters used to estimate how often keys have been seen.
///
/// Every row has its own `RandomState`; a key touches exactly one counter per row, and
/// its estimated frequency is the minimum of those counters across all rows.
pub struct Estimator {
    estimator: Box<[Row]>,
}
impl Estimator {
pub fn optimal_paras(items: usize) -> (usize, usize) {
use std::cmp::max;
let error_range = 1.0 / (items as f64);
let failure_probability = 1.0 / (items as f64);
(
max((std::f64::consts::E / error_range).ceil() as usize, 16),
max((failure_probability.ln() / 0.5f64.ln()).ceil() as usize, 2),
)
}
pub fn optimal(items: usize) -> Self {
let (slots, hashes) = Self::optimal_paras(items);
Self::new(hashes, slots)
}
pub fn compact(items: usize) -> Self {
let (slots, hashes) = Self::optimal_paras(items / 100);
Self::new(hashes, slots)
}
pub fn new(hashes: usize, slots: usize) -> Self {
let mut estimator = Vec::with_capacity(hashes);
for _ in 0..hashes {
let mut slot = Vec::with_capacity(slots);
for _ in 0..slots {
slot.push(AtomicU8::new(0));
}
estimator.push((slot.into_boxed_slice(), RandomState::new()));
}
Estimator {
estimator: estimator.into_boxed_slice(),
}
}
pub fn incr<T: Hash>(&self, key: T) -> u8 {
let mut min = u8::MAX;
for (slot, hasher) in self.estimator.iter() {
let hash = hasher.hash_one(&key) as usize;
let counter = &slot[hash % slot.len()];
let (_current, new) = incr_no_overflow(counter);
min = std::cmp::min(min, new);
}
min
}
pub fn get<T: Hash>(&self, key: T) -> u8 {
let mut min = u8::MAX;
for (slot, hasher) in self.estimator.iter() {
let hash = hasher.hash_one(&key) as usize;
let counter = &slot[hash % slot.len()];
let current = counter.load(Ordering::Relaxed);
min = std::cmp::min(min, current);
}
min
}
pub fn exponential_decay(&self, decay_factor: f32) {
for (slot, _) in self.estimator.iter() {
for counter in slot.iter() {
let c = counter.load(Ordering::Relaxed);
let new_value = (c as f32 * decay_factor).round() as u8;
counter.store(new_value, Ordering::Relaxed);
}
}
}
pub fn age(&self, shift: u8) {
for (slot, _) in self.estimator.iter() {
for counter in slot.iter() {
let c = counter.load(Ordering::Relaxed);
counter.store(c >> shift, Ordering::Relaxed);
}
}
}
}
/// Atomically increments `var` by one, saturating at `u8::MAX` instead of wrapping.
///
/// Returns `(previous, updated)`. Once the counter is saturated nothing is written and
/// `(u8::MAX, u8::MAX)` is returned.
fn incr_no_overflow(var: &AtomicU8) -> (u8, u8) {
    // `checked_add` yields `None` at `u8::MAX`, which makes `fetch_update` stop without
    // writing; retrying on concurrent modification is handled inside `fetch_update`.
    // Relaxed is sufficient: the counter publishes no other data, and every other access
    // to these counters is Relaxed as well.
    match var.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
        current.checked_add(1)
    }) {
        Ok(previous) => (previous, previous + 1),
        Err(saturated) => (saturated, saturated),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cmk_paras() {
        let (slots, hashes) = Estimator::optimal_paras(1_000_000);
        assert_eq!(slots, 2718282);
        assert_eq!(hashes, 20);
    }

    #[test]
    fn test_cmk_paras_floor() {
        // Tiny inputs (including 0, reached via `compact` with fewer than 100 items)
        // fall back to the minimum dimensions.
        assert_eq!(Estimator::optimal_paras(0), (16, 2));
        assert_eq!(Estimator::optimal_paras(1), (16, 2));
    }

    #[test]
    fn test_incr_get() {
        let est = Estimator::new(4, 64);
        assert_eq!(est.get("a"), 0);
        assert_eq!(est.incr("a"), 1);
        assert_eq!(est.incr("a"), 2);
        assert_eq!(est.get("a"), 2);
    }

    #[test]
    fn test_incr_saturates() {
        let est = Estimator::new(2, 16);
        for _ in 0..300 {
            est.incr(7u64);
        }
        assert_eq!(est.get(7u64), u8::MAX);
    }

    #[test]
    fn test_age_and_decay() {
        let est = Estimator::new(3, 32);
        for _ in 0..8 {
            est.incr("k");
        }
        est.age(1);
        assert_eq!(est.get("k"), 4);
        est.exponential_decay(0.5);
        assert_eq!(est.get("k"), 2);
    }
}