use std::hash::Hash;
use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::codec::assert::ensure_preamble_longs_in;
use crate::codec::assert::ensure_serial_version_is;
use crate::codec::assert::insufficient_data;
use crate::codec::family::Family;
use crate::common::NumStdDev;
use crate::common::inv_pow2_table::INVERSE_POWERS_OF_2;
use crate::cpc::DEFAULT_LG_K;
use crate::cpc::Flavor;
use crate::cpc::MAX_LG_K;
use crate::cpc::MIN_LG_K;
use crate::cpc::compression::CompressedState;
use crate::cpc::count_bits_set_in_matrix;
use crate::cpc::determine_correct_offset;
use crate::cpc::determine_flavor;
use crate::cpc::estimator::estimate;
use crate::cpc::estimator::lower_bound;
use crate::cpc::estimator::upper_bound;
use crate::cpc::kxp_byte_lookup::KXP_BYTE_TABLE;
use crate::cpc::pair_table::PairTable;
use crate::cpc::serialization::FLAG_COMPRESSED;
use crate::cpc::serialization::FLAG_HAS_HIP;
use crate::cpc::serialization::FLAG_HAS_TABLE;
use crate::cpc::serialization::FLAG_HAS_WINDOW;
use crate::cpc::serialization::SERIAL_VERSION;
use crate::cpc::serialization::make_preamble_ints;
use crate::error::Error;
use crate::error::ErrorKind;
use crate::hash::DEFAULT_UPDATE_SEED;
use crate::hash::MurmurHash3X64128;
use crate::hash::compute_seed_hash;
/// In-memory state of a CPC sketch.
///
/// Every update is reduced to a packed `(row << 6) | col` pair. While the sliding window is
/// empty the pairs live only in the surprising-value table (sparse mode); afterwards each row
/// keeps an 8-column byte window and the table records only the bits outside that window.
#[derive(Debug, Clone)]
pub struct CpcSketch {
/// Base-2 logarithm of `k`; the constructors and the deserializer require it to lie in
/// `MIN_LG_K..=MAX_LG_K`. The sketch has `k = 1 << lg_k` rows.
lg_k: u8,
/// Seed handed to the MurmurHash3 hasher on every update.
seed: u64,
/// `compute_seed_hash(seed)`; written to the serialized image and checked when reading one.
seed_hash: u16,
/// Updates whose column index is below this value are ignored by `row_col_update`.
pub(super) first_interesting_column: u8,
/// Number of novel row/column pairs recorded so far; zero means the sketch is empty.
pub(super) num_coupons: u32,
/// Table of packed row/column pairs. `None` until the first update creates it; each entry
/// marks a bit of the bit matrix that differs from that row's default value.
pub(super) surprising_value_table: Option<PairTable>,
/// Column index of the lowest bit covered by `sliding_window`.
pub(super) window_offset: u8,
/// One byte per row holding the 8 columns that start at `window_offset`; empty while the
/// sketch is still in sparse mode.
pub(super) sliding_window: Vec<u8>,
/// When `true` the HIP values (`kxp`, `hip_est_accum`) are not serialized, and the flag is
/// forwarded to the estimator functions.
/// NOTE(review): presumably set on sketches produced by a merge — confirm against the caller.
pub(super) merge_flag: bool,
/// Running register used by the HIP update: each novel coupon adds `k / kxp` to
/// `hip_est_accum` and then subtracts `INVERSE_POWERS_OF_2[col + 1]` from `kxp`.
/// A freshly constructed sketch starts with `kxp == k`.
kxp: f64,
/// HIP accumulator, grown by `k / kxp` for every novel coupon.
hip_est_accum: f64,
}
impl Default for CpcSketch {
fn default() -> Self {
Self::new(DEFAULT_LG_K)
}
}
impl CpcSketch {
pub fn new(lg_k: u8) -> Self {
Self::with_seed(lg_k, DEFAULT_UPDATE_SEED)
}
/// Creates an empty sketch with the given `lg_k` and hash `seed`.
///
/// The new sketch has no coupons, no surprising-value table, an empty sliding window and
/// `kxp` equal to `k = 1 << lg_k`.
///
/// # Panics
///
/// Panics if `lg_k` is outside `MIN_LG_K..=MAX_LG_K`.
pub fn with_seed(lg_k: u8, seed: u64) -> Self {
    let valid_lg_k = MIN_LG_K..=MAX_LG_K;
    assert!(
        valid_lg_k.contains(&lg_k),
        "lg_k out of range; got {lg_k}",
    );

    let seed_hash = compute_seed_hash(seed);
    // kxp starts at k, so the first HIP update adds k / k = 1 to the accumulator.
    let initial_kxp = (1 << lg_k) as f64;

    Self {
        lg_k,
        seed,
        seed_hash,
        first_interesting_column: 0,
        num_coupons: 0,
        surprising_value_table: None,
        window_offset: 0,
        sliding_window: Vec::new(),
        merge_flag: false,
        kxp: initial_kxp,
        hip_est_accum: 0.0,
    }
}
/// Returns the `lg_k` this sketch was configured with.
pub fn lg_k(&self) -> u8 {
    self.lg_k
}
/// Returns the sketch's current estimate.
///
/// The value is computed by the estimator module from the merge flag, the HIP accumulator,
/// `lg_k` and the number of coupons.
pub fn estimate(&self) -> f64 {
    let (merged, hip) = (self.merge_flag, self.hip_est_accum);
    estimate(merged, hip, self.lg_k, self.num_coupons)
}
/// Returns the lower bound of the estimate for the requested `kappa`.
///
/// `kappa` is forwarded unchanged to the estimator module together with the merge flag, the
/// HIP accumulator, `lg_k` and the number of coupons.
pub fn lower_bound(&self, kappa: NumStdDev) -> f64 {
    let (merged, hip) = (self.merge_flag, self.hip_est_accum);
    lower_bound(merged, hip, self.lg_k, self.num_coupons, kappa)
}
/// Returns the upper bound of the estimate for the requested `kappa`.
///
/// `kappa` is forwarded unchanged to the estimator module together with the merge flag, the
/// HIP accumulator, `lg_k` and the number of coupons.
pub fn upper_bound(&self, kappa: NumStdDev) -> f64 {
    let (merged, hip) = (self.merge_flag, self.hip_est_accum);
    upper_bound(merged, hip, self.lg_k, self.num_coupons, kappa)
}
/// Returns `true` while no coupon has been recorded.
pub fn is_empty(&self) -> bool {
    self.num_coupons == 0
}
/// Feeds one value into the sketch.
///
/// The value is hashed with MurmurHash3 (128-bit) using the sketch's seed. The low `lg_k`
/// bits of the first hash half select the row; the number of leading zeros of the second
/// half, capped at 63, selects the column. The packed `(row << 6) | col` pair is then handed
/// to `row_col_update`.
pub fn update<T: Hash>(&mut self, value: T) {
    let mut hasher = MurmurHash3X64128::with_seed(self.seed);
    value.hash(&mut hasher);
    let (h1, h2) = hasher.finish128();

    let row_mask = (1 << self.lg_k) - 1;
    let row = (h1 & row_mask) as u32;
    // `leading_zeros` reports 64 for an all-zero word; keep the column within 0..=63.
    let col = h2.leading_zeros().min(63) as u8;

    let packed = (row << 6) | u32::from(col);
    // `u32::MAX` is skipped as a non-entry when table slots are scanned elsewhere in this
    // file, so flip the lowest row bit to steer clear of that value.
    let row_col = if packed == u32::MAX {
        packed ^ (1 << 6)
    } else {
        packed
    };

    self.row_col_update(row_col);
}
/// Returns the flavor that `determine_flavor` assigns to the current `lg_k` and coupon count.
pub(super) fn flavor(&self) -> Flavor {
    let (lg_k, coupons) = (self.lg_k, self.num_coupons);
    determine_flavor(lg_k, coupons)
}
/// Applies one packed `(row << 6) | col` pair to the sketch.
///
/// Pairs whose column lies below `first_interesting_column` are dropped immediately. The
/// first pair that reaches an empty sketch allocates the surprising-value table. The pair is
/// then routed to the sparse or the windowed update, depending on whether the sliding window
/// has been allocated yet.
pub(super) fn row_col_update(&mut self, row_col: u32) {
    let col = (row_col & 63) as u8;
    if col < self.first_interesting_column {
        return;
    }

    if self.is_empty() {
        self.surprising_value_table = Some(PairTable::new(2, 6 + self.lg_k));
    }

    if self.sliding_window.is_empty() {
        self.update_sparse(row_col);
        return;
    }
    self.update_windowed(row_col);
}
/// Returns the hash seed this sketch was created with.
pub(super) fn seed(&self) -> u64 {
    self.seed
}
/// Borrows the surprising-value table.
///
/// # Panics
///
/// Panics if the table has not been created yet.
pub(super) fn surprising_value_table(&self) -> &PairTable {
    match self.surprising_value_table.as_ref() {
        Some(table) => table,
        None => panic!("surprising value table must be initialized"),
    }
}
/// Mutably borrows the surprising-value table.
///
/// # Panics
///
/// Panics if the table has not been created yet.
pub(super) fn mut_surprising_value_table(&mut self) -> &mut PairTable {
    match self.surprising_value_table.as_mut() {
        Some(table) => table,
        None => panic!("surprising value table must be initialized"),
    }
}
/// Updates the HIP registers for one novel coupon.
///
/// Adds `k / kxp` to the accumulator, using the value of `kxp` from before this coupon, and
/// then lowers `kxp` by `INVERSE_POWERS_OF_2[col + 1]`.
fn update_hip(&mut self, row_col: u32) {
    let column = (row_col & 63) as usize;
    let k = (1 << self.lg_k) as f64;

    // The accumulator must see kxp before it is reduced for this coupon.
    self.hip_est_accum += k / self.kxp;
    // Note the "+ 1": column c uses table entry c + 1.
    self.kxp -= INVERSE_POWERS_OF_2[column + 1];
}
/// Records a pair while the sketch is in sparse mode (no sliding window yet).
///
/// A novel pair increments the coupon count and updates the HIP registers. Once
/// `32 * num_coupons >= 3 * k` the sketch is promoted to windowed mode.
fn update_sparse(&mut self, row_col: u32) {
    let promotion_threshold = 3 * (1u64 << self.lg_k);
    // Sparse mode requires C < 3K/32 on entry.
    debug_assert!((u64::from(self.num_coupons) << 5) < promotion_threshold);

    if !self.mut_surprising_value_table().maybe_insert(row_col) {
        return;
    }

    self.num_coupons += 1;
    self.update_hip(row_col);

    if (u64::from(self.num_coupons) << 5) >= promotion_threshold {
        self.promote_sparse_to_windowed();
    }
}
/// Switches the sketch from sparse mode to windowed mode.
///
/// Allocates a `k`-byte sliding window at offset 0 and replaces the surprising-value table
/// with a fresh one. Old entries in columns `0..8` become bits of the window; all other
/// entries are re-inserted into the new table.
fn promote_sparse_to_windowed(&mut self) {
    debug_assert_eq!(self.window_offset, 0);
    let k = 1u64 << self.lg_k;
    let c32 = u64::from(self.num_coupons) << 5;
    debug_assert!((c32 == (3 * k)) || ((self.lg_k == 4) && (c32 > (3 * k))));

    self.sliding_window.resize(k as usize, 0);

    let fresh_table = PairTable::new(2, 6 + self.lg_k);
    let old_table = self
        .surprising_value_table
        .replace(fresh_table)
        .expect("surprising value table must be initialized");

    for &row_col in old_table.slots() {
        // `u32::MAX` marks a slot without an entry.
        if row_col == u32::MAX {
            continue;
        }

        let col = (row_col & 63) as u8;
        if col >= 8 {
            // Outside the initial window: stays a table entry.
            let is_novel = self.mut_surprising_value_table().maybe_insert(row_col);
            debug_assert!(is_novel);
            continue;
        }

        let row = (row_col >> 6) as usize;
        self.sliding_window[row] |= 1u8 << col;
    }
}
/// Records a pair while the sketch is in windowed mode.
///
/// Columns below the window are tracked with inverted logic: a novel coupon there deletes an
/// entry from the table. Columns inside the window set a bit of the row's window byte.
/// Columns above the window insert an entry into the table. A novel coupon increments the
/// coupon count, updates the HIP registers and, once
/// `8 * num_coupons >= (27 + 8 * window_offset) * k`, shifts the window by one column.
fn update_windowed(&mut self, row_col: u32) {
    debug_assert!(self.window_offset <= 56);
    let k = 1u64 << self.lg_k;
    let coupons_before = u64::from(self.num_coupons);
    // C >= 3K/32 must hold once the window exists.
    debug_assert!((coupons_before << 5) >= 3 * k);
    let w8pre = u64::from(self.window_offset) << 3;
    // C < (K * 27/8) + (K * window_offset)
    debug_assert!((coupons_before << 3) < (27 + w8pre) * k);

    let offset = self.window_offset;
    let col = (row_col & 63) as u8;
    let is_novel = if col < offset {
        // Before the window: inverted logic, removing an entry records the coupon.
        self.mut_surprising_value_table().maybe_delete(row_col)
    } else if col < offset + 8 {
        // Inside the window: set the matching bit of the row's byte.
        let row = (row_col >> 6) as usize;
        let old_bits = self.sliding_window[row];
        let new_bits = old_bits | (1 << (col - offset));
        let changed = old_bits != new_bits;
        if changed {
            self.sliding_window[row] = new_bits;
        }
        changed
    } else {
        // After the window: normal logic, inserting an entry records the coupon.
        self.mut_surprising_value_table().maybe_insert(row_col)
    };

    if !is_novel {
        return;
    }

    self.num_coupons += 1;
    self.update_hip(row_col);

    let c8post = u64::from(self.num_coupons) << 3;
    if c8post >= (27 + w8pre) * k {
        self.move_window();
        debug_assert!((1..=56).contains(&self.window_offset));
        let w8post = u64::from(self.window_offset) << 3;
        debug_assert!(c8post < ((27 + w8post) * k));
    }
}
/// Shifts the sliding window one column to the right and rebuilds the dependent state.
///
/// The full bit matrix is reconstructed first. On every 8th shift `kxp` is recomputed from
/// it. Then the surprising-value table is cleared and refilled, every window byte is
/// re-extracted at the new offset, and `first_interesting_column` is recalculated.
fn move_window(&mut self) {
    let new_offset = self.window_offset + 1;
    debug_assert!(new_offset <= 56);
    debug_assert_eq!(
        new_offset,
        determine_correct_offset(self.lg_k, self.num_coupons)
    );

    // Full-sized bit matrix equivalent to the current sketch state.
    let bit_matrix = self.build_bit_matrix();

    if new_offset % 8 == 0 {
        self.refresh_kxp(&bit_matrix);
    }

    self.mut_surprising_value_table().clear();

    let keep_outside_window = !(0xFFu64 << new_offset);
    let early_zone = (1u64 << new_offset) - 1;
    let mut all_surprises_ored = 0u64;

    for (row, &full_row) in bit_matrix.iter().enumerate() {
        self.sliding_window[row] = ((full_row >> new_offset) & 0xFF) as u8;

        // Drop the window bits, then flip the early zone so that zeros there (the surprises
        // below the window) show up as ones.
        let mut surprises = (full_row & keep_outside_window) ^ early_zone;
        all_surprises_ored |= surprises;

        while surprises != 0 {
            let col = surprises.trailing_zeros();
            // Clear the lowest set bit, i.e. the one just read.
            surprises &= surprises - 1;
            let row_col = ((row as u32) << 6) | col;
            let is_novel = self.mut_surprising_value_table().maybe_insert(row_col);
            debug_assert!(is_novel);
        }
    }

    self.window_offset = new_offset;
    // With no surprises at all `trailing_zeros` yields 64; cap at the new offset.
    self.first_interesting_column = (all_surprises_ored.trailing_zeros() as u8).min(new_offset);
}
/// Recomputes `kxp` from scratch from the given bit matrix.
///
/// The eight bytes of every row are looked up in `KXP_BYTE_TABLE` and summed per byte
/// position. The eight partial sums are then combined, highest byte position first, each
/// scaled by `INVERSE_POWERS_OF_2[8 * position]`.
fn refresh_kxp(&mut self, bit_matrix: &[u64]) {
    let mut byte_sums = [0.0; 8];
    for &row_bits in bit_matrix {
        // Little-endian order puts byte position i at bits 8i..8i+8.
        for (sum, byte) in byte_sums.iter_mut().zip(row_bits.to_le_bytes()) {
            *sum += KXP_BYTE_TABLE[usize::from(byte)];
        }
    }

    let mut total = 0.0;
    // Accumulate from the highest byte position down; the order affects the float result.
    for (position, sum) in byte_sums.iter().enumerate().rev() {
        let factor = INVERSE_POWERS_OF_2[position * 8];
        total += factor * sum;
    }

    self.kxp = total;
}
/// Expands the sketch into a full bit matrix with one `u64` per row.
///
/// Each row starts with every bit below `window_offset` set. The window bytes are then OR-ed
/// in at the offset, and every entry of the surprising-value table flips its bit: from 1 to
/// 0 below the window, from 0 to 1 above it.
///
/// # Panics
///
/// Panics if the sketch holds coupons but the surprising-value table is missing.
pub(super) fn build_bit_matrix(&self) -> Vec<u64> {
    let k = 1usize << self.lg_k;
    let offset = self.window_offset;
    debug_assert!(offset <= 56);

    // Default rows have the whole early zone filled with ones.
    let default_row = (1u64 << offset) - 1;
    let mut matrix = vec![default_row; k];
    if self.num_coupons == 0 {
        return matrix;
    }

    if !self.sliding_window.is_empty() {
        // Windowed mode: place each row's window byte at the current offset.
        for (row_bits, &window_byte) in matrix.iter_mut().zip(&self.sliding_window[..k]) {
            *row_bits |= u64::from(window_byte) << offset;
        }
    }

    for &row_col in self.surprising_value_table().slots() {
        if row_col == u32::MAX {
            continue;
        }
        let col = (row_col & 63) as u8;
        let row = (row_col >> 6) as usize;
        // Flip the bit away from its default value.
        matrix[row] ^= 1u64 << col;
    }

    matrix
}
}
impl CpcSketch {
/// Serializes the sketch into its compressed byte image.
///
/// Layout: an 8-byte header (preamble ints, serial version, family id, `lg_k`,
/// `first_interesting_column`, flags, little-endian seed hash). For a non-empty sketch this
/// is followed by the coupon count, the optional table-entry count, the HIP values (when the
/// sketch is not flagged as merged), the word counts, and the compressed window and table
/// words.
pub fn serialize(&self) -> Vec<u8> {
    let mut compressed = CompressedState::default();
    compressed.compress(self);

    let has_hip = !self.merge_flag;
    let has_table = !compressed.table_data.is_empty();
    let has_window = !compressed.window_data.is_empty();

    let mut flags: u8 = 1 << FLAG_COMPRESSED;
    if has_hip {
        flags |= 1 << FLAG_HAS_HIP;
    }
    if has_table {
        flags |= 1 << FLAG_HAS_TABLE;
    }
    if has_window {
        flags |= 1 << FLAG_HAS_WINDOW;
    }

    let mut bytes = SketchBytes::with_capacity(256);
    bytes.write_u8(make_preamble_ints(
        self.num_coupons,
        has_hip,
        has_table,
        has_window,
    ));
    bytes.write_u8(SERIAL_VERSION);
    bytes.write_u8(Family::CPC.id);
    bytes.write_u8(self.lg_k);
    bytes.write_u8(self.first_interesting_column);
    bytes.write_u8(flags);
    debug_assert_eq!(self.seed_hash, compute_seed_hash(self.seed));
    bytes.write_u16_le(self.seed_hash);

    if self.is_empty() {
        return bytes.into_bytes();
    }

    bytes.write_u32_le(self.num_coupons);

    // The HIP values sit in one of two places: right after the table-entry count when both
    // table and window are present, otherwise after the word counts.
    let hip_follows_entry_count = has_table && has_window;
    if hip_follows_entry_count {
        bytes.write_u32_le(compressed.table_num_entries);
        if has_hip {
            self.write_hip(&mut bytes);
        }
    }
    if has_table {
        debug_assert!(compressed.table_data_words <= u32::MAX as usize);
        bytes.write_u32_le(compressed.table_data_words as u32);
    }
    if has_window {
        debug_assert!(compressed.window_data_words <= u32::MAX as usize);
        bytes.write_u32_le(compressed.window_data_words as u32);
    }
    if has_hip && !hip_follows_entry_count {
        self.write_hip(&mut bytes);
    }

    if has_window {
        for &word in &compressed.window_data[..compressed.window_data_words] {
            bytes.write_u32_le(word);
        }
    }
    if has_table {
        for &word in &compressed.table_data[..compressed.table_data_words] {
            bytes.write_u32_le(word);
        }
    }

    bytes.into_bytes()
}
pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
Self::deserialize_with_seed(bytes, DEFAULT_UPDATE_SEED)
}
/// Deserializes a compressed sketch image that was built with the given `seed`.
///
/// The image layout mirrors [`CpcSketch::serialize`]: an 8-byte header, then (for a
/// non-empty sketch) the coupon count, the optional table-entry count, the HIP values, the
/// word counts and the compressed window/table words.
///
/// When the image carries no HIP values (an empty sketch, or one without the HIP flag),
/// `kxp` is initialised to `k = 1 << lg_k` and `hip_est_accum` to zero, exactly as in a
/// freshly constructed sketch. Leaving `kxp` at zero would make the first subsequent update
/// add `k / 0` to the accumulator.
///
/// # Errors
///
/// Returns an error if the input is truncated, the family id or serial version does not
/// match, the image is not flagged as compressed, the preamble size is inconsistent with the
/// flags, the seed hash does not match `seed`, or `lg_k` / `first_interesting_column` is out
/// of range.
pub fn deserialize_with_seed(bytes: &[u8], seed: u64) -> Result<Self, Error> {
    let mut cursor = SketchSlice::new(bytes);

    // Fixed 8-byte header.
    let preamble_ints = cursor
        .read_u8()
        .map_err(insufficient_data("preamble_ints"))?;
    let serial_version = cursor
        .read_u8()
        .map_err(insufficient_data("serial_version"))?;
    let family_id = cursor.read_u8().map_err(insufficient_data("family_id"))?;
    Family::CPC.validate_id(family_id)?;
    ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
    let lg_k = cursor.read_u8().map_err(insufficient_data("lg_k"))?;
    let first_interesting_column = cursor
        .read_u8()
        .map_err(insufficient_data("first_interesting_column"))?;
    let flags = cursor.read_u8().map_err(insufficient_data("flags"))?;
    let seed_hash = cursor
        .read_u16_le()
        .map_err(insufficient_data("seed_hash"))?;

    let is_compressed = flags & (1 << FLAG_COMPRESSED) != 0;
    if !is_compressed {
        return Err(Error::new(
            ErrorKind::InvalidData,
            "only compressed sketches are supported",
        ));
    }
    let has_hip = flags & (1 << FLAG_HAS_HIP) != 0;
    let has_table = flags & (1 << FLAG_HAS_TABLE) != 0;
    let has_window = flags & (1 << FLAG_HAS_WINDOW) != 0;

    let mut compressed = CompressedState::default();
    let mut num_coupons = 0;
    // `(kxp, hip_est_accum)`; stays `None` unless the image actually contains them.
    let mut hip_values: Option<(f64, f64)> = None;

    if has_table || has_window {
        num_coupons = cursor
            .read_u32_le()
            .map_err(insufficient_data("num_coupons"))?;

        // The HIP values sit in one of two places: right after the table-entry count when
        // both table and window are present, otherwise after the word counts.
        let hip_follows_entry_count = has_table && has_window;
        if hip_follows_entry_count {
            compressed.table_num_entries = cursor
                .read_u32_le()
                .map_err(insufficient_data("table_num_entries"))?;
            if has_hip {
                let kxp = cursor.read_f64_le().map_err(insufficient_data("kxp"))?;
                let hip_est_accum = cursor
                    .read_f64_le()
                    .map_err(insufficient_data("hip_est_accum"))?;
                hip_values = Some((kxp, hip_est_accum));
            }
        }
        if has_table {
            compressed.table_data_words = cursor
                .read_u32_le()
                .map_err(insufficient_data("table_data_words"))?
                as usize;
        }
        if has_window {
            compressed.window_data_words = cursor
                .read_u32_le()
                .map_err(insufficient_data("window_data_words"))?
                as usize;
        }
        if has_hip && !hip_follows_entry_count {
            let kxp = cursor.read_f64_le().map_err(insufficient_data("kxp"))?;
            let hip_est_accum = cursor
                .read_f64_le()
                .map_err(insufficient_data("hip_est_accum"))?;
            hip_values = Some((kxp, hip_est_accum));
        }

        // The word counts come from untrusted input, so nothing is pre-allocated from them;
        // a short buffer fails on the first missing word.
        if has_window {
            for _ in 0..compressed.window_data_words {
                let word = cursor
                    .read_u32_le()
                    .map_err(insufficient_data("window_data"))?;
                compressed.window_data.push(word);
            }
        }
        if has_table {
            for _ in 0..compressed.table_data_words {
                let word = cursor
                    .read_u32_le()
                    .map_err(insufficient_data("table_data"))?;
                compressed.table_data.push(word);
            }
        }
        if !has_window {
            // Without a window the table-entry count is not stored; it equals the coupon count.
            compressed.table_num_entries = num_coupons;
        }
    }

    let expected_preamble_ints =
        make_preamble_ints(num_coupons, has_hip, has_table, has_window);
    ensure_preamble_longs_in(&[expected_preamble_ints], preamble_ints)?;

    if seed_hash != compute_seed_hash(seed) {
        return Err(Error::new(
            ErrorKind::InvalidData,
            format!(
                "seed hash mismatch: expected {}, got {}",
                compute_seed_hash(seed),
                seed_hash
            ),
        ));
    }
    if !(MIN_LG_K..=MAX_LG_K).contains(&lg_k) {
        return Err(Error::invalid_argument(format!(
            "lg_k out of range; got {}",
            lg_k
        )));
    }
    if first_interesting_column > 63 {
        return Err(Error::invalid_argument(format!(
            "first_interesting_column out of range; got {}",
            first_interesting_column
        )));
    }

    // `lg_k` is validated at this point, so the shift cannot overflow. With no HIP values in
    // the image, start from the same registers as a freshly constructed sketch (kxp == k).
    let (kxp, hip_est_accum) = hip_values.unwrap_or((f64::from(1u32 << lg_k), 0.0));

    let uncompressed = compressed.uncompress(lg_k, num_coupons);
    Ok(CpcSketch {
        lg_k,
        seed,
        seed_hash,
        first_interesting_column,
        num_coupons,
        surprising_value_table: Some(uncompressed.table),
        window_offset: determine_correct_offset(lg_k, num_coupons),
        sliding_window: uncompressed.window,
        merge_flag: !has_hip,
        kxp,
        hip_est_accum,
    })
}
/// Appends the HIP values to `bytes` as two little-endian `f64`s: `kxp`, then `hip_est_accum`.
fn write_hip(&self, bytes: &mut SketchBytes) {
    let (kxp, accum) = (self.kxp, self.hip_est_accum);
    bytes.write_f64_le(kxp);
    bytes.write_f64_le(accum);
}
}
impl CpcSketch {
    /// Returns the maximum serialized size, in bytes, for a sketch with the given `lg_k`.
    ///
    /// For `lg_k` up to 19 the payload size comes from a table of per-`lg_k` maxima (named
    /// "empirical" by its constant). Above that it is `0.6 * k` truncated to an integer.
    /// In both cases 40 bytes are added for the preamble.
    ///
    /// # Panics
    ///
    /// Panics if `lg_k` is outside `MIN_LG_K..=MAX_LG_K`.
    pub fn max_serialized_bytes(lg_k: u8) -> usize {
        const MAX_PREAMBLE_SIZE_BYTES: usize = 40;
        const EMPIRICAL_SIZE_MAX_LGK: u8 = 19;
        const EMPIRICAL_MAX_SIZE_FACTOR: f64 = 0.6;
        // One entry per lg_k, starting at MIN_LG_K and ending at EMPIRICAL_SIZE_MAX_LGK.
        static EMPIRICAL_MAX_SIZE_BYTES: [usize; 16] = [
            24, 36, 56, 100, 180, 344, 660, 1292, 2540, 5020, 9968, 19836, 39532, 78880,
            157516, 314656,
        ];

        assert!(
            (MIN_LG_K..=MAX_LG_K).contains(&lg_k),
            "lg_k out of range; got {lg_k}",
        );

        let payload_bytes = if lg_k > EMPIRICAL_SIZE_MAX_LGK {
            let k = f64::from(1i32 << lg_k);
            (EMPIRICAL_MAX_SIZE_FACTOR * k) as usize
        } else {
            EMPIRICAL_MAX_SIZE_BYTES[usize::from(lg_k - MIN_LG_K)]
        };
        payload_bytes + MAX_PREAMBLE_SIZE_BYTES
    }
}
impl CpcSketch {
    /// Checks the sketch for internal consistency.
    ///
    /// Rebuilds the full bit matrix and returns `true` when the number of set bits in it
    /// equals the recorded number of coupons.
    pub fn validate(&self) -> bool {
        let matrix = self.build_bit_matrix();
        count_bits_set_in_matrix(&matrix) == self.num_coupons
    }

    /// Returns the number of coupons recorded so far.
    pub fn num_coupons(&self) -> u32 {
        self.num_coupons
    }
}