use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::common::NumStdDev;
use crate::error::Error;
use crate::hll::estimator::HipEstimator;
use crate::hll::get_slot;
use crate::hll::get_value;
const VAL_MASK_6: u16 = 0x3F;
#[derive(Debug, Clone, PartialEq)]
pub struct Array6 {
lg_config_k: u8,
bytes: Box<[u8]>,
num_zeros: u32,
estimator: HipEstimator,
}
impl Array6 {
pub fn new(lg_config_k: u8) -> Self {
let k = 1 << lg_config_k;
let num_bytes = num_bytes_for_k(k);
Self {
lg_config_k,
bytes: vec![0u8; num_bytes].into_boxed_slice(),
num_zeros: k,
estimator: HipEstimator::new(lg_config_k),
}
}
#[inline]
fn get_raw(&self, slot: u32) -> u8 {
let start_bit = slot * 6;
let byte_idx = (start_bit >> 3) as usize; let shift = (start_bit & 7) as u8;
let two_bytes = u16::from_le_bytes([self.bytes[byte_idx], self.bytes[byte_idx + 1]]);
((two_bytes >> shift) & VAL_MASK_6) as u8
}
#[inline]
pub(super) fn get(&self, slot: u32) -> u8 {
self.get_raw(slot)
}
pub(super) fn num_registers(&self) -> usize {
1 << self.lg_config_k
}
pub(super) fn hip_accum(&self) -> f64 {
self.estimator.hip_accum()
}
#[inline]
fn put_raw(&mut self, slot: u32, value: u8) {
debug_assert!(value <= 63, "6-bit value must be 0-63");
let start_bit = slot * 6;
let byte_idx = (start_bit >> 3) as usize;
let shift = (start_bit & 0x7) as u8;
let mut two_bytes = u16::from_le_bytes([self.bytes[byte_idx], self.bytes[byte_idx + 1]]);
two_bytes &= !(VAL_MASK_6 << shift);
two_bytes |= ((value as u16) & VAL_MASK_6) << shift;
let bytes_out = two_bytes.to_le_bytes();
self.bytes[byte_idx] = bytes_out[0];
self.bytes[byte_idx + 1] = bytes_out[1];
}
pub fn update(&mut self, coupon: u32) {
let mask = (1 << self.lg_config_k) - 1;
let slot = get_slot(coupon) & mask;
let new_value = get_value(coupon);
let old_value = self.get_raw(slot);
if new_value > old_value {
self.estimator
.update(self.lg_config_k, old_value, new_value);
self.put_raw(slot, new_value);
if old_value == 0 {
self.num_zeros -= 1;
}
}
}
pub fn estimate(&self) -> f64 {
self.estimator.estimate(self.lg_config_k, 0, self.num_zeros)
}
pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 {
self.estimator
.upper_bound(self.lg_config_k, 0, self.num_zeros, num_std_dev)
}
pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 {
self.estimator
.lower_bound(self.lg_config_k, 0, self.num_zeros, num_std_dev)
}
pub fn set_hip_accum(&mut self, value: f64) {
self.estimator.set_hip_accum(value);
}
pub fn is_empty(&self) -> bool {
self.num_zeros == (1 << self.lg_config_k)
}
pub fn deserialize(
mut cursor: SketchSlice,
lg_config_k: u8,
compact: bool,
ooo: bool,
) -> Result<Self, Error> {
fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error {
move |_| Error::insufficient_data(tag)
}
let k = 1 << lg_config_k;
let num_bytes = num_bytes_for_k(k);
let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?;
let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?;
let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?;
let num_zeros = cursor.read_u32_le().map_err(make_error("num_zeros"))?;
let _aux_count = cursor.read_u32_le().map_err(make_error("aux_count"))?;
let mut data = vec![0u8; num_bytes];
if !compact {
cursor.read_exact(&mut data).map_err(make_error("data"))?;
} else {
cursor.advance(num_bytes as u64);
}
let mut estimator = HipEstimator::new(lg_config_k);
estimator.set_hip_accum(hip_accum);
estimator.set_kxq0(kxq0);
estimator.set_kxq1(kxq1);
estimator.set_out_of_order(ooo);
Ok(Self {
lg_config_k,
bytes: data.into_boxed_slice(),
num_zeros,
estimator,
})
}
pub fn serialize(&self, lg_config_k: u8) -> Vec<u8> {
use crate::hll::serialization::*;
let k = 1 << lg_config_k;
let num_bytes = num_bytes_for_k(k);
let total_size = HLL_PREAMBLE_SIZE + num_bytes;
let mut bytes = SketchBytes::with_capacity(total_size);
bytes.write_u8(HLL_PREINTS);
bytes.write_u8(SERIAL_VERSION);
bytes.write_u8(HLL_FAMILY_ID);
bytes.write_u8(lg_config_k);
bytes.write_u8(0);
let mut flags = 0u8;
if self.estimator.is_out_of_order() {
flags |= OUT_OF_ORDER_FLAG_MASK;
}
bytes.write_u8(flags);
bytes.write_u8(0);
bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL6));
bytes.write_f64_le(self.estimator.hip_accum());
bytes.write_f64_le(self.estimator.kxq0());
bytes.write_f64_le(self.estimator.kxq1());
bytes.write_u32_le(self.num_zeros);
bytes.write_u32_le(0);
bytes.write(&self.bytes);
bytes.into_bytes()
}
}
fn num_bytes_for_k(k: u32) -> usize {
(((k * 3) >> 2) + 1) as usize
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hll::coupon;
use crate::hll::pack_coupon;
#[test]
fn test_num_bytes_calculation() {
assert_eq!(num_bytes_for_k(16), (16 * 3 / 4) + 1);
assert_eq!(num_bytes_for_k(1024), (1024 * 3 / 4) + 1);
}
#[test]
fn test_get_set_raw() {
let mut arr = Array6::new(4);
arr.put_raw(0, 0);
arr.put_raw(1, 1);
arr.put_raw(2, 31);
arr.put_raw(3, 63);
assert_eq!(arr.get_raw(0), 0);
assert_eq!(arr.get_raw(1), 1);
assert_eq!(arr.get_raw(2), 31);
assert_eq!(arr.get_raw(3), 63);
arr.put_raw(5, 42);
assert_eq!(arr.get_raw(5), 42);
assert_eq!(arr.get_raw(3), 63);
for slot in 0..16 {
arr.put_raw(slot, (slot % 64) as u8);
}
for slot in 0..16 {
assert_eq!(arr.get_raw(slot), (slot % 64) as u8);
}
}
#[test]
fn test_boundary_crossing() {
let mut arr = Array6::new(8);
arr.put_raw(1, 0b111111);
assert_eq!(arr.get_raw(1), 63);
arr.put_raw(2, 0b101010);
assert_eq!(arr.get_raw(2), 42);
arr.put_raw(3, 0b110011);
assert_eq!(arr.get_raw(3), 51);
assert_eq!(arr.get_raw(1), 63);
assert_eq!(arr.get_raw(2), 42);
assert_eq!(arr.get_raw(3), 51);
}
#[test]
fn test_hip_estimator() {
let mut arr = Array6::new(10);
assert_eq!(arr.estimate(), 0.0);
for i in 0..10_000u32 {
let coupon = coupon(i);
arr.update(coupon);
}
let estimate = arr.estimate();
assert!(estimate > 0.0, "Estimate should be positive");
assert!(estimate.is_finite(), "Estimate should be finite");
assert!(estimate > 1_000.0, "Estimate seems too low");
assert!(estimate < 100_000.0, "Estimate seems too high");
}
#[test]
fn test_full_range() {
let mut arr = Array6::new(6);
for val in 0..64u8 {
arr.put_raw(val as u32, val);
}
for val in 0..64u8 {
assert_eq!(arr.get_raw(val as u32), val);
}
}
#[test]
fn test_kxq_register_split() {
let mut arr = Array6::new(8);
arr.update(pack_coupon(0, 10)); arr.update(pack_coupon(1, 40));
assert!(arr.estimator.kxq0() < 256.0, "kxq0 should have decreased");
assert!(arr.estimator.kxq1() > 0.0, "kxq1 should be positive");
assert!(
arr.estimator.kxq1() < 0.001,
"kxq1 should be small (1/2^40 is tiny)"
);
}
}