use crate::common::ResizeFactor;
use crate::error::Error;
use crate::hash::DEFAULT_UPDATE_SEED;
use crate::theta::CompactThetaSketch;
use crate::theta::HASH_TABLE_REBUILD_THRESHOLD;
use crate::theta::MAX_THETA;
use crate::theta::ThetaSketchView;
use crate::theta::hash_table::ThetaHashTable;
/// Computes the set intersection of theta sketches.
///
/// Feed sketches with [`ThetaIntersection::update`] and read the result with
/// [`ThetaIntersection::result`] once [`ThetaIntersection::has_result`] returns `true`.
#[derive(Debug)]
pub struct ThetaIntersection {
    /// `false` until an update has established an intersection result; `result()` asserts on it.
    is_valid: bool,
    /// Hashes retained by the intersection so far, together with its theta, hash seed and
    /// empty flag.
    table: ThetaHashTable,
}
impl ThetaIntersection {
pub fn new(seed: u64) -> Self {
Self {
is_valid: false,
table: ThetaHashTable::from_raw_parts(
0,
0,
ResizeFactor::X1,
1.0,
MAX_THETA,
seed,
false,
),
}
}
pub fn new_with_default_seed() -> Self {
Self::new(DEFAULT_UPDATE_SEED)
}
pub fn update<S: ThetaSketchView>(&mut self, sketch: &S) -> Result<(), Error> {
let new_default_table = |table: &ThetaHashTable| {
ThetaHashTable::from_raw_parts(
0,
0,
ResizeFactor::X1,
1.0,
table.theta(),
table.hash_seed(),
table.is_empty(),
)
};
if self.table.is_empty() {
return Ok(());
}
if !sketch.is_empty() && sketch.seed_hash() != self.table.seed_hash() {
return Err(Error::invalid_argument(format!(
"incompatible seed hash: expected {}, got {}",
self.table.seed_hash(),
sketch.seed_hash()
)));
}
if sketch.is_empty() {
self.table.set_empty(true);
}
self.table.set_theta(if self.table.is_empty() {
MAX_THETA
} else {
self.table.theta().min(sketch.theta64())
});
if self.is_valid && self.table.num_retained() == 0 {
return Ok(());
}
if sketch.num_retained() == 0 {
self.is_valid = true;
self.table = new_default_table(&self.table);
return Ok(());
}
if !self.is_valid {
self.is_valid = true;
let lg_size = ThetaHashTable::lg_size_from_count_for_rebuild(
sketch.num_retained(),
HASH_TABLE_REBUILD_THRESHOLD,
);
self.table = ThetaHashTable::from_raw_parts(
lg_size,
lg_size - 1,
ResizeFactor::X1,
1.0,
self.table.theta(),
self.table.hash_seed(),
self.table.is_empty(),
);
for hash in sketch.iter() {
if !self.table.try_insert_hash(hash) {
return Err(Error::invalid_argument(
"Insert entries from sketch fail, possibly corrupted input sketch",
));
}
}
if self.table.num_retained() != sketch.num_retained() {
return Err(Error::invalid_argument(
"num entries mismatch, possibly corrupted input sketch",
));
}
} else {
let max_matches = self.table.num_retained().min(sketch.num_retained());
let mut matched_entries = Vec::with_capacity(max_matches);
let mut count = 0;
for hash in sketch.iter() {
if hash < self.table.theta() {
if self.table.contains_hash(hash) {
if matched_entries.len() == max_matches {
return Err(Error::invalid_argument(
"max matches exceeded, possibly corrupted input sketch",
));
}
matched_entries.push(hash);
}
} else if sketch.is_ordered() {
break; }
count += 1;
}
if count > sketch.num_retained() {
return Err(Error::invalid_argument(
"more keys than expected, possibly corrupted input sketch",
));
} else if !sketch.is_ordered() && count < sketch.num_retained() {
return Err(Error::invalid_argument(
"fewer keys than expected, possibly corrupted input sketch",
));
}
if matched_entries.is_empty() {
self.table = new_default_table(&self.table);
if self.table.theta() == MAX_THETA {
self.table.set_empty(true);
}
} else {
let lg_size = ThetaHashTable::lg_size_from_count_for_rebuild(
matched_entries.len(),
HASH_TABLE_REBUILD_THRESHOLD,
);
self.table = ThetaHashTable::from_raw_parts(
lg_size,
lg_size - 1,
ResizeFactor::X1,
1.0,
self.table.theta(),
self.table.hash_seed(),
self.table.is_empty(),
);
for hash in matched_entries {
if !self.table.try_insert_hash(hash) {
return Err(Error::invalid_argument(
"duplicate key, possibly corrupted input sketch",
));
}
}
}
}
Ok(())
}
pub fn has_result(&self) -> bool {
self.is_valid
}
pub fn result(&self) -> CompactThetaSketch {
self.result_with_ordered(true)
}
pub fn result_with_ordered(&self, ordered: bool) -> CompactThetaSketch {
assert!(
self.is_valid,
"ThetaIntersection::result() called before first update()"
);
let mut hashes: Vec<u64> = self.table.iter().collect();
if ordered {
hashes.sort_unstable();
}
CompactThetaSketch::from_parts(
hashes,
self.table.theta(),
self.table.seed_hash(),
ordered,
self.table.is_empty(),
)
}
}