use std::hash::Hash;
use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::codec::assert::ensure_preamble_longs_in_range;
use crate::codec::assert::insufficient_data;
use crate::codec::family::Family;
use crate::common::NumStdDev;
use crate::common::ResizeFactor;
use crate::common::binomial_bounds;
use crate::error::Error;
use crate::hash::DEFAULT_UPDATE_SEED;
use crate::hash::compute_seed_hash;
use crate::theta::DEFAULT_LG_K;
use crate::theta::MAX_LG_K;
use crate::theta::MAX_THETA;
use crate::theta::MIN_LG_K;
use crate::theta::bit_pack::BLOCK_WIDTH;
use crate::theta::bit_pack::BitPacker;
use crate::theta::bit_pack::BitUnpacker;
use crate::theta::bit_pack::pack_bits_block;
use crate::theta::bit_pack::unpack_bits_block;
use crate::theta::hash_table::ThetaHashTable;
use crate::theta::serialization;
use crate::theta::serialization::V2_PREAMBLE_EMPTY;
use crate::theta::serialization::V2_PREAMBLE_ESTIMATE;
use crate::theta::serialization::V2_PREAMBLE_PRECISE;
mod private {
use super::*;
pub trait Sealed {}
impl Sealed for ThetaSketch {}
impl Sealed for CompactThetaSketch {}
}
pub trait ThetaSketchView: private::Sealed {
fn seed_hash(&self) -> u16;
fn theta64(&self) -> u64;
fn is_empty(&self) -> bool;
fn iter(&self) -> impl Iterator<Item = u64> + '_;
fn num_retained(&self) -> usize;
fn is_ordered(&self) -> bool {
false
}
}
#[derive(Debug)]
pub struct ThetaSketch {
table: ThetaHashTable,
}
impl ThetaSketch {
pub fn builder() -> ThetaSketchBuilder {
ThetaSketchBuilder::default()
}
pub fn update<T: Hash>(&mut self, value: T) {
self.table.try_insert(value);
}
pub fn estimate(&self) -> f64 {
if self.is_empty() {
return 0.0;
}
let num_retained = self.table.num_retained() as f64;
let theta = self.table.theta() as f64 / MAX_THETA as f64;
num_retained / theta
}
pub fn theta(&self) -> f64 {
self.table.theta() as f64 / MAX_THETA as f64
}
pub fn theta64(&self) -> u64 {
self.table.theta()
}
pub fn seed_hash(&self) -> u16 {
self.table.seed_hash()
}
pub fn is_empty(&self) -> bool {
self.table.is_empty()
}
pub fn is_estimation_mode(&self) -> bool {
self.table.theta() < MAX_THETA
}
pub fn num_retained(&self) -> usize {
self.table.num_retained()
}
pub fn lg_k(&self) -> u8 {
self.table.lg_nom_size()
}
pub fn trim(&mut self) {
self.table.trim();
}
pub fn reset(&mut self) {
self.table.reset();
}
pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
self.table.iter()
}
pub fn compact(&self, ordered: bool) -> CompactThetaSketch {
let mut entries: Vec<u64> = self.iter().collect();
let empty = self.is_empty();
let theta = if empty {
MAX_THETA
} else {
self.table.theta()
};
let is_single = entries.len() == 1 && theta == MAX_THETA;
let ordered = ordered || empty || is_single;
if ordered && entries.len() > 1 {
entries.sort_unstable();
}
CompactThetaSketch::from_parts(entries, theta, self.table.seed_hash(), ordered, empty)
}
pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 {
if !self.is_estimation_mode() {
return self.num_retained() as f64;
}
binomial_bounds::lower_bound(self.num_retained() as u64, self.theta(), num_std_dev)
.expect("theta should always be valid")
}
pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 {
if !self.is_estimation_mode() {
return self.num_retained() as f64;
}
binomial_bounds::upper_bound(
self.num_retained() as u64,
self.theta(),
num_std_dev,
self.is_empty(),
)
.expect("theta should always be valid")
}
}
impl ThetaSketchView for ThetaSketch {
fn seed_hash(&self) -> u16 {
ThetaSketch::seed_hash(self)
}
fn theta64(&self) -> u64 {
ThetaSketch::theta64(self)
}
fn is_empty(&self) -> bool {
ThetaSketch::is_empty(self)
}
fn iter(&self) -> impl Iterator<Item = u64> + '_ {
ThetaSketch::iter(self)
}
fn num_retained(&self) -> usize {
ThetaSketch::num_retained(self)
}
}
#[derive(Clone, Debug)]
pub struct CompactThetaSketch {
entries: Vec<u64>,
theta: u64,
seed_hash: u16,
ordered: bool,
empty: bool,
}
impl CompactThetaSketch {
pub(super) fn from_parts(
entries: Vec<u64>,
theta: u64,
seed_hash: u16,
ordered: bool,
empty: bool,
) -> Self {
Self {
entries,
theta,
seed_hash,
ordered,
empty,
}
}
pub fn estimate(&self) -> f64 {
if self.is_empty() {
return 0.0;
}
let num_retained = self.num_retained() as f64;
if self.theta == MAX_THETA {
return num_retained;
}
let theta = self.theta as f64 / MAX_THETA as f64;
num_retained / theta
}
pub fn theta(&self) -> f64 {
self.theta as f64 / MAX_THETA as f64
}
pub fn theta64(&self) -> u64 {
self.theta
}
pub fn is_empty(&self) -> bool {
self.empty
}
pub fn is_estimation_mode(&self) -> bool {
self.theta < MAX_THETA
}
pub fn num_retained(&self) -> usize {
self.entries.len()
}
pub fn is_ordered(&self) -> bool {
self.ordered
}
pub fn seed_hash(&self) -> u16 {
self.seed_hash
}
pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
self.entries.iter().copied()
}
pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 {
if !self.is_estimation_mode() {
return self.num_retained() as f64;
}
binomial_bounds::lower_bound(self.num_retained() as u64, self.theta(), num_std_dev)
.expect("compact theta should always be valid")
}
pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 {
if !self.is_estimation_mode() {
return self.num_retained() as f64;
}
binomial_bounds::upper_bound(
self.num_retained() as u64,
self.theta(),
num_std_dev,
self.is_empty(),
)
.expect("compact theta should always be valid")
}
fn preamble_longs(&self, compressed: bool) -> u8 {
if compressed {
if self.is_estimation_mode() { 2 } else { 1 }
} else if self.is_estimation_mode() {
3
} else if self.is_empty() || self.entries.len() == 1 {
1
} else {
2
}
}
pub fn serialize_compressed(&self) -> Vec<u8> {
if self.is_suitable_for_compression() {
self.serialize_v4()
} else {
self.serialize()
}
}
fn is_suitable_for_compression(&self) -> bool {
self.ordered
&& !self.entries.is_empty()
&& (self.entries.len() != 1 || self.is_estimation_mode())
}
pub fn serialize(&self) -> Vec<u8> {
let mut bytes = SketchBytes::with_capacity(64 + self.entries.len() * 8);
let pre_longs = self.preamble_longs(false);
bytes.write_u8(pre_longs);
bytes.write_u8(serialization::UNCOMPRESSED_SERIAL_VERSION);
bytes.write_u8(Family::THETA.id);
bytes.write_u16_be(0);
let mut flags = 0u8;
flags |= serialization::FLAGS_IS_READ_ONLY;
flags |= serialization::FLAGS_IS_COMPACT;
if self.is_empty() {
flags |= serialization::FLAGS_IS_EMPTY;
}
if self.is_ordered() {
flags |= serialization::FLAGS_IS_ORDERED;
}
bytes.write_u8(flags);
bytes.write_u16_le(self.seed_hash);
if pre_longs > 1 {
bytes.write_u32_le(self.entries.len() as u32);
bytes.write_u32_be(0); }
if self.is_estimation_mode() {
bytes.write_u64_le(self.theta64());
}
for hash in self.entries.iter() {
bytes.write_u64_le(*hash);
}
bytes.into_bytes()
}
fn serialize_v4(&self) -> Vec<u8> {
let pre_longs = self.preamble_longs(true);
let entry_bits = Self::compute_entry_bits(&self.entries);
let num_entries_bytes = Self::num_entries_bytes(self.entries.len());
let compressed_bits = entry_bits as usize * self.entries.len();
let compressed_bytes = compressed_bits.div_ceil(8);
let out_bytes = (pre_longs as usize * 8) + (num_entries_bytes as usize) + compressed_bytes;
let mut bytes = SketchBytes::with_capacity(out_bytes);
bytes.write_u8(pre_longs);
bytes.write_u8(serialization::COMPRESSED_SERIAL_VERSION);
bytes.write_u8(Family::THETA.id);
bytes.write_u8(entry_bits);
bytes.write_u8(num_entries_bytes);
let mut flags = 0u8;
flags |= serialization::FLAGS_IS_READ_ONLY;
flags |= serialization::FLAGS_IS_COMPACT;
flags |= serialization::FLAGS_IS_ORDERED;
bytes.write_u8(flags);
bytes.write_u16_le(self.seed_hash);
if self.is_estimation_mode() {
bytes.write_u64_le(self.theta);
}
let mut n = self.entries.len() as u32;
for _ in 0..num_entries_bytes {
bytes.write_u8((n & 0xff) as u8);
n >>= 8;
}
let mut previous = 0u64;
let mut i = 0usize;
let mut block = vec![0u8; entry_bits as usize];
while i + BLOCK_WIDTH <= self.entries.len() {
let mut deltas = [0u64; BLOCK_WIDTH];
for j in 0..BLOCK_WIDTH {
let entry = self.entries[i + j];
deltas[j] = entry - previous;
previous = entry;
}
block.fill(0);
pack_bits_block(&deltas, &mut block, entry_bits);
bytes.write(&block);
i += BLOCK_WIDTH;
}
if i < self.entries.len() {
let mut block = vec![0u8; entry_bits as usize];
let mut packer = BitPacker::new(&mut block);
while i < self.entries.len() {
let delta = self.entries[i] - previous;
previous = self.entries[i];
packer.pack_value(delta, entry_bits);
i += 1;
}
let bytes_used = packer.byte_used();
bytes.write(&block[0..bytes_used]);
}
bytes.into_bytes()
}
fn compute_entry_bits(entries: &[u64]) -> u8 {
let mut previous = 0u64;
let mut ored = 0u64;
for &entry in entries {
let delta = entry - previous;
ored |= delta;
previous = entry;
}
(64 - ored.leading_zeros()) as u8
}
fn num_entries_bytes(num_entries: usize) -> u8 {
let n = num_entries as u32;
let bits = u32::BITS - n.leading_zeros();
bits.div_ceil(8) as u8
}
pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
Self::deserialize_with_seed(bytes, DEFAULT_UPDATE_SEED)
}
pub fn deserialize_with_seed(bytes: &[u8], seed: u64) -> Result<Self, Error> {
let mut cursor = SketchSlice::new(bytes);
let pre_longs = cursor
.read_u8()
.map_err(insufficient_data("preamble_longs"))?;
let ser_ver = cursor
.read_u8()
.map_err(insufficient_data("serial_version"))?;
let family_id = cursor.read_u8().map_err(insufficient_data("family_id"))?;
Family::THETA.validate_id(family_id)?;
ensure_preamble_longs_in_range(
Family::THETA.min_pre_longs..=Family::THETA.max_pre_longs,
pre_longs,
)?;
match ser_ver {
1 => Self::deserialize_v1(cursor, seed),
2 => Self::deserialize_v2(pre_longs, cursor, seed),
3 => Self::deserialize_v3(pre_longs, cursor, seed),
4 => Self::deserialize_v4(pre_longs, cursor, seed),
_ => Err(Error::deserial(format!(
"unsupported serial version: expected 1, 2, 3, or 4, got {ser_ver}",
))),
}
}
fn read_entries(
cursor: &mut SketchSlice<'_>,
num_entries: usize,
theta: u64,
) -> Result<Vec<u64>, Error> {
let mut entries = Vec::with_capacity(num_entries);
for _ in 0..num_entries {
let hash = cursor.read_u64_le().map_err(insufficient_data("entries"))?;
if hash == 0 || hash >= theta {
return Err(Error::deserial("corrupted: invalid retained hash value"));
}
entries.push(hash);
}
Ok(entries)
}
fn deserialize_v1(mut cursor: SketchSlice<'_>, expected_seed: u64) -> Result<Self, Error> {
let seed_hash = compute_seed_hash(expected_seed);
cursor.read_u8().map_err(insufficient_data("<unused>"))?;
cursor
.read_u32_le()
.map_err(insufficient_data("<unused_u32_0>"))?;
let num_entries = cursor
.read_u32_le()
.map_err(insufficient_data("num_entries"))? as usize;
cursor
.read_u32_le()
.map_err(insufficient_data("<unused_u32_1>"))?;
let theta = cursor
.read_u64_le()
.map_err(insufficient_data("theta_long"))?;
let empty = num_entries == 0 && theta == MAX_THETA;
if empty {
return Ok(Self {
entries: vec![],
theta,
seed_hash,
ordered: true,
empty: true,
});
}
let entries = Self::read_entries(&mut cursor, num_entries, theta)?;
Ok(Self {
entries,
theta,
seed_hash,
ordered: true,
empty: false,
})
}
fn deserialize_v2(
pre_longs: u8,
mut cursor: SketchSlice<'_>,
expected_seed: u64,
) -> Result<Self, Error> {
cursor.read_u8().map_err(insufficient_data("<unused>"))?;
cursor
.read_u16_le()
.map_err(insufficient_data("<unused_u16>"))?;
let seed_hash = cursor
.read_u16_le()
.map_err(insufficient_data("seed_hash"))?;
let expected_seed_hash = compute_seed_hash(expected_seed);
if seed_hash != expected_seed_hash {
return Err(Error::deserial(format!(
"incompatible seed hash: expected {expected_seed_hash}, got {seed_hash}",
)));
}
match pre_longs {
V2_PREAMBLE_EMPTY => Ok(Self {
entries: vec![],
theta: MAX_THETA,
seed_hash,
ordered: true,
empty: true,
}),
V2_PREAMBLE_PRECISE => {
let num_entries = cursor
.read_u32_le()
.map_err(insufficient_data("num_entries"))?
as usize;
cursor
.read_u32_le()
.map_err(insufficient_data("<unused_u32>"))?;
let entries = Self::read_entries(&mut cursor, num_entries, MAX_THETA)?;
Ok(Self {
entries,
theta: MAX_THETA,
seed_hash,
ordered: true,
empty: true,
})
}
V2_PREAMBLE_ESTIMATE => {
let num_entries = cursor
.read_u32_le()
.map_err(insufficient_data("num_entries"))?
as usize;
cursor
.read_u32_le()
.map_err(insufficient_data("<unused_u32>"))?;
let theta = cursor
.read_u64_le()
.map_err(insufficient_data("theta_long"))?;
let empty = (num_entries == 0) && (theta == MAX_THETA);
let entries = Self::read_entries(&mut cursor, num_entries, theta)?;
Ok(Self {
entries,
theta,
seed_hash,
ordered: true,
empty,
})
}
_ => Err(Error::invalid_preamble_longs(&[1, 2, 3], pre_longs)),
}
}
fn deserialize_v3(
pre_longs: u8,
mut cursor: SketchSlice<'_>,
expected_seed: u64,
) -> Result<Self, Error> {
cursor
.read_u16_le()
.map_err(insufficient_data("<unused_u32>"))?;
let flags = cursor.read_u8().map_err(insufficient_data("flags"))?;
let seed_hash = cursor
.read_u16_le()
.map_err(insufficient_data("seed_hash"))?;
let empty = (flags & serialization::FLAGS_IS_EMPTY) != 0;
let mut theta = MAX_THETA;
let num_entries;
let mut entries = vec![];
if !empty {
let expected_seed_hash = compute_seed_hash(expected_seed);
if seed_hash != expected_seed_hash {
return Err(Error::deserial(format!(
"incompatible seed hash: expected {expected_seed_hash}, got {seed_hash}",
)));
}
if pre_longs == 1 {
num_entries = 1;
} else {
num_entries = cursor
.read_u32_le()
.map_err(insufficient_data("num_entries"))?;
cursor
.read_u32_le()
.map_err(insufficient_data("<unused_u32>"))?;
if pre_longs > 2 {
theta = cursor
.read_u64_le()
.map_err(insufficient_data("theta_long"))?;
}
}
entries = Self::read_entries(&mut cursor, num_entries as usize, theta)?;
}
let ordered = (flags & serialization::FLAGS_IS_ORDERED) != 0;
Ok(Self {
entries,
theta,
seed_hash,
ordered,
empty,
})
}
fn deserialize_v4(
pre_longs: u8,
mut cursor: SketchSlice<'_>,
expected_seed: u64,
) -> Result<Self, Error> {
let entry_bits = cursor.read_u8().map_err(insufficient_data("entry_bits"))?;
let num_entries_bytes = cursor.read_u8().map_err(insufficient_data("num_entries"))?;
let flags = cursor.read_u8().map_err(insufficient_data("flags"))?;
let seed_hash = cursor
.read_u16_le()
.map_err(insufficient_data("seed_hash"))?;
let empty = (flags & serialization::FLAGS_IS_EMPTY) != 0;
if !empty {
let expected_seed_hash = compute_seed_hash(expected_seed);
if seed_hash != expected_seed_hash {
return Err(Error::deserial(format!(
"incompatible seed hash: expected {expected_seed_hash}, got {seed_hash}",
)));
}
}
let theta = if pre_longs > 1 {
cursor
.read_u64_le()
.map_err(insufficient_data("theta_long"))?
} else {
MAX_THETA
};
let mut num_entries = 0usize;
for i in 0..num_entries_bytes {
let entry_count_byte = cursor
.read_u8()
.map_err(insufficient_data("num_entries_byte"))?;
num_entries |= (entry_count_byte as usize) << ((i as usize) << 3);
}
let mut i = 0usize;
let mut entries = vec![0u64; num_entries];
while i + BLOCK_WIDTH <= num_entries {
let mut block = vec![0u8; entry_bits as usize];
cursor
.read_exact(&mut block)
.map_err(insufficient_data("delta_block"))?;
unpack_bits_block(&mut entries[i..i + BLOCK_WIDTH], &block, entry_bits);
i += BLOCK_WIDTH;
}
if i < num_entries {
let rem = num_entries - i;
let bytes_needed = (rem * entry_bits as usize).div_ceil(8);
let mut tail = vec![0u8; bytes_needed];
cursor
.read_exact(&mut tail)
.map_err(insufficient_data("delta_tail"))?;
let mut unpacker = BitUnpacker::new(&tail);
for slot in entries.iter_mut().take(num_entries).skip(i) {
*slot = unpacker.unpack_value(entry_bits);
}
}
let mut previous = 0;
for e in &mut entries {
*e += previous;
previous = *e;
if *e == 0 || *e >= theta {
return Err(Error::deserial("corrupted: invalid retained hash value"));
}
}
let ordered = (flags & serialization::FLAGS_IS_ORDERED) != 0;
Ok(Self {
entries,
theta,
seed_hash,
ordered,
empty,
})
}
}
impl ThetaSketchView for CompactThetaSketch {
fn seed_hash(&self) -> u16 {
CompactThetaSketch::seed_hash(self)
}
fn theta64(&self) -> u64 {
CompactThetaSketch::theta64(self)
}
fn is_empty(&self) -> bool {
CompactThetaSketch::is_empty(self)
}
fn iter(&self) -> impl Iterator<Item = u64> + '_ {
CompactThetaSketch::iter(self)
}
fn num_retained(&self) -> usize {
CompactThetaSketch::num_retained(self)
}
fn is_ordered(&self) -> bool {
CompactThetaSketch::is_ordered(self)
}
}
#[derive(Debug)]
pub struct ThetaSketchBuilder {
lg_k: u8,
resize_factor: ResizeFactor,
sampling_probability: f32,
seed: u64,
}
impl Default for ThetaSketchBuilder {
fn default() -> Self {
Self {
lg_k: DEFAULT_LG_K,
resize_factor: ResizeFactor::X8,
sampling_probability: 1.0,
seed: DEFAULT_UPDATE_SEED,
}
}
}
impl ThetaSketchBuilder {
pub fn lg_k(mut self, lg_k: u8) -> Self {
assert!(
(MIN_LG_K..=MAX_LG_K).contains(&lg_k),
"lg_k must be in [{}, {}], got {}",
MIN_LG_K,
MAX_LG_K,
lg_k
);
self.lg_k = lg_k;
self
}
pub fn resize_factor(mut self, factor: ResizeFactor) -> Self {
self.resize_factor = factor;
self
}
pub fn sampling_probability(mut self, probability: f32) -> Self {
assert!(
(0.0..=1.0).contains(&probability) && probability > 0.0,
"sampling_probability must be in (0.0, 1.0], got {probability}"
);
self.sampling_probability = probability;
self
}
pub fn seed(mut self, seed: u64) -> Self {
self.seed = seed;
self
}
pub fn build(self) -> ThetaSketch {
let table = ThetaHashTable::new(
self.lg_k,
self.resize_factor,
self.sampling_probability,
self.seed,
);
ThetaSketch { table }
}
}
#[cfg(test)]
mod tests {
use super::*;
fn sorted_theta_entries(sketch: &ThetaSketch) -> Vec<u64> {
let mut entries: Vec<u64> = sketch.iter().collect();
entries.sort_unstable();
entries
}
fn sorted_compact_entries(sketch: &CompactThetaSketch) -> Vec<u64> {
let mut entries: Vec<u64> = sketch.iter().collect();
entries.sort_unstable();
entries
}
fn assert_theta_and_compact_equivalent_ordered(theta: &ThetaSketch, ordered: bool) {
let compact = theta.compact(ordered);
assert_theta_and_compact_equivalent(theta, &compact);
if compact.num_retained() > 1 {
assert_eq!(compact.is_ordered(), ordered);
}
}
fn assert_theta_and_compact_equivalent(theta: &ThetaSketch, compact: &CompactThetaSketch) {
assert_eq!(theta.is_empty(), compact.is_empty());
assert_eq!(theta.is_estimation_mode(), compact.is_estimation_mode());
assert_eq!(theta.num_retained(), compact.num_retained());
assert_eq!(theta.theta64(), compact.theta64());
assert_eq!(sorted_theta_entries(theta), sorted_compact_entries(compact));
assert!((theta.estimate() - compact.estimate()).abs() <= 1e-12);
}
fn assert_compact_equivalent(a: &CompactThetaSketch, b: &CompactThetaSketch) {
assert_eq!(a.is_empty(), b.is_empty());
assert_eq!(a.is_estimation_mode(), b.is_estimation_mode());
assert_eq!(a.is_ordered(), b.is_ordered());
assert_eq!(a.num_retained(), b.num_retained());
assert_eq!(a.theta64(), b.theta64());
assert_eq!(a.seed_hash(), b.seed_hash());
assert_eq!(sorted_compact_entries(a), sorted_compact_entries(b));
assert!((a.estimate() - b.estimate()).abs() <= 1e-12);
}
fn assert_compressed_round_trip(theta: &ThetaSketch, compact: &CompactThetaSketch) {
let bytes_v4 = compact.serialize_compressed();
assert_eq!(bytes_v4[1], serialization::COMPRESSED_SERIAL_VERSION);
let decoded_v4 = CompactThetaSketch::deserialize(&bytes_v4).unwrap();
assert_compact_equivalent(compact, &decoded_v4);
assert_theta_and_compact_equivalent(theta, &decoded_v4);
}
#[test]
fn theta_and_compact_theta_equivalent() {
let mut exact_theta = ThetaSketch::builder().lg_k(12).build();
for i in 0..2000 {
exact_theta.update(i);
}
assert!(!exact_theta.is_estimation_mode());
for ordered in [false, true] {
assert_theta_and_compact_equivalent_ordered(&exact_theta, ordered);
}
let mut estimation_theta = ThetaSketch::builder().lg_k(5).build();
for i in 0..5000 {
estimation_theta.update(i);
}
assert!(estimation_theta.is_estimation_mode());
for ordered in [false, true] {
assert_theta_and_compact_equivalent_ordered(&estimation_theta, ordered);
}
}
#[test]
fn compact_theta_serialize_deserialize_round_trip_equivalent_to_compact_and_theta() {
let mut theta = ThetaSketch::builder().lg_k(5).build();
for i in 0..5000 {
theta.update(i);
}
let compact = theta.compact(true);
assert!(compact.is_ordered());
assert!(compact.is_estimation_mode());
let bytes_v3 = compact.serialize();
let decoded_v3 = CompactThetaSketch::deserialize(&bytes_v3).unwrap();
assert_compact_equivalent(&compact, &decoded_v3);
assert_theta_and_compact_equivalent(&theta, &decoded_v3);
}
#[test]
fn compact_theta_serialize_compressed_round_trip_tail_entries() {
let mut theta = ThetaSketch::builder().lg_k(12).build();
for i in 0..13 {
theta.update(i);
}
let compact = theta.compact(true);
assert_eq!(compact.num_retained() % 8, 5);
assert!(!compact.is_estimation_mode());
assert!(compact.is_ordered());
assert_compressed_round_trip(&theta, &compact);
}
#[test]
fn compact_theta_serialize_compressed_round_trip_more_than_255_entries() {
let mut theta = ThetaSketch::builder().lg_k(12).build();
for i in 0..300 {
theta.update(i);
}
let compact = theta.compact(true);
assert!(compact.num_retained() > 255);
assert!(!compact.is_estimation_mode());
assert!(compact.is_ordered());
assert_compressed_round_trip(&theta, &compact);
}
#[test]
fn compact_theta_serialize_compressed_round_trip_estimation_mode() {
let mut theta = ThetaSketch::builder().lg_k(5).build();
for i in 0..5000 {
theta.update(i);
}
let compact = theta.compact(true);
assert!(compact.is_estimation_mode());
assert!(compact.is_ordered());
assert_compressed_round_trip(&theta, &compact);
}
#[test]
fn deserialize_rejects_seed_hash_mismatch() {
let mut theta = ThetaSketch::builder().seed(7).build();
theta.update("apple");
let bytes = theta.compact(true).serialize();
let err = CompactThetaSketch::deserialize_with_seed(&bytes, 8).unwrap_err();
assert_eq!(err.kind(), crate::error::ErrorKind::InvalidData);
assert!(err.message().contains("incompatible seed hash"));
}
#[test]
fn deserialize_rejects_invalid_family_id() {
let mut theta = ThetaSketch::builder().build();
theta.update("apple");
let mut bytes = theta.compact(true).serialize();
bytes[2] = 0;
let err = CompactThetaSketch::deserialize(&bytes).unwrap_err();
assert_eq!(err.kind(), crate::error::ErrorKind::InvalidData);
assert!(err.message().contains("invalid family"));
}
#[test]
fn deserialize_rejects_unsupported_serial_version() {
let mut theta = ThetaSketch::builder().build();
theta.update("apple");
let mut bytes = theta.compact(true).serialize();
bytes[1] = 99;
let err = CompactThetaSketch::deserialize(&bytes).unwrap_err();
assert_eq!(err.kind(), crate::error::ErrorKind::InvalidData);
assert!(err.message().contains("unsupported serial version"));
}
#[test]
fn deserialize_rejects_truncated_payload() {
let mut theta = ThetaSketch::builder().lg_k(5).build();
for i in 0..5000 {
theta.update(i);
}
let mut bytes = theta.compact(true).serialize();
bytes.pop();
let err = CompactThetaSketch::deserialize(&bytes).unwrap_err();
assert_eq!(err.kind(), crate::error::ErrorKind::InvalidData);
assert!(err.message().contains("insufficient data"));
}
}