use super::aux_map::AuxMap;
use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::common::NumStdDev;
use crate::error::Error;
use crate::hll::estimator::HipEstimator;
use crate::hll::get_slot;
use crate::hll::get_value;
const AUX_TOKEN: u8 = 15;
#[derive(Debug, Clone, PartialEq)]
pub struct Array4 {
lg_config_k: u8,
bytes: Box<[u8]>,
cur_min: u8,
num_at_cur_min: u32,
aux_map: Option<AuxMap>,
estimator: HipEstimator,
}
impl Array4 {
pub fn new(lg_config_k: u8) -> Self {
let num_bytes = 1 << (lg_config_k - 1);
let num_at_cur_min = 1 << lg_config_k;
Self {
lg_config_k,
bytes: vec![0u8; num_bytes].into_boxed_slice(),
cur_min: 0,
num_at_cur_min,
aux_map: None,
estimator: HipEstimator::new(lg_config_k),
}
}
#[inline]
fn get_raw(&self, slot: u32) -> u8 {
debug_assert!(slot >> 1 < self.bytes.len() as u32);
let byte = self.bytes[(slot >> 1) as usize];
if slot & 1 == 0 {
byte & 15 } else {
byte >> 4 }
}
pub(super) fn get(&self, slot: u32) -> u8 {
let raw = self.get_raw(slot);
if raw < AUX_TOKEN {
self.cur_min + raw
} else {
self.aux_map
.as_ref()
.and_then(|map| map.get(slot))
.unwrap_or(self.cur_min) }
}
pub(super) fn num_registers(&self) -> usize {
1 << self.lg_config_k
}
pub(super) fn hip_accum(&self) -> f64 {
self.estimator.hip_accum()
}
#[inline]
fn put_raw(&mut self, slot: u32, value: u8) {
debug_assert!(value <= AUX_TOKEN);
debug_assert!(slot >> 1 < self.bytes.len() as u32);
let byte_idx = (slot >> 1) as usize;
let old_byte = self.bytes[byte_idx];
self.bytes[byte_idx] = if slot & 1 == 0 {
(old_byte & 0xF0) | (value & 0x0F) } else {
(old_byte & 0x0F) | (value << 4) };
}
pub fn update(&mut self, coupon: u32) {
let mask = (1 << self.lg_config_k) - 1;
let slot = get_slot(coupon) & mask;
let new_value = get_value(coupon);
if new_value <= self.cur_min {
return;
}
let raw_stored = self.get_raw(slot);
let lower_bound = raw_stored + self.cur_min;
if new_value <= lower_bound {
return;
}
let old_value = if raw_stored < AUX_TOKEN {
lower_bound
} else {
self.aux_map
.as_ref()
.expect("aux_map should be initialized since stored value is AUX_TOKEN")
.get(slot)
.expect("slot should be in aux_map since associated value is AUX_TOKEN")
};
if new_value <= old_value {
return;
}
self.estimator
.update(self.lg_config_k, old_value, new_value);
let shifted_new = new_value - self.cur_min;
match (raw_stored, shifted_new) {
(AUX_TOKEN, shifted) if shifted >= AUX_TOKEN => {
self.aux_map
.as_mut()
.expect("aux_map should be initialized since stored value is AUX_TOKEN")
.replace(slot, new_value);
}
(AUX_TOKEN, _) => {
unreachable!("AUX_TOKEN present with non-exception new value");
}
(_, shifted) if shifted >= AUX_TOKEN => {
self.put_raw(slot, AUX_TOKEN);
let aux = self
.aux_map
.get_or_insert_with(|| AuxMap::new(self.lg_config_k));
aux.insert(slot, new_value);
}
_ => {
self.put_raw(slot, shifted_new);
}
}
if old_value == self.cur_min {
self.num_at_cur_min -= 1;
while self.num_at_cur_min == 0 {
self.shift_to_bigger_cur_min();
}
}
}
fn shift_to_bigger_cur_min(&mut self) {
let new_cur_min = self.cur_min + 1;
let k = 1 << self.lg_config_k;
let mut num_at_new = 0;
for slot in 0..k {
let raw = self.get_raw(slot);
debug_assert_ne!(raw, 0, "value cannot be 0 when shifting cur_min");
if raw < AUX_TOKEN {
let decremented = raw - 1;
self.put_raw(slot, decremented);
if decremented == 0 {
num_at_new += 1;
}
}
}
if let Some(old_aux) = self.aux_map.take() {
let mut new_aux = None;
for (slot, old_actual_val) in old_aux.into_iter() {
debug_assert_ne!(
self.get_raw(slot),
AUX_TOKEN,
"AuxMap contains slow without AUX_TOKEN"
);
let new_shifted = old_actual_val - new_cur_min;
if new_shifted < AUX_TOKEN {
self.put_raw(slot, new_shifted);
} else {
let aux = new_aux.get_or_insert_with(|| AuxMap::new(self.lg_config_k));
aux.insert(slot, old_actual_val);
}
}
self.aux_map = new_aux;
}
self.cur_min = new_cur_min;
self.num_at_cur_min = num_at_new;
}
pub fn estimate(&self) -> f64 {
self.estimator
.estimate(self.lg_config_k, self.cur_min, self.num_at_cur_min)
}
pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 {
self.estimator.upper_bound(
self.lg_config_k,
self.cur_min,
self.num_at_cur_min,
num_std_dev,
)
}
pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 {
self.estimator.lower_bound(
self.lg_config_k,
self.cur_min,
self.num_at_cur_min,
num_std_dev,
)
}
pub fn set_hip_accum(&mut self, value: f64) {
self.estimator.set_hip_accum(value);
}
pub fn is_empty(&self) -> bool {
self.num_at_cur_min == (1 << self.lg_config_k) && self.cur_min == 0
}
pub fn deserialize(
mut cursor: SketchSlice,
cur_min: u8,
lg_config_k: u8,
compact: bool,
ooo: bool,
) -> Result<Self, Error> {
use crate::hll::get_slot;
use crate::hll::get_value;
fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error {
move |_| Error::insufficient_data(tag)
}
let num_bytes = 1 << (lg_config_k - 1);
let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?;
let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?;
let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?;
let num_at_cur_min = cursor.read_u32_le().map_err(make_error("num_at_cur_min"))?;
let aux_count = cursor.read_u32_le().map_err(make_error("aux_count"))?;
let mut data = vec![0u8; num_bytes];
if !compact {
cursor.read_exact(&mut data).map_err(make_error("data"))?;
} else {
cursor.advance(num_bytes as u64);
}
let mut aux_map = None;
if aux_count > 0 {
let mut aux = AuxMap::new(lg_config_k);
for i in 0..aux_count {
let coupon = cursor.read_u32_le().map_err(|_| {
Error::insufficient_data(format!(
"expected {aux_count} aux coupons, failed at index {i}",
))
})?;
let slot = get_slot(coupon) & ((1 << lg_config_k) - 1);
let value = get_value(coupon);
aux.insert(slot, value);
}
aux_map = Some(aux);
}
let mut estimator = HipEstimator::new(lg_config_k);
estimator.set_hip_accum(hip_accum);
estimator.set_kxq0(kxq0);
estimator.set_kxq1(kxq1);
estimator.set_out_of_order(ooo);
Ok(Self {
lg_config_k,
bytes: data.into_boxed_slice(),
cur_min,
num_at_cur_min,
aux_map,
estimator,
})
}
pub fn serialize(&self, lg_config_k: u8) -> Vec<u8> {
use crate::hll::pack_coupon;
use crate::hll::serialization::*;
let num_bytes = 1 << (lg_config_k - 1);
let aux_entries: Vec<(u32, u8)> = if let Some(aux) = &self.aux_map {
aux.iter().collect()
} else {
vec![]
};
let aux_count = aux_entries.len() as u32;
let total_size = HLL_PREAMBLE_SIZE + num_bytes + (aux_count as usize * COUPON_SIZE_BYTES);
let mut bytes = SketchBytes::with_capacity(total_size);
bytes.write_u8(HLL_PREINTS);
bytes.write_u8(SERIAL_VERSION);
bytes.write_u8(HLL_FAMILY_ID);
bytes.write_u8(lg_config_k);
bytes.write_u8(0);
let mut flags = 0u8;
if self.estimator.is_out_of_order() {
flags |= OUT_OF_ORDER_FLAG_MASK;
}
bytes.write_u8(flags);
bytes.write_u8(self.cur_min);
bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL4));
bytes.write_f64_le(self.estimator.hip_accum());
bytes.write_f64_le(self.estimator.kxq0());
bytes.write_f64_le(self.estimator.kxq1());
bytes.write_u32_le(self.num_at_cur_min);
bytes.write_u32_le(aux_count);
bytes.write(&self.bytes);
for (slot, value) in aux_entries.iter().copied() {
let coupon = pack_coupon(slot, value);
bytes.write_u32_le(coupon);
}
bytes.into_bytes()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hll::coupon;
use crate::hll::pack_coupon;
#[test]
fn test_get_set_raw() {
let mut data = Array4::new(4);
data.put_raw(0, 5);
assert_eq!(data.get_raw(0), 5);
data.put_raw(1, 7);
assert_eq!(data.get_raw(1), 7);
assert_eq!(data.bytes[0], 0x75);
data.put_raw(2, 15);
data.put_raw(3, 3);
assert_eq!(data.get_raw(2), 15);
assert_eq!(data.get_raw(3), 3);
}
#[test]
fn test_hip_estimator_basic() {
let mut arr = Array4::new(10);
assert_eq!(arr.estimate(), 0.0);
for i in 0..10_000u32 {
let coupon = coupon(i);
arr.update(coupon);
}
let estimate = arr.estimate();
assert!(estimate > 0.0, "Estimate should be positive");
assert!(estimate.is_finite(), "Estimate should be finite");
assert!(estimate < 100_000.0, "Estimate should be reasonable");
assert!(
estimate > 1_000.0,
"Estimate seems too low for 10_000 updates"
);
assert!(
estimate < 100_000.0,
"Estimate seems too high for 10_000 updates"
);
}
#[test]
fn test_kxq_register_split() {
let mut arr = Array4::new(8);
arr.update(pack_coupon(0, 10)); arr.update(pack_coupon(1, 40));
assert!(arr.estimator.kxq0() < 256.0, "kxq0 should have decreased");
assert!(arr.estimator.kxq1() > 0.0, "kxq1 should be positive");
assert!(
arr.estimator.kxq1() < 0.001,
"kxq1 should be small (1/2^40 is tiny)"
);
}
}