use super::aux_map::AuxMap;
use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::codec::assert::insufficient_data;
use crate::codec::family::Family;
use crate::common::NumStdDev;
use crate::error::Error;
use crate::hll::Coupon;
use crate::hll::estimator::HipEstimator;
use crate::hll::serialization::COUPON_SIZE_BYTES;
use crate::hll::serialization::CUR_MODE_HLL;
use crate::hll::serialization::HLL_PREAMBLE_SIZE;
use crate::hll::serialization::HLL_PREINTS;
use crate::hll::serialization::OUT_OF_ORDER_FLAG_MASK;
use crate::hll::serialization::SERIAL_VERSION;
use crate::hll::serialization::TGT_HLL4;
use crate::hll::serialization::encode_mode_byte;
/// Nibble value that marks a slot as an exception: when a slot's raw nibble
/// equals this token, its actual value is looked up in the auxiliary map
/// instead of being derived from the nibble.
const AUX_TOKEN: u8 = 0x0F;
/// HLL register array that packs two slots into every byte (4 bits per slot).
///
/// A slot's nibble holds its value relative to `cur_min`. A nibble equal to
/// `AUX_TOKEN` marks a slot whose actual value is stored in `aux_map`.
#[derive(Debug, Clone, PartialEq)]
pub struct Array4 {
/// Base-2 logarithm of the slot count; the array has `1 << lg_config_k` slots.
lg_config_k: u8,
/// Packed nibbles: even slots use the low nibble of a byte, odd slots the high nibble.
bytes: Box<[u8]>,
/// Offset added to a raw nibble to obtain the slot's actual value.
cur_min: u8,
/// Number of slots whose actual value currently equals `cur_min`.
num_at_cur_min: u32,
/// Actual values of slots marked with `AUX_TOKEN`; `None` until the first such slot appears.
aux_map: Option<AuxMap>,
/// Estimator state that is updated on every register change and queried for estimates/bounds.
estimator: HipEstimator,
}
impl Array4 {
/// Creates an empty array with `1 << lg_config_k` slots.
///
/// Every nibble starts at zero with `cur_min == 0`, so all slots count as
/// being at the current minimum, and no auxiliary map is allocated yet.
pub fn new(lg_config_k: u8) -> Self {
    // Two 4-bit slots share one byte, hence half as many bytes as slots.
    let packed_len: usize = 1 << (lg_config_k - 1);
    let slot_count: u32 = 1 << lg_config_k;
    let bytes = vec![0u8; packed_len].into_boxed_slice();
    let estimator = HipEstimator::new(lg_config_k);
    Self {
        lg_config_k,
        bytes,
        cur_min: 0,
        num_at_cur_min: slot_count,
        aux_map: None,
        estimator,
    }
}
/// Reads the raw 4-bit nibble stored for `slot`.
///
/// The result is neither offset by `cur_min` nor resolved through the
/// auxiliary map.
#[inline]
fn get_raw(&self, slot: u32) -> u8 {
    let byte_idx = slot >> 1;
    debug_assert!(byte_idx < self.bytes.len() as u32);
    let packed = self.bytes[byte_idx as usize];
    // Even slots live in the low nibble, odd slots in the high nibble.
    match slot & 1 {
        0 => packed & 0x0F,
        _ => packed >> 4,
    }
}
/// Returns the actual value of `slot`.
///
/// Ordinary slots are `cur_min` plus the raw nibble. Slots marked with
/// `AUX_TOKEN` are resolved through the auxiliary map, falling back to
/// `cur_min` when the map or the entry is missing.
pub(super) fn get(&self, slot: u32) -> u8 {
    let nibble = self.get_raw(slot);
    if nibble >= AUX_TOKEN {
        return match &self.aux_map {
            Some(aux) => aux.get(slot).unwrap_or(self.cur_min),
            None => self.cur_min,
        };
    }
    self.cur_min + nibble
}
/// Number of slots in the array, i.e. `1 << lg_config_k`.
pub(super) fn num_registers(&self) -> usize {
    1_usize << self.lg_config_k
}
/// Returns the HIP accumulator value currently held by the estimator.
pub(super) fn hip_accum(&self) -> f64 {
self.estimator.hip_accum()
}
/// Stores the raw 4-bit `value` for `slot`, leaving the neighbouring slot
/// that shares the same byte untouched.
#[inline]
fn put_raw(&mut self, slot: u32, value: u8) {
    debug_assert!(value <= AUX_TOKEN);
    let byte_idx = slot >> 1;
    debug_assert!(byte_idx < self.bytes.len() as u32);
    let cell = &mut self.bytes[byte_idx as usize];
    // Even slots replace the low nibble, odd slots the high nibble.
    *cell = match slot & 1 {
        0 => (*cell & 0xF0) | (value & 0x0F),
        _ => (*cell & 0x0F) | (value << 4),
    };
}
/// Applies `coupon` to the array, raising the addressed slot if the coupon's
/// value is larger than what is stored.
///
/// The estimator is updated for every effective change. When the last slot
/// sitting at `cur_min` is raised, `cur_min` is shifted upwards until at
/// least one slot is at the new minimum again.
///
/// # Panics
/// Panics if a slot is marked with `AUX_TOKEN` but the auxiliary map or the
/// slot's entry in it is missing.
pub fn update(&mut self, coupon: Coupon) {
    let slot_mask = (1 << self.lg_config_k) - 1;
    let slot = coupon.slot() & slot_mask;
    let incoming = coupon.value();

    // Nothing at or below the current floor can raise any slot.
    if incoming <= self.cur_min {
        return;
    }

    let nibble = self.get_raw(slot);
    // Exact value for ordinary slots, only a lower bound for exception slots.
    let floor = nibble + self.cur_min;
    if incoming <= floor {
        return;
    }

    let is_exception = nibble == AUX_TOKEN;
    let previous = if is_exception {
        self.aux_map
            .as_ref()
            .expect("aux_map should be initialized since stored value is AUX_TOKEN")
            .get(slot)
            .expect("slot should be in aux_map since associated value is AUX_TOKEN")
    } else {
        floor
    };
    if incoming <= previous {
        return;
    }

    self.estimator.update(self.lg_config_k, previous, incoming);

    let relative = incoming - self.cur_min;
    let needs_aux = relative >= AUX_TOKEN;
    if is_exception {
        if !needs_aux {
            unreachable!("AUX_TOKEN present with non-exception new value");
        }
        // Exception stays an exception: only the aux entry changes.
        self.aux_map
            .as_mut()
            .expect("aux_map should be initialized since stored value is AUX_TOKEN")
            .replace(slot, incoming);
    } else if needs_aux {
        // Ordinary slot becomes an exception: mark it and record the value.
        self.put_raw(slot, AUX_TOKEN);
        let lg_k = self.lg_config_k;
        self.aux_map
            .get_or_insert_with(|| AuxMap::new(lg_k))
            .insert(slot, incoming);
    } else {
        self.put_raw(slot, relative);
    }

    // The slot left the minimum; once none remain there, raise the minimum.
    if previous == self.cur_min {
        self.num_at_cur_min -= 1;
        while self.num_at_cur_min == 0 {
            self.shift_to_bigger_cur_min();
        }
    }
}
fn shift_to_bigger_cur_min(&mut self) {
let new_cur_min = self.cur_min + 1;
let k = 1 << self.lg_config_k;
let mut num_at_new = 0;
for slot in 0..k {
let raw = self.get_raw(slot);
debug_assert_ne!(raw, 0, "value cannot be 0 when shifting cur_min");
if raw < AUX_TOKEN {
let decremented = raw - 1;
self.put_raw(slot, decremented);
if decremented == 0 {
num_at_new += 1;
}
}
}
if let Some(old_aux) = self.aux_map.take() {
let mut new_aux = None;
for (slot, old_actual_val) in old_aux.into_iter() {
debug_assert_eq!(
self.get_raw(slot),
AUX_TOKEN,
"AuxMap contains slot != AUX_TOKEN"
);
let new_shifted = old_actual_val - new_cur_min;
if new_shifted < AUX_TOKEN {
self.put_raw(slot, new_shifted);
} else {
let aux = new_aux.get_or_insert_with(|| AuxMap::new(self.lg_config_k));
aux.insert(slot, old_actual_val);
}
}
self.aux_map = new_aux;
}
self.cur_min = new_cur_min;
self.num_at_cur_min = num_at_new;
}
pub fn estimate(&self) -> f64 {
self.estimator
.estimate(self.lg_config_k, self.cur_min, self.num_at_cur_min)
}
pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 {
self.estimator.upper_bound(
self.lg_config_k,
self.cur_min,
self.num_at_cur_min,
num_std_dev,
)
}
pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 {
self.estimator.lower_bound(
self.lg_config_k,
self.cur_min,
self.num_at_cur_min,
num_std_dev,
)
}
/// Overwrites the estimator's HIP accumulator with `value`.
pub fn set_hip_accum(&mut self, value: f64) {
self.estimator.set_hip_accum(value);
}
/// Returns `true` when no slot has been raised: every slot is still at the
/// minimum and that minimum is zero.
pub fn is_empty(&self) -> bool {
    let total_slots = 1 << self.lg_config_k;
    self.num_at_cur_min == total_slots && self.cur_min == 0
}
/// Rebuilds an `Array4` from the HLL-mode payload of a serialized sketch.
///
/// The fields are read from `cursor` in this order: `hip_accum`, `kxq0`,
/// `kxq1` (little-endian `f64` each), `num_at_cur_min` and `aux_count`
/// (little-endian `u32` each), the packed register array of
/// `1 << (lg_config_k - 1)` bytes, and finally `aux_count` 32-bit coupons
/// that populate the auxiliary map.
///
/// `cur_min`, `lg_config_k`, `compact` and `ooo` are supplied by the caller.
/// `ooo` is forwarded to the estimator as its out-of-order state.
///
/// The register bytes are read whether or not `compact` is set. Skipping
/// them would leave every nibble at zero while the auxiliary map is still
/// filled, so aux entries would no longer correspond to `AUX_TOKEN` nibbles
/// and all ordinary register values would be lost.
///
/// # Errors
/// Returns an insufficient-data error if the input ends before any of the
/// fields above has been read completely.
pub fn deserialize(
    mut cursor: SketchSlice,
    cur_min: u8,
    lg_config_k: u8,
    compact: bool,
    ooo: bool,
) -> Result<Self, Error> {
    // The flag does not change how the fields parsed here are laid out; it is
    // kept in the signature for callers. NOTE(review): confirm against the
    // reference serialization layout.
    let _ = compact;

    // Two 4-bit slots per byte.
    let num_bytes = 1 << (lg_config_k - 1);

    let hip_accum = cursor
        .read_f64_le()
        .map_err(insufficient_data("hip_accum"))?;
    let kxq0 = cursor.read_f64_le().map_err(insufficient_data("kxq0"))?;
    let kxq1 = cursor.read_f64_le().map_err(insufficient_data("kxq1"))?;
    let num_at_cur_min = cursor
        .read_u32_le()
        .map_err(insufficient_data("num_at_cur_min"))?;
    let aux_count = cursor
        .read_u32_le()
        .map_err(insufficient_data("aux_count"))?;

    // Always load the packed registers so nibbles and aux entries stay
    // consistent; a truncated input surfaces as an error here.
    let mut data = vec![0u8; num_bytes];
    cursor
        .read_exact(&mut data)
        .map_err(insufficient_data("data"))?;

    let mut aux_map = None;
    if aux_count > 0 {
        let slot_mask = (1 << lg_config_k) - 1;
        let mut aux = AuxMap::new(lg_config_k);
        for i in 0..aux_count {
            let raw = cursor.read_u32_le().map_err(|_| {
                Error::insufficient_data(format!(
                    "expected {aux_count} aux coupons, failed at index {i}",
                ))
            })?;
            let coupon = Coupon(raw);
            aux.insert(coupon.slot() & slot_mask, coupon.value());
        }
        aux_map = Some(aux);
    }

    let mut estimator = HipEstimator::new(lg_config_k);
    estimator.set_hip_accum(hip_accum);
    estimator.set_kxq0(kxq0);
    estimator.set_kxq1(kxq1);
    estimator.set_out_of_order(ooo);

    Ok(Self {
        lg_config_k,
        bytes: data.into_boxed_slice(),
        cur_min,
        num_at_cur_min,
        aux_map,
        estimator,
    })
}
/// Serializes the array into a byte vector.
///
/// Layout, in write order: an eight-byte header (preamble ints, serial
/// version, family id, `lg_config_k`, a zero byte, flags, `cur_min`, mode
/// byte), then `hip_accum`, `kxq0`, `kxq1`, `num_at_cur_min`, the aux entry
/// count, the packed register bytes, and one 32-bit coupon per aux entry.
/// The only flag that may be set is the out-of-order mask.
///
/// `lg_config_k` is written to the header and used to size the output
/// buffer; the register bytes themselves always come from `self`.
pub fn serialize(&self, lg_config_k: u8) -> Vec<u8> {
    let register_bytes = 1 << (lg_config_k - 1);
    let aux_entries: Vec<(u32, u8)> = match &self.aux_map {
        Some(aux) => aux.iter().collect(),
        None => Vec::new(),
    };
    let aux_count = aux_entries.len() as u32;
    let capacity =
        HLL_PREAMBLE_SIZE + register_bytes + (aux_count as usize * COUPON_SIZE_BYTES);
    let mut out = SketchBytes::with_capacity(capacity);

    // Fixed header.
    out.write_u8(HLL_PREINTS);
    out.write_u8(SERIAL_VERSION);
    out.write_u8(Family::HLL.id);
    out.write_u8(lg_config_k);
    out.write_u8(0);
    let flags = if self.estimator.is_out_of_order() {
        OUT_OF_ORDER_FLAG_MASK
    } else {
        0u8
    };
    out.write_u8(flags);
    out.write_u8(self.cur_min);
    out.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL4));

    // Estimator state and counters.
    out.write_f64_le(self.estimator.hip_accum());
    out.write_f64_le(self.estimator.kxq0());
    out.write_f64_le(self.estimator.kxq1());
    out.write_u32_le(self.num_at_cur_min);
    out.write_u32_le(aux_count);

    // Packed registers followed by the aux entries as coupons.
    out.write(&self.bytes);
    for &(slot, value) in &aux_entries {
        out.write_u32_le(Coupon::pack(slot, value).raw());
    }
    out.into_bytes()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hll::Coupon;
