use std::ptr::NonNull;
use std::slice;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use cxx;
use thin_dst::{ThinRef,ThinBox};
use crate::bridge::ffi;
/// Owned byte string stored in a `ThinBox<(), u8>`; the element type of the
/// interning `HashSet`.
///
/// Hashing and equality look only at the byte contents, which is what lets the
/// set be queried with a plain `&[u8]` through the `Borrow<[u8]>` impl.
struct ThinByteBox(ThinBox<(), u8>);

impl ThinByteBox {
    /// The boxed bytes, viewed as a slice.
    fn as_bytes(&self) -> &[u8] {
        &self.0.slice
    }
}

impl Borrow<[u8]> for ThinByteBox {
    fn borrow(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl Hash for ThinByteBox {
    /// Feeds the hasher exactly what `[u8]` would, so that lookups by
    /// `&[u8]` land in the same bucket as the stored box.
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_bytes().hash(state);
    }
}

impl PartialEq for ThinByteBox {
    /// Two boxes are equal when their byte contents are equal.
    fn eq(&self, other: &Self) -> bool {
        self.as_bytes() == other.as_bytes()
    }
}

impl Eq for ThinByteBox {}
/// Heavy-hitters sketch: an FFI-owned `ffi::OpaqueHhSketch` paired with a
/// Rust-side set holding the bytes of every interned key.
///
/// Field order matters: Rust drops fields in declaration order, so `inner` is
/// destroyed before `intern`.
/// NOTE(review): presumably the foreign sketch releases its keys through
/// `remove_from_hashset` while it is being destroyed, which would require
/// `intern` to still be alive at that point — confirm against the bridge code
/// before reordering these fields.
pub struct HhSketch {
// Handle to the foreign sketch object returned by `ffi::new_opaque_hh_sketch`.
inner: cxx::UniquePtr<ffi::OpaqueHhSketch>,
// `intern`: set of interned key byte strings. Its address is handed to the
// foreign constructor as a `usize`; because the set lives in a `Box`, that
// address stays the same when the `HhSketch` value itself is moved.
// `lg2_k`: the constructor argument, kept so `Clone` can build a fresh sketch
// with the same parameter.
intern: Box<HashSet<ThinByteBox>>, lg2_k: u8
}
/// One entry of a sketch estimate: a key plus two `u64` bounds.
///
/// `key` borrows bytes owned by the sketch that produced the row, so a row is
/// tied to a shared borrow of that sketch. The derived ordering compares
/// `key` first, then `lb`, then `ub`.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct HhRow<'a> {
// Raw key bytes, exactly as they were passed to `HhSketch::update`.
pub key: &'a [u8],
// NOTE(review): presumably the lower bound on the key's accumulated weight
// (the tests assert `lb <= ub`) — confirm against the foreign sketch.
pub lb: u64,
// NOTE(review): presumably the matching upper bound — confirm.
pub ub: u64,
}
/// Reinterprets `addr` as an erased thin pointer and wraps it in a `ThinRef`.
///
/// # Panics
///
/// Panics with "non-null pointer" if `addr` is zero.
///
/// # Safety
///
/// `addr` must be the address of a live `(), u8` thin allocation (the value
/// obtained by erasing a `ThinRef`/`ThinBox` of that type), and the allocation
/// must stay alive and unmodified for all of `'a`. The lifetime is chosen by
/// the caller and is not checked in any way.
unsafe fn addr_to_thinref<'a>(addr: usize) -> ThinRef<'a, (), u8> {
    let erased = NonNull::new(addr as *mut _).expect("non-null pointer");
    ThinRef::<'a, (), u8>::from_erased(erased)
}
unsafe fn addr_to_hashset<'a>(addr: usize) -> &'a mut HashSet<ThinByteBox> {
let ptr = addr as *mut _;
let mut nonnull = NonNull::<_>::new(ptr).expect("non-null pointer");
nonnull.as_mut()
}
/// Removes (and frees) the interned key located at `addr` from the set located
/// at `hashset_addr`.
///
/// NOTE(review): presumably invoked by the foreign sketch when it stops
/// tracking a key — confirm against the bridge code.
///
/// # Panics
///
/// Panics if either address is zero, or if the set holds no key equal to the
/// bytes found at `addr`.
///
/// # Safety
///
/// * `hashset_addr` must be the address of a live `HashSet<ThinByteBox>` that
///   is not being accessed through any other reference during this call.
/// * `addr` must be the address of a live `(), u8` thin allocation.
/// * When the call succeeds the allocation behind `addr` has been freed; the
///   caller must not use `addr` again.
pub(crate) unsafe fn remove_from_hashset(hashset_addr: usize, addr: usize) {
    let hs = addr_to_hashset(hashset_addr);
    let thinref = addr_to_thinref(addr);
    // The lookup key borrows from the very allocation that is about to be
    // removed. `HashSet::remove` would drop that allocation *inside* the call,
    // while its `&[u8]` argument still points into it. `take` hands the box
    // back instead, so it is freed only after the borrowed key is no longer in
    // use by the callee.
    let removed = hs.take(&thinref.slice);
    // On failure nothing was removed, so `thinref` is still printable.
    assert!(removed.is_some(), "thinbox {:?}", thinref);
    drop(removed);
}
impl HhSketch {
pub fn new(lg2_k: u8) -> Self {
let intern = Box::new(HashSet::<_>::default());
Self {
inner: ffi::new_opaque_hh_sketch(lg2_k, intern.as_ref() as *const _ as usize),
intern,
lg2_k,
}
}
fn thin_row_to_owned<'a>(&'a self, row: &ffi::ThinHeavyHitterRow) -> HhRow<'a> {
let thinref = unsafe { addr_to_thinref::<'a>(row.addr) };
let ptr = thinref.slice.as_ptr();
HhRow {
key: unsafe { slice::from_raw_parts(ptr, thinref.slice.len()) },
lb: row.lb,
ub: row.ub,
}
}
pub fn estimate_no_fp(&self) -> Vec<HhRow> {
self.inner
.estimate_no_fp()
.into_iter()
.map(|x| self.thin_row_to_owned(x))
.collect()
}
pub fn estimate_no_fn(&self) -> Vec<HhRow> {
self.inner
.estimate_no_fn()
.into_iter()
.map(|x| self.thin_row_to_owned(x))
.collect()
}
pub fn update(&mut self, value: &[u8], weight: u64) {
let key = if let Some(key) = self.intern.get(value) {
&*key.0
} else {
let key = ThinByteBox(ThinBox::new((), value.iter().cloned()));
self.intern.insert(key);
&*self.intern.get(value).expect("present key").0
};
let thinref = ThinRef::<(), u8>::from(key);
let key = ThinRef::<(), u8>::erase(thinref).as_ptr() as *const _ as usize;
self.inner.pin_mut().update(key, weight)
}
pub fn merge(&mut self, other: &Self) {
let state = other.inner.state();
let total_weight = self.inner.get_total_weight() + other.inner.get_total_weight();
for row in state.iter() {
let row = other.thin_row_to_owned(row);
self.update(row.key, row.lb);
}
let offset = self.inner.get_offset() + other.inner.get_offset();
self.inner.pin_mut().set_weights(total_weight, offset);
}
}
impl Clone for HhSketch {
fn clone(&self) -> Self {
let mut hh = Self::new(self.lg2_k);
hh.merge(self);
hh
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::iter;
use byte_slice_cast::{AsByteSlice, AsSliceOf};
use super::*;
/// Checks that the `estimate_no_fp` rows are a subset of the `estimate_no_fn`
/// rows, and that a clone and a clone-of-a-clone both report exactly the same
/// (sorted) rows as `s`.
fn check_cycle(s: &HhSketch) {
    // Generic so that the helper is indifferent to the rows' lifetime.
    fn sorted<T: Ord>(mut rows: Vec<T>) -> Vec<T> {
        rows.sort_unstable();
        rows
    }

    let est_fn = s.estimate_no_fn();
    let est_fp = s.estimate_no_fp();
    assert!(est_fp
        .clone()
        .into_iter()
        .collect::<HashSet<_>>()
        .is_subset(&est_fn.clone().into_iter().collect::<HashSet<_>>()));

    let first_copy = s.clone();
    let second_copy = first_copy.clone();
    let copies = [first_copy, second_copy];

    let est_fn = sorted(est_fn);
    let est_fp = sorted(est_fp);
    for copy in copies.iter() {
        let cpy_fn = sorted(copy.estimate_no_fn());
        let cpy_fp = sorted(copy.estimate_no_fp());
        assert_eq!(est_fn, cpy_fn);
        assert_eq!(est_fp, cpy_fp);
    }
}
/// Returns the `estimate_no_fn` rows as sorted `(key, lb, ub)` triples, where
/// `key` is the first `u64` decoded from the row's key bytes.
///
/// Panics if a key cannot be viewed as a `u64` slice or if a row has
/// `lb > ub`.
fn row2keys(hh: &HhSketch) -> Vec<(u64, u64, u64)> {
    let mut triples: Vec<(u64, u64, u64)> = Vec::new();
    for row in hh.estimate_no_fn() {
        let key = row.key.as_slice_of::<u64>().unwrap();
        assert!(row.lb <= row.ub);
        triples.push((key[0], row.lb, row.ub));
    }
    triples.sort_unstable();
    triples
}
/// Asserts that every `(key, true_value)` pair in `expected` is reported by
/// `hh` with bounds that bracket the true value.
fn matches(hh: &HhSketch, expected: &[(u64, u64)]) {
    let mut present: HashMap<u64, (u64, u64)> = HashMap::new();
    for (key, lb, ub) in row2keys(hh) {
        present.insert(key, (lb, ub));
    }
    for (k, v) in expected.iter().copied() {
        assert!(present.contains_key(&k), "key missing {}", k);
        let (lb, ub) = present[&k];
        assert!(lb <= v, "key {} lb {} incorrect (true value {})", k, lb, v);
        assert!(ub >= v, "key {} ub {} incorrect (true value {})", k, ub, v);
    }
}
/// Counts the `(key, true_value)` pairs in `expected` that `hh` either does
/// not report at all or reports with bounds that fail to bracket the true
/// value.
fn matches_violations(hh: &HhSketch, expected: &[(u64, u64)]) -> usize {
    let mut present: HashMap<u64, (u64, u64)> = HashMap::new();
    for (key, lb, ub) in row2keys(hh) {
        present.insert(key, (lb, ub));
    }
    expected
        .iter()
        .filter(|&&(k, v)| match present.get(&k) {
            // A missing key is a violation.
            None => true,
            // So is a pair of bounds that does not contain the true value.
            Some(&(lb, ub)) => lb > v || ub < v,
        })
        .count()
}
/// Three heavy keys interleaved with `max` unit-weight keys, repeated three
/// times: the heavy keys must be reported with bounds bracketing their true
/// total, and the sketch must survive cloning.
#[test]
fn basic_heavy() {
    for &lg2_k in &[3, 4, 5] {
        let mut hh = HhSketch::new(lg2_k);
        let max = 1u64 << lg2_k;
        let heavies = &[max, max + 1, max + 2];
        let iters = 3;
        let heavy_weight = max * 2 + 1;

        let mut feed = |key: u64, weight: u64| {
            let slice = [key];
            hh.update(slice.as_byte_slice(), weight)
        };
        for _ in 0..iters {
            (0u64..max).for_each(|i| feed(i, 1));
            heavies.iter().for_each(|&i| feed(i, heavy_weight));
            (0u64..max).for_each(|i| feed(i, 1));
        }

        let expected: Vec<(u64, u64)> = heavies
            .iter()
            .map(|&k| (k, heavy_weight * iters))
            .collect();
        matches(&hh, &expected);
        check_cycle(&hh);
    }
}
/// With `2^lg2_k` distinct keys in a sketch built with `lg2_k + 1`, every key
/// must be reported with exact bounds, both after the first pass (weight 1)
/// and after a second pass adding weight 4.
#[test]
fn retains_all() {
    for &lg2_k in &[3, 4, 5] {
        let mut hh = HhSketch::new(lg2_k + 1);
        let nkeys = 1u64 << lg2_k;
        // (weight added in this pass, exact total expected afterwards)
        for &(weight, total) in &[(1u64, 1u64), (4, 5)] {
            for i in 0u64..nkeys {
                let slice = [i];
                hh.update(slice.as_byte_slice(), weight);
            }
            let expected: Vec<(u64, u64, u64)> =
                (0u64..nkeys).map(|v| (v, total, total)).collect();
            assert_eq!(row2keys(&hh), expected);
        }
        check_cycle(&hh);
    }
}
/// Merging a sketch with a clone of itself must double every exact count.
#[test]
fn retains_all_clone() {
    for &lg2_k in &[3, 4, 5] {
        let mut hh = HhSketch::new(lg2_k + 1);
        let nkeys = 1u64 << lg2_k;
        (0u64..nkeys).for_each(|i| {
            let slice = [i];
            hh.update(slice.as_byte_slice(), 1)
        });

        let snapshot = hh.clone();
        hh.merge(&snapshot);

        let expected: Vec<(u64, u64, u64)> = (0u64..nkeys).map(|v| (v, 2, 2)).collect();
        assert_eq!(row2keys(&hh), expected);
        check_cycle(&hh);
    }
}
/// Three sketches each see the same light keys plus one distinct heavy key;
/// after merging all three, every heavy key must still be reported with
/// bounds bracketing its weight.
#[test]
fn basic_merge() {
    for &lg2_k in &[3, 4, 5] {
        // `vec![..; 3]` goes through `Clone`, which is part of what is tested.
        let mut hhs = vec![HhSketch::new(lg2_k); 3];
        let max = 1u64 << lg2_k;
        let heavies = &[max, max + 1, max + 2];
        let heavy_weight = max * 2 + 1;

        for (hh, &heavy_key) in hhs.iter_mut().zip(heavies.iter()) {
            let mut feed = |key: u64, weight: u64| {
                let slice = [key];
                hh.update(slice.as_byte_slice(), weight)
            };
            (0u64..max).for_each(|i| feed(i, 1));
            feed(heavy_key, heavy_weight);
            (0u64..max).for_each(|i| feed(i, 1));
            check_cycle(hh);
        }

        let mut hh = hhs.pop().expect("some last");
        for other in hhs {
            hh.merge(&other);
        }

        let expected: Vec<(u64, u64)> = heavies.iter().map(|&k| (k, heavy_weight)).collect();
        matches(&hh, &expected);
        check_cycle(&hh);
    }
}
/// Randomized check of the sketch's bounds on a synthetic stream.
///
/// A stream of `n = 2^lg2_k * stream_multiplier` unit-weight updates is built
/// from a histogram with `nunique` distinct "large" frequencies around
/// `thresh`, padded with singleton keys up to `n`. The stream is shuffled 25
/// times with a fixed seed; a trial fails if any reported row has bounds that
/// do not bracket the key's true count, or if any key with count `>= thresh`
/// is missing or mis-bounded. At most one failing trial is tolerated.
fn check_hh_property(lg2_k: u8, stream_multiplier: u8, nunique: u8) {
    use rand::prelude::*;

    /// Per-key repeat counts for the requested shape; panics on an unknown
    /// `nunique` or when the parameters leave one of the groups empty.
    fn build_histogram(n: u64, thresh: u64, nunique: u8) -> Vec<u64> {
        match nunique {
            1 => {
                assert!(n/thresh > 1);
                vec![thresh; (n / thresh) as usize]
            }
            2 => {
                assert!(n / thresh / 2 > 1);
                let mut v = vec![thresh; (n / thresh / 2) as usize];
                let remain = n - (n / thresh / 2) * thresh;
                assert!(remain > 0);
                let low = thresh - 1;
                assert!(low > 0);
                v.extend(iter::repeat(low).take((remain / low) as usize));
                assert!(remain / low > 0);
                v
            }
            3 => {
                let hi = thresh + 1;
                let nhi = n / thresh / 3;
                assert!(nhi > 0);
                let mut v = vec![hi; nhi as usize];
                let remain = n - nhi * hi;
                let med = thresh;
                let nmed = remain / med / 2;
                assert!(nmed > 0);
                v.extend(iter::repeat(med).take(nmed as usize));
                let remain = remain - nmed * med;
                let low = thresh - 1;
                let nlow = remain / low;
                assert!(nlow > 0);
                v.extend(iter::repeat(low).take(nlow as usize));
                v
            }
            _ => panic!("invalid nunique {}", nunique),
        }
    }

    let n = (1u64 << lg2_k) * u64::from(stream_multiplier);
    let thresh = (7 * u64::from(stream_multiplier) + 1) / 2;

    let mut histogram = build_histogram(n, thresh, nunique);
    // Pad with singleton keys until the stream has exactly `n` items.
    let sum: u64 = histogram.iter().sum();
    histogram.extend((sum..n).map(|_| 1u64));

    // Key `i` appears `histogram[i]` times in the stream.
    let mut data: Vec<u64> = Vec::new();
    for (i, &repeats) in histogram.iter().enumerate() {
        data.extend(iter::repeat(i as u64).take(repeats as usize));
    }
    assert!(data.len() == n as usize);

    // Keys frequent enough that the sketch is required to report them.
    let expected: Vec<(u64, u64)> = histogram
        .iter()
        .enumerate()
        .filter(|&(_, &repeats)| repeats >= thresh)
        .map(|(k, &repeats)| (k as u64, repeats))
        .collect();

    let ntrials = 25;
    let mut rng = StdRng::seed_from_u64(1234);
    let mut failures = 0;
    for _ in 0..ntrials {
        data.shuffle(&mut rng);
        let mut hh = HhSketch::new(lg2_k);
        for &i in &data {
            let slice = [i];
            hh.update(slice.as_byte_slice(), 1);
        }
        check_cycle(&hh);

        let all_bracketed = row2keys(&hh).into_iter().all(|(k, lb, ub)| {
            let truth = histogram[k as usize];
            lb <= truth && ub >= truth
        });
        if !all_bracketed || matches_violations(&hh, &expected) > 0 {
            failures += 1;
        }
    }
    assert!(failures <= 1, "failures {} ntrials {} n {}", failures, ntrials, n);
}
// Parameter grid for `check_hh_property`: `lg2_k` is fixed at 4, and each test
// picks one `stream_multiplier` from {2, 5, 20} and one `nunique` from
// {1, 2, 3}. One test per combination so that a failure names the exact case.

// stream_multiplier = 2
#[test]
fn check_hh_lgk4_multiplier2_nunique1() {
check_hh_property(4, 2, 1);
}
#[test]
fn check_hh_lgk4_multiplier2_nunique2() {
check_hh_property(4, 2, 2);
}
#[test]
fn check_hh_lgk4_multiplier2_nunique3() {
check_hh_property(4, 2, 3);
}
// stream_multiplier = 5
#[test]
fn check_hh_lgk4_multiplier5_nunique1() {
check_hh_property(4, 5, 1);
}
#[test]
fn check_hh_lgk4_multiplier5_nunique2() {
check_hh_property(4, 5, 2);
}
#[test]
fn check_hh_lgk4_multiplier5_nunique3() {
check_hh_property(4, 5, 3);
}
// stream_multiplier = 20
#[test]
fn check_hh_lgk4_multiplier20_nunique1() {
check_hh_property(4, 20, 1);
}
#[test]
fn check_hh_lgk4_multiplier20_nunique2() {
check_hh_property(4, 20, 2);
}
#[test]
fn check_hh_lgk4_multiplier20_nunique3() {
check_hh_property(4, 20, 3);
}
// A sketch that has received no updates must report no rows from either
// estimate, and must still pass the clone round-trip check.
#[test]
fn hh_empty() {
let hh = HhSketch::new(12);
assert!(hh.estimate_no_fp().is_empty());
assert!(hh.estimate_no_fn().is_empty());
check_cycle(&hh);
}
}