/// Raw nibble writes are readable per slot and packed two per byte.
#[test]
fn test_get_set_raw() {
    let mut arr = Array4::new(4);

    // Slots 0 and 1 share byte 0: slot 0 in the low nibble, slot 1 in the high.
    arr.put_raw(0, 5);
    assert_eq!(arr.get_raw(0), 5);
    arr.put_raw(1, 7);
    assert_eq!(arr.get_raw(1), 7);
    assert_eq!(arr.bytes[0], 0x75);

    // Writing both nibbles of the next byte must not disturb either one.
    arr.put_raw(2, 15);
    arr.put_raw(3, 3);
    for (slot, expected) in [(2u32, 15u8), (3, 3)] {
        assert_eq!(arr.get_raw(slot), expected);
    }
}
/// An empty array estimates zero; after 10_000 hashed updates the estimate
/// is finite and within a loose sanity range.
#[test]
fn test_hip_estimator_basic() {
    let mut sketch = Array4::new(10);
    assert_eq!(sketch.estimate(), 0.0);

    for hash in 0..10_000u32 {
        let coupon = Coupon::from_hash(hash);
        sketch.update(coupon);
    }

    let estimate = sketch.estimate();
    assert!(estimate > 0.0, "Estimate should be positive");
    assert!(estimate.is_finite(), "Estimate should be finite");
    assert!(estimate < 100_000.0, "Estimate should be reasonable");
    assert!(
        estimate > 1_000.0,
        "Estimate seems too low for 10_000 updates"
    );
    assert!(
        estimate < 100_000.0,
        "Estimate seems too high for 10_000 updates"
    );
}
/// One small and one large value: `kxq0` drops below its initial 256 and
/// `kxq1` becomes positive but tiny.
#[test]
fn test_kxq_register_split() {
    let mut arr = Array4::new(8);
    for (slot, value) in [(0u32, 10u8), (1, 40)] {
        arr.update(Coupon::pack(slot, value));
    }

    let kxq0 = arr.estimator.kxq0();
    let kxq1 = arr.estimator.kxq1();
    assert!(kxq0 < 256.0, "kxq0 should have decreased");
    assert!(kxq1 > 0.0, "kxq1 should be positive");
    assert!(kxq1 < 0.001, "kxq1 should be small (1/2^40 is tiny)");
}
/// Raising `cur_min` moves an aux entry that fits into a nibble again back
/// into the packed array and drops the now-empty aux map.
#[test]
fn test_shift_cur_min_rebuilds_aux_entry() {
    let lg_config_k = 4;
    let num_slots = 1_u32 << lg_config_k;
    let mut arr = Array4::new(lg_config_k);

    // Value 15 with cur_min 0 does not fit below AUX_TOKEN, so it goes to the aux map.
    arr.update(Coupon::pack(0, 15));
    assert_eq!(arr.get_raw(0), AUX_TOKEN);
    let aux_value = arr.aux_map.as_ref().and_then(|aux| aux.get(0));
    assert_eq!(aux_value, Some(15));

    // Raising every other slot to 1 empties the minimum and triggers the shift.
    (1..num_slots).for_each(|slot| arr.update(Coupon::pack(slot, 1)));

    assert_eq!(arr.cur_min, 1);
    assert_eq!(arr.num_at_cur_min, num_slots - 1);
    assert_eq!(arr.get_raw(0), 14);
    assert_eq!(arr.get(0), 15);
    assert!(arr.aux_map.is_none());
    for slot in 1..num_slots {
        assert_eq!(arr.get(slot), 1);
    }
}
}