use std::collections::HashMap;
use std::fmt;
#[must_use]
#[allow(clippy::unreadable_literal)]
pub fn mix64(x: u64) -> u64 {
let mut z = x.wrapping_add(0x9E3779B97F4A7C15);
z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
z ^ (z >> 31)
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn pairwise_conflict_probability(write_set_size: u64, total_pages: u64) -> f64 {
if total_pages == 0 {
return 1.0;
}
let w = write_set_size as f64;
let p = total_pages as f64;
1.0 - (-w * w / p).exp()
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn birthday_conflict_probability_uniform(
n_writers: u64,
write_set_size: u64,
total_pages: u64,
) -> f64 {
if total_pages == 0 || n_writers < 2 {
return if n_writers < 2 { 0.0 } else { 1.0 };
}
let n = n_writers as f64;
let w = write_set_size as f64;
let p = total_pages as f64;
let exponent = n * (n - 1.0) * w * w / (2.0 * p);
1.0 - (-exponent).exp()
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn birthday_conflict_probability_m2(n_writers: u64, m2: f64) -> f64 {
if n_writers < 2 {
return 0.0;
}
let n = n_writers as f64;
let exponent = n * (n - 1.0) / 2.0 * m2;
1.0 - (-exponent).exp()
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn exact_m2(incidence_counts: &[u64], txn_count: u64) -> Option<f64> {
if txn_count == 0 {
return None;
}
let f2: u128 = incidence_counts
.iter()
.map(|&c| u128::from(c) * u128::from(c))
.sum();
let tc = txn_count as f64;
Some(f2 as f64 / (tc * tc))
}
#[must_use]
pub fn effective_collision_pool(m2: f64) -> f64 {
if m2 == 0.0 || !m2.is_finite() {
return f64::INFINITY;
}
1.0 / m2
}
pub const DEFAULT_AMS_R: usize = 12;
pub const MIN_AMS_R: usize = 8;
pub const MAX_AMS_R: usize = 32;
pub const AMS_SKETCH_VERSION: &str = "fsqlite:m2:ams:v1";
pub const NITRO_SKETCH_VERSION: &str = "fsqlite:cardinality:nitro:v1";
pub const DEFAULT_NITRO_PRECISION: u8 = 12;
pub const MIN_NITRO_PRECISION: u8 = 4;
pub const MAX_NITRO_PRECISION: u8 = 18;
pub const DEFAULT_HEAVY_HITTER_K: usize = 64;
pub const MIN_HEAVY_HITTER_K: usize = 32;
pub const MAX_HEAVY_HITTER_K: usize = 256;
pub const ZIPF_S_MIN: f64 = 0.1;
pub const ZIPF_S_MAX: f64 = 2.0;
pub const DEFAULT_ZIPF_MAX_ITERS: usize = 20;
#[must_use]
pub fn validate_ams_r(r: usize) -> bool {
(MIN_AMS_R..=MAX_AMS_R).contains(&r)
}
#[must_use]
pub fn validate_nitro_precision(precision: u8) -> bool {
(MIN_NITRO_PRECISION..=MAX_NITRO_PRECISION).contains(&precision)
}
#[must_use]
pub fn validate_heavy_hitter_k(k: usize) -> bool {
(MIN_HEAVY_HITTER_K..=MAX_HEAVY_HITTER_K).contains(&k)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NitroSketchConfig {
pub precision: u8,
pub seed: u64,
}
impl Default for NitroSketchConfig {
fn default() -> Self {
Self {
precision: DEFAULT_NITRO_PRECISION,
seed: 0,
}
}
}
#[derive(Clone)]
pub struct NitroSketch {
precision: u8,
seed: u64,
registers: Vec<u8>,
}
impl fmt::Debug for NitroSketch {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NitroSketch")
.field("precision", &self.precision)
.field("register_count", &self.registers.len())
.finish_non_exhaustive()
}
}
impl NitroSketch {
#[must_use]
pub fn new(config: &NitroSketchConfig) -> Self {
assert!(
validate_nitro_precision(config.precision),
"NitroSketch precision must be in [{MIN_NITRO_PRECISION}, {MAX_NITRO_PRECISION}], got {}",
config.precision
);
let register_count = 1_usize << usize::from(config.precision);
Self {
precision: config.precision,
seed: config.seed,
registers: vec![0; register_count],
}
}
pub fn observe_u64(&mut self, value: u64) {
let salted = value ^ self.seed ^ 0x4E49_5452_4F53_4B45;
let hash = mix64(salted);
let precision_u32 = u32::from(self.precision);
let index_shift = 64_u32.saturating_sub(precision_u32);
let index_u64 = hash >> index_shift;
let index = usize::try_from(index_u64).expect("register index should fit usize");
let remaining = hash << precision_u32;
let rank_u32 = remaining.leading_zeros().saturating_add(1);
let max_rank_u32 = 64_u32.saturating_sub(precision_u32).saturating_add(1);
let rank_u8 =
u8::try_from(rank_u32.min(max_rank_u32)).expect("rank should fit in u8 register");
self.registers[index] = self.registers[index].max(rank_u8);
}
#[must_use]
#[allow(clippy::cast_precision_loss, clippy::naive_bytecount)]
pub fn estimate_cardinality(&self) -> f64 {
let register_count = self.registers.len();
let m = register_count as f64;
let alpha = nitro_alpha(register_count);
let denominator = self
.registers
.iter()
.map(|®ister| 2_f64.powi(-i32::from(register)))
.sum::<f64>();
let raw_estimate = alpha * m * m / denominator;
let zero_registers = self
.registers
.iter()
.filter(|&®ister| register == 0)
.count();
if raw_estimate <= 2.5 * m && zero_registers > 0 {
let zero_count = zero_registers as f64;
return m * (m / zero_count).ln();
}
raw_estimate
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn relative_standard_error(&self) -> f64 {
1.04 / (self.registers.len() as f64).sqrt()
}
#[must_use]
pub fn precision(&self) -> u8 {
self.precision
}
#[must_use]
pub fn register_count(&self) -> usize {
self.registers.len()
}
#[must_use]
pub fn memory_bytes(&self) -> usize {
self.registers.len()
}
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
fn nitro_alpha(register_count: usize) -> f64 {
match register_count {
16 => 0.673,
32 => 0.697,
64 => 0.709,
_ => {
let m = register_count as f64;
0.7213 / (1.0 + 1.079 / m)
}
}
}
#[derive(Debug, Clone)]
pub struct AmsSketchConfig {
pub r: usize,
pub db_epoch: u64,
pub regime_id: u64,
pub window_id: u64,
}
impl AmsSketchConfig {
#[must_use]
pub fn seed_for_index(&self, r_idx: usize) -> u64 {
let mut hasher = blake3::Hasher::new();
hasher.update(AMS_SKETCH_VERSION.as_bytes());
hasher.update(&self.db_epoch.to_le_bytes());
hasher.update(&self.regime_id.to_le_bytes());
hasher.update(&self.window_id.to_le_bytes());
#[allow(clippy::cast_possible_truncation)]
hasher.update(&(r_idx as u64).to_le_bytes());
let hash = hasher.finalize();
let bytes: [u8; 8] = hash.as_bytes()[..8].try_into().expect("8 bytes");
u64::from_le_bytes(bytes)
}
#[must_use]
fn seed_for_r(&self, r_idx: usize) -> u64 {
self.seed_for_index(r_idx)
}
}
#[derive(Clone)]
pub struct AmsSketch {
seeds: Vec<u64>,
accumulators: Vec<i128>,
txn_count: u64,
}
impl fmt::Debug for AmsSketch {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AmsSketch")
.field("r", &self.seeds.len())
.field("txn_count", &self.txn_count)
.finish_non_exhaustive()
}
}
impl AmsSketch {
#[must_use]
pub fn new(config: &AmsSketchConfig) -> Self {
assert!(
validate_ams_r(config.r),
"AMS r must be in [{MIN_AMS_R}, {MAX_AMS_R}], got {}",
config.r
);
let seeds: Vec<u64> = (0..config.r).map(|i| config.seed_for_r(i)).collect();
let accumulators = vec![0i128; config.r];
Self {
seeds,
accumulators,
txn_count: 0,
}
}
pub fn observe_write_set(&mut self, write_set: &[u64]) {
self.txn_count += 1;
for &pgno in write_set {
for (r, &seed) in self.seeds.iter().enumerate() {
let h = mix64(seed ^ pgno);
let sign: i128 = if (h & 1) == 0 { 1 } else { -1 };
self.accumulators[r] += sign;
}
}
}
#[must_use]
pub fn f2_hat(&self) -> u128 {
let mut squares: Vec<u128> = self
.accumulators
.iter()
.map(|&z| {
let abs = z.unsigned_abs();
abs.saturating_mul(abs)
})
.collect();
squares.sort_unstable();
let n = squares.len();
if n == 0 {
return 0;
}
squares[(n - 1) / 2]
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn m2_hat(&self) -> Option<f64> {
if self.txn_count == 0 {
return None;
}
let f2 = self.f2_hat() as f64;
let tc = self.txn_count as f64;
Some(f2 / (tc * tc))
}
#[must_use]
pub fn p_eff_hat(&self) -> f64 {
self.m2_hat()
.map_or(f64::INFINITY, effective_collision_pool)
}
#[must_use]
pub fn txn_count(&self) -> u64 {
self.txn_count
}
#[must_use]
pub fn r(&self) -> usize {
self.seeds.len()
}
#[must_use]
pub fn seed_for_index(&self, r_idx: usize) -> Option<u64> {
self.seeds.get(r_idx).copied()
}
#[must_use]
pub fn accumulators(&self) -> &[i128] {
&self.accumulators
}
#[must_use]
pub fn memory_bytes(&self) -> usize {
self.seeds.len() * 8 + self.accumulators.len() * 16 + 8
}
pub fn reset_window(&mut self) {
for acc in &mut self.accumulators {
*acc = 0;
}
self.txn_count = 0;
}
#[must_use]
pub fn would_overflow_for_txn(&self, write_set_len: usize) -> bool {
if write_set_len == 0 {
return false;
}
#[allow(clippy::cast_possible_truncation)]
let delta = write_set_len as u128;
let threshold = (i128::MAX as u128).saturating_sub(delta);
self.accumulators
.iter()
.any(|&acc| acc.unsigned_abs() > threshold)
}
#[cfg(test)]
fn set_accumulators_for_test(&mut self, values: &[i128]) {
assert_eq!(
values.len(),
self.accumulators.len(),
"set_accumulators_for_test length mismatch"
);
self.accumulators.clone_from_slice(values);
}
}
#[must_use]
pub fn ams_sign(seed: u64, pgno: u64) -> i8 {
let h = mix64(seed ^ pgno);
if (h & 1) == 0 { 1 } else { -1 }
}
#[must_use]
pub fn dedup_write_set(write_set: &[u64]) -> Vec<u64> {
let mut dedup = write_set.to_vec();
dedup.sort_unstable();
dedup.dedup();
dedup
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(clippy::struct_field_names)]
pub struct SpaceSavingEntry {
pub pgno: u64,
pub count_hat: u64,
pub err: u64,
}
impl SpaceSavingEntry {
#[must_use]
pub fn count_lower_bound(self) -> u64 {
self.count_hat.saturating_sub(self.err)
}
}
#[derive(Debug, Clone)]
pub struct SpaceSavingSummary {
capacity: usize,
entries: Vec<SpaceSavingEntry>,
}
impl SpaceSavingSummary {
#[must_use]
pub fn new(capacity: usize) -> Self {
assert!(
validate_heavy_hitter_k(capacity),
"SpaceSaving K must be in [{MIN_HEAVY_HITTER_K}, {MAX_HEAVY_HITTER_K}], got {capacity}"
);
Self {
capacity,
entries: Vec::with_capacity(capacity),
}
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn clear(&mut self) {
self.entries.clear();
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
#[must_use]
pub fn entry_for(&self, pgno: u64) -> Option<SpaceSavingEntry> {
self.entries
.iter()
.find(|entry| entry.pgno == pgno)
.copied()
}
pub fn observe_incidence(&mut self, pgno: u64) {
if let Some(entry) = self.entries.iter_mut().find(|entry| entry.pgno == pgno) {
entry.count_hat = entry.count_hat.saturating_add(1);
return;
}
if self.entries.len() < self.capacity {
self.entries.push(SpaceSavingEntry {
pgno,
count_hat: 1,
err: 0,
});
return;
}
let min_index = self
.entries
.iter()
.enumerate()
.min_by_key(|(_, entry)| (entry.count_hat, entry.pgno))
.map(|(index, _)| index)
.expect("SpaceSaving non-empty when replacing minimum");
let min_count = self.entries[min_index].count_hat;
self.entries[min_index] = SpaceSavingEntry {
pgno,
count_hat: min_count.saturating_add(1),
err: min_count,
};
}
pub fn observe_write_set(&mut self, write_set: &[u64]) {
for pgno in dedup_write_set(write_set) {
self.observe_incidence(pgno);
}
}
#[must_use]
pub fn entries_sorted(&self) -> Vec<SpaceSavingEntry> {
let mut sorted = self.entries.clone();
sorted.sort_by(|left, right| {
right
.count_hat
.cmp(&left.count_hat)
.then(left.pgno.cmp(&right.pgno))
});
sorted
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(clippy::struct_field_names)]
pub struct HeadTailDecomposition {
pub f2_head_upper: u128,
pub f2_head_lower: u128,
pub f2_tail_hat: u128,
pub head_contrib_upper: f64,
pub head_contrib_lower: f64,
pub tail_contrib_hat: f64,
}
#[must_use]
fn square_u64(value: u64) -> u128 {
u128::from(value) * u128::from(value)
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn compute_head_tail_decomposition(
entries: &[SpaceSavingEntry],
f2_hat: u128,
txn_count: u64,
) -> HeadTailDecomposition {
let f2_head_upper: u128 = entries
.iter()
.map(|entry| square_u64(entry.count_hat))
.sum();
let f2_head_lower: u128 = entries
.iter()
.map(|entry| square_u64(entry.count_lower_bound()))
.sum();
let f2_tail_hat = f2_hat.saturating_sub(f2_head_lower);
let (head_contrib_upper, head_contrib_lower, tail_contrib_hat) = if txn_count == 0 {
(0.0, 0.0, 0.0)
} else {
let denom = {
let txn_count_f64 = txn_count as f64;
txn_count_f64 * txn_count_f64
};
(
f2_head_upper as f64 / denom,
f2_head_lower as f64 / denom,
f2_tail_hat as f64 / denom,
)
};
HeadTailDecomposition {
f2_head_upper,
f2_head_lower,
f2_tail_hat,
head_contrib_upper,
head_contrib_lower,
tail_contrib_hat,
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(clippy::struct_field_names)]
pub struct ZipfMleResult {
pub s_hat: f64,
pub window_n: u64,
pub iterations: usize,
pub converged: bool,
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
fn zipf_harmonic_terms(k: usize, s: f64) -> (f64, f64, f64) {
let mut h = 0.0_f64;
let mut h_prime = 0.0_f64;
let mut h_second = 0.0_f64;
for rank in 1..=k {
let rank_f64 = rank as f64;
let ln_rank = rank_f64.ln();
let inv_pow = rank_f64.powf(-s);
h += inv_pow;
h_prime -= ln_rank * inv_pow;
h_second += ln_rank * ln_rank * inv_pow;
}
(h, h_prime, h_second)
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn zipf_mle_from_ranked_counts(ranked_counts: &[u64]) -> Option<ZipfMleResult> {
let positive_counts: Vec<u64> = ranked_counts
.iter()
.copied()
.filter(|count| *count > 0)
.collect();
if positive_counts.len() < 2 {
return None;
}
let window_n: u64 = positive_counts.iter().sum();
if window_n == 0 {
return None;
}
let weighted_log_rank: f64 = positive_counts
.iter()
.enumerate()
.map(|(index, count)| *count as f64 * ((index + 1) as f64).ln())
.sum();
let window_n_f64 = window_n as f64;
let mut s = 1.0_f64;
let mut iterations = 0usize;
let mut converged = false;
for iteration in 0..DEFAULT_ZIPF_MAX_ITERS {
iterations = iteration + 1;
let (harmonic, harmonic_prime, harmonic_second) =
zipf_harmonic_terms(positive_counts.len(), s);
if !harmonic.is_finite() || harmonic <= f64::EPSILON {
break;
}
let grad = -weighted_log_rank - window_n_f64 * harmonic_prime / harmonic;
if !grad.is_finite() {
break;
}
if grad.abs() < 1e-10 {
converged = true;
break;
}
let harmonic_prime_sq = harmonic_prime * harmonic_prime;
let curvature = harmonic_second.mul_add(harmonic, -harmonic_prime_sq);
let harmonic_sq = harmonic * harmonic;
let hess = (-window_n_f64 * curvature) / harmonic_sq;
if !hess.is_finite() {
break;
}
let step = if hess.abs() > 1e-12 {
grad / hess
} else {
-grad.signum() * 0.05
};
let next = (s - step).clamp(ZIPF_S_MIN, ZIPF_S_MAX);
if (next - s).abs() < 1e-8 {
s = next;
converged = true;
break;
}
s = next;
}
Some(ZipfMleResult {
s_hat: s.clamp(ZIPF_S_MIN, ZIPF_S_MAX),
window_n,
iterations,
converged,
})
}
#[must_use]
pub fn policy_collision_mass_input(m2_hat: Option<f64>, _zipf_s_hat: Option<f64>) -> Option<f64> {
m2_hat
}
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(clippy::struct_field_names)]
pub struct HeavyHitterLedgerEntry {
pub pgno: u64,
pub count_hat: u64,
pub err: u64,
pub contrib_upper: f64,
}
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::struct_field_names)]
pub struct AmsEvidenceLedger {
pub txn_count: u64,
pub window_duration_ticks: u64,
pub regime_id: u64,
pub f2_hat: u128,
pub m2_hat: Option<f64>,
pub p_eff_hat: f64,
pub sketch_r: usize,
pub sketch_db_epoch: u64,
pub sketch_version: &'static str,
pub sketch_window_id: u64,
pub heavy_hitter_k: Option<usize>,
pub heavy_hitters: Vec<HeavyHitterLedgerEntry>,
pub head_contrib_lower: Option<f64>,
pub head_contrib_upper: Option<f64>,
pub tail_contrib_hat: Option<f64>,
pub zipf_s_hat: Option<f64>,
pub zipf_window_n: Option<u64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowCloseReason {
Boundary,
ManualFlush,
OverflowGuard,
}
#[derive(Debug, Clone)]
#[allow(clippy::struct_field_names)]
pub struct AmsWindowEstimate {
pub regime_id: u64,
pub window_id: u64,
pub window_start_tick: u64,
pub window_end_tick: u64,
pub sketch_r: usize,
pub sketch_db_epoch: u64,
pub sketch_version: &'static str,
pub txn_count: u64,
pub f2_hat: u128,
pub m2_hat: Option<f64>,
pub p_eff_hat: f64,
pub heavy_hitter_k: Option<usize>,
pub heavy_hitters: Vec<SpaceSavingEntry>,
pub head_tail: Option<HeadTailDecomposition>,
pub zipf: Option<ZipfMleResult>,
pub exact_f2: Option<u128>,
pub exact_m2: Option<f64>,
pub close_reason: WindowCloseReason,
}
impl AmsWindowEstimate {
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn to_evidence_ledger(&self) -> AmsEvidenceLedger {
let mut heavy_hitters = self.heavy_hitters.clone();
heavy_hitters.sort_by(|left, right| {
right
.count_hat
.cmp(&left.count_hat)
.then(left.pgno.cmp(&right.pgno))
});
let denom = if self.txn_count == 0 {
None
} else {
let txn_count_f64 = self.txn_count as f64;
Some(txn_count_f64 * txn_count_f64)
};
let heavy_hitter_rows = heavy_hitters
.into_iter()
.map(|entry| {
let contrib_upper =
denom.map_or(0.0, |value| square_u64(entry.count_hat) as f64 / value);
HeavyHitterLedgerEntry {
pgno: entry.pgno,
count_hat: entry.count_hat,
err: entry.err,
contrib_upper,
}
})
.collect();
AmsEvidenceLedger {
txn_count: self.txn_count,
window_duration_ticks: self.window_end_tick.saturating_sub(self.window_start_tick),
regime_id: self.regime_id,
f2_hat: self.f2_hat,
m2_hat: self.m2_hat,
p_eff_hat: self.p_eff_hat,
sketch_r: self.sketch_r,
sketch_db_epoch: self.sketch_db_epoch,
sketch_version: self.sketch_version,
sketch_window_id: self.window_id,
heavy_hitter_k: self.heavy_hitter_k,
heavy_hitters: heavy_hitter_rows,
head_contrib_lower: self.head_tail.map(|value| value.head_contrib_lower),
head_contrib_upper: self.head_tail.map(|value| value.head_contrib_upper),
tail_contrib_hat: self.head_tail.map(|value| value.tail_contrib_hat),
zipf_s_hat: self.zipf.map(|value| value.s_hat),
zipf_window_n: self.zipf.map(|value| value.window_n),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct AmsWindowCollectorConfig {
pub r: usize,
pub db_epoch: u64,
pub regime_id: u64,
pub window_width_ticks: u64,
pub track_exact_m2: bool,
pub track_heavy_hitters: bool,
pub heavy_hitter_k: usize,
pub estimate_zipf: bool,
}
impl Default for AmsWindowCollectorConfig {
fn default() -> Self {
Self {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_width_ticks: 10,
track_exact_m2: false,
track_heavy_hitters: false,
heavy_hitter_k: DEFAULT_HEAVY_HITTER_K,
estimate_zipf: false,
}
}
}
#[derive(Debug, Clone)]
pub struct AmsWindowCollector {
config: AmsWindowCollectorConfig,
active_window_id: u64,
active_window_start_tick: u64,
sketch: AmsSketch,
exact_incidence: Option<HashMap<u64, u64>>,
heavy_hitter_summary: Option<SpaceSavingSummary>,
}
impl AmsWindowCollector {
#[must_use]
pub fn new(config: AmsWindowCollectorConfig, start_tick: u64) -> Self {
assert!(
config.window_width_ticks > 0,
"window_width_ticks must be > 0"
);
if config.track_heavy_hitters {
assert!(
validate_heavy_hitter_k(config.heavy_hitter_k),
"heavy_hitter_k must be in [{MIN_HEAVY_HITTER_K}, {MAX_HEAVY_HITTER_K}], got {}",
config.heavy_hitter_k
);
}
let sketch = AmsSketch::new(&AmsSketchConfig {
r: config.r,
db_epoch: config.db_epoch,
regime_id: config.regime_id,
window_id: 0,
});
Self {
config,
active_window_id: 0,
active_window_start_tick: start_tick,
sketch,
exact_incidence: config.track_exact_m2.then(HashMap::new),
heavy_hitter_summary: config
.track_heavy_hitters
.then(|| SpaceSavingSummary::new(config.heavy_hitter_k)),
}
}
pub fn observe_commit_attempt(
&mut self,
tick: u64,
write_set: &[u64],
) -> Vec<AmsWindowEstimate> {
let mut closed = self.rotate_until_tick(tick);
let dedup = dedup_write_set(write_set);
if self.sketch.would_overflow_for_txn(dedup.len()) {
closed.push(self.finalize_active_window(tick, WindowCloseReason::OverflowGuard));
self.advance_window_to_start(tick);
}
self.sketch.observe_write_set(&dedup);
if let Some(summary) = self.heavy_hitter_summary.as_mut() {
for &pgno in &dedup {
summary.observe_incidence(pgno);
}
}
if let Some(exact) = self.exact_incidence.as_mut() {
for pgno in dedup {
*exact.entry(pgno).or_default() += 1;
}
}
closed
}
pub fn force_flush(&mut self, end_tick: u64) -> AmsWindowEstimate {
let normalized_end = end_tick.max(self.active_window_start_tick);
let estimate = self.finalize_active_window(normalized_end, WindowCloseReason::ManualFlush);
self.advance_window_to_start(normalized_end);
estimate
}
#[must_use]
pub fn exact_count_for_page(&self, pgno: u64) -> Option<u64> {
self.exact_incidence
.as_ref()
.and_then(|counts| counts.get(&pgno).copied())
}
#[must_use]
pub fn heavy_hitter_entry_for(&self, pgno: u64) -> Option<SpaceSavingEntry> {
self.heavy_hitter_summary
.as_ref()
.and_then(|summary| summary.entry_for(pgno))
}
#[must_use]
pub fn active_heavy_hitters_sorted(&self) -> Option<Vec<SpaceSavingEntry>> {
self.heavy_hitter_summary
.as_ref()
.map(SpaceSavingSummary::entries_sorted)
}
#[must_use]
pub fn active_window_id(&self) -> u64 {
self.active_window_id
}
#[must_use]
pub fn active_window_start_tick(&self) -> u64 {
self.active_window_start_tick
}
#[must_use]
pub fn active_window_end_tick(&self) -> u64 {
self.active_window_start_tick
.saturating_add(self.config.window_width_ticks)
}
fn rotate_until_tick(&mut self, tick: u64) -> Vec<AmsWindowEstimate> {
let mut closed = Vec::new();
while tick >= self.active_window_end_tick() {
let end_tick = self.active_window_end_tick();
closed.push(self.finalize_active_window(end_tick, WindowCloseReason::Boundary));
self.advance_window_to_start(end_tick);
}
closed
}
fn finalize_active_window(
&self,
end_tick: u64,
close_reason: WindowCloseReason,
) -> AmsWindowEstimate {
let txn_count = self.sketch.txn_count();
let f2_hat = self.sketch.f2_hat();
let m2_hat = self.sketch.m2_hat();
let p_eff_hat = self.sketch.p_eff_hat();
let (heavy_hitter_k, heavy_hitters, head_tail, zipf) =
if let Some(summary) = self.heavy_hitter_summary.as_ref() {
let heavy_hitters = summary.entries_sorted();
let head_tail = Some(compute_head_tail_decomposition(
&heavy_hitters,
f2_hat,
txn_count,
));
let zipf = if self.config.estimate_zipf {
let ranked_counts: Vec<u64> = heavy_hitters
.iter()
.map(|entry| entry.count_lower_bound())
.filter(|count| *count > 0)
.collect();
zipf_mle_from_ranked_counts(&ranked_counts)
} else {
None
};
(Some(summary.capacity()), heavy_hitters, head_tail, zipf)
} else {
(None, Vec::new(), None, None)
};
let exact_f2 = self.exact_incidence.as_ref().map(|counts| {
counts
.values()
.map(|&count| u128::from(count) * u128::from(count))
.sum()
});
let exact_m2_value = exact_f2.and_then(|f2| {
if txn_count == 0 {
None
} else {
let tc = txn_count as f64;
Some(f2 as f64 / (tc * tc))
}
});
AmsWindowEstimate {
regime_id: self.config.regime_id,
window_id: self.active_window_id,
window_start_tick: self.active_window_start_tick,
window_end_tick: end_tick,
sketch_r: self.config.r,
sketch_db_epoch: self.config.db_epoch,
sketch_version: AMS_SKETCH_VERSION,
txn_count,
f2_hat,
m2_hat,
p_eff_hat,
heavy_hitter_k,
heavy_hitters,
head_tail,
zipf,
exact_f2,
exact_m2: exact_m2_value,
close_reason,
}
}
fn advance_window_to_start(&mut self, start_tick: u64) {
self.active_window_id = self.active_window_id.saturating_add(1);
self.active_window_start_tick = start_tick;
self.sketch = AmsSketch::new(&AmsSketchConfig {
r: self.config.r,
db_epoch: self.config.db_epoch,
regime_id: self.config.regime_id,
window_id: self.active_window_id,
});
if let Some(exact) = self.exact_incidence.as_mut() {
exact.clear();
}
if let Some(summary) = self.heavy_hitter_summary.as_mut() {
summary.clear();
}
}
}
#[must_use]
pub fn effective_w_leaf_split(base_write_set_size: u64, split_pages: u64) -> u64 {
base_write_set_size
.saturating_sub(1)
.saturating_add(split_pages.max(3))
}
#[must_use]
pub fn effective_w_root_split(base_write_set_size: u64, root_split_pages: u64) -> u64 {
base_write_set_size
.saturating_sub(1)
.saturating_add(root_split_pages.max(4))
}
#[must_use]
pub fn effective_w_index_multiplier(
base_write_set_size: u64,
index_count: u64,
split_factor: u64,
) -> u64 {
let multiplied = base_write_set_size.saturating_mul(1_u64.saturating_add(index_count));
multiplied.saturating_mul(split_factor.max(1))
}
#[derive(Debug, Clone, Default)]
pub struct InstrumentationCounters {
pub conflicts_detected: u64,
pub conflicts_merged_rebase: u64,
pub conflicts_merged_structured: u64,
pub conflicts_aborted: u64,
pub total_commits: u64,
pub writers_active_histogram: Vec<u64>,
pub pages_per_commit_histogram: Vec<u64>,
pub retry_attempts_histogram: Vec<u64>,
pub retry_wait_ms_histogram: Vec<u64>,
}
impl InstrumentationCounters {
pub fn record_conflict(&mut self) {
self.conflicts_detected = self.conflicts_detected.saturating_add(1);
}
pub fn record_merge_rebase(&mut self) {
self.conflicts_merged_rebase = self.conflicts_merged_rebase.saturating_add(1);
}
pub fn record_merge_structured(&mut self) {
self.conflicts_merged_structured = self.conflicts_merged_structured.saturating_add(1);
}
pub fn record_abort(&mut self) {
self.conflicts_aborted = self.conflicts_aborted.saturating_add(1);
}
pub fn record_commit(&mut self, write_set_size: usize, active_writers: usize) {
self.total_commits = self.total_commits.saturating_add(1);
if active_writers >= self.writers_active_histogram.len() {
self.writers_active_histogram.resize(active_writers + 1, 0);
}
self.writers_active_histogram[active_writers] =
self.writers_active_histogram[active_writers].saturating_add(1);
if write_set_size >= self.pages_per_commit_histogram.len() {
self.pages_per_commit_histogram
.resize(write_set_size + 1, 0);
}
self.pages_per_commit_histogram[write_set_size] =
self.pages_per_commit_histogram[write_set_size].saturating_add(1);
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn pages_per_commit_m2(&self) -> Option<f64> {
if self.total_commits == 0 {
return None;
}
let sum_w2: u128 = self
.pages_per_commit_histogram
.iter()
.enumerate()
.map(|(w, &count)| {
let w_u128 = w as u128;
w_u128 * w_u128 * u128::from(count)
})
.sum();
Some(sum_w2 as f64 / self.total_commits as f64)
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn f_merge(&self) -> Option<f64> {
if self.conflicts_detected == 0 {
return None;
}
let merged = self
.conflicts_merged_rebase
.saturating_add(self.conflicts_merged_structured);
Some(merged as f64 / self.conflicts_detected as f64)
}
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn p_drift(n_active_writers: u64, m2_hat: f64) -> f64 {
if n_active_writers < 2 {
return 0.0;
}
let n_minus_1 = (n_active_writers - 1) as f64;
1.0 - (-n_minus_1 * m2_hat).exp()
}
#[must_use]
pub fn p_abort_attempt(p_drift_value: f64, f_merge_value: f64) -> f64 {
p_drift_value * (1.0 - f_merge_value.clamp(0.0, 1.0))
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn tps_estimate(
n_active_writers: u64,
p_abort_attempt_value: f64,
t_attempt_seconds: f64,
) -> f64 {
if t_attempt_seconds <= 0.0 || !t_attempt_seconds.is_finite() {
return 0.0;
}
let n = n_active_writers as f64;
n * (1.0 - p_abort_attempt_value.clamp(0.0, 1.0)) / t_attempt_seconds
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
const BEAD_ID: &str = "bd-3iwr";
const BEAD_ID_26BE: &str = "bd-26be";
const BEAD_ID_3U2V: &str = "bd-3u2v";
#[allow(
clippy::cast_possible_truncation,
clippy::cast_precision_loss,
clippy::cast_sign_loss
)]
fn synthetic_zipf_counts(k: usize, s: f64, scale: f64) -> Vec<u64> {
(1..=k)
.map(|rank| (((rank as f64).powf(-s) * scale).round() as u64).max(1))
.collect()
}
#[test]
fn test_nitro_sketch_precision_validation() {
assert!(validate_nitro_precision(DEFAULT_NITRO_PRECISION));
assert!(validate_nitro_precision(MIN_NITRO_PRECISION));
assert!(validate_nitro_precision(MAX_NITRO_PRECISION));
assert!(!validate_nitro_precision(
MIN_NITRO_PRECISION.saturating_sub(1)
));
assert!(!validate_nitro_precision(
MAX_NITRO_PRECISION.saturating_add(1)
));
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_nitro_sketch_cardinality_accuracy_one_million_distinct() {
let mut sketch = NitroSketch::new(&NitroSketchConfig {
precision: DEFAULT_NITRO_PRECISION,
seed: 0x00C0_FFEE_u64,
});
for value in 0_u64..1_000_000_u64 {
sketch.observe_u64(value);
}
let estimate = sketch.estimate_cardinality();
let exact = 1_000_000_f64;
let relative_error = ((estimate - exact) / exact).abs();
assert!(
relative_error <= 0.05,
"bead_id={BEAD_ID_3U2V} case=nitro_accuracy_1m estimate={estimate} exact={exact} rel_error={relative_error}"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_nitro_sketch_deterministic_replay() {
let config = NitroSketchConfig {
precision: DEFAULT_NITRO_PRECISION,
seed: 0xFACE_u64,
};
let mut a = NitroSketch::new(&config);
let mut b = NitroSketch::new(&config);
for value in 0_u64..200_000_u64 {
let mixed = mix64(value.wrapping_mul(31).wrapping_add(7));
a.observe_u64(mixed);
b.observe_u64(mixed);
}
let estimate_a = a.estimate_cardinality();
let estimate_b = b.estimate_cardinality();
assert!(
(estimate_a - estimate_b).abs() <= f64::EPSILON,
"bead_id={BEAD_ID_3U2V} case=nitro_determinism estimate_a={estimate_a} estimate_b={estimate_b}"
);
}
#[test]
fn test_pairwise_conflict_uniform() {
let w: u64 = 100;
let p: u64 = 1_000_000;
let prob = pairwise_conflict_probability(w, p);
let expected = 1.0 - (-10_000.0_f64 / 1_000_000.0).exp(); let rel_error = ((prob - expected) / expected).abs();
assert!(
rel_error < 0.01,
"bead_id={BEAD_ID} pairwise prob={prob} expected={expected} rel_error={rel_error}"
);
}
#[test]
fn test_birthday_paradox_n_writers() {
let prob = birthday_conflict_probability_uniform(10, 100, 1_000_000);
let exponent: f64 = 10.0 * 9.0 * 10_000.0 / (2.0 * 1_000_000.0);
assert!(
(exponent - 0.45).abs() < 1e-10,
"bead_id={BEAD_ID} exponent={exponent}"
);
let expected: f64 = 1.0 - (-exponent).exp();
assert!(
(prob - expected).abs() < 1e-10,
"bead_id={BEAD_ID} birthday prob={prob} expected={expected}"
);
assert!(
(prob - 0.3624).abs() < 0.01,
"bead_id={BEAD_ID} birthday ~36%: {prob}"
);
}
#[test]
fn test_collision_mass_uniform() {
let w: f64 = 100.0;
let p: f64 = 1_000_000.0;
let txn_count: u64 = 1000;
let m2_theoretical = (w * w) / p;
assert!(
(m2_theoretical - 0.01).abs() < 1e-10,
"bead_id={BEAD_ID} m2_uniform={m2_theoretical}"
);
let p_eff = effective_collision_pool(m2_theoretical);
let expected_p_eff = p / (w * w);
assert!(
(p_eff - expected_p_eff).abs() < 1e-6,
"bead_id={BEAD_ID} p_eff={p_eff} expected={expected_p_eff}"
);
let counts = vec![10u64; 100];
let m2 = exact_m2(&counts, txn_count).expect("non-zero txn_count");
assert!((m2 - 0.01).abs() < 1e-10, "bead_id={BEAD_ID} exact_m2={m2}");
}
#[test]
fn test_ams_sketch_exact_small() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 1,
regime_id: 0,
window_id: 0,
};
let mut sketch = AmsSketch::new(&config);
let mut incidence: HashMap<u64, u64> = HashMap::new();
for txn_id in 0u64..20 {
let pages = [txn_id % 50, (txn_id * 7 + 3) % 50, (txn_id * 13 + 17) % 50];
let mut dedup: Vec<u64> = pages.to_vec();
dedup.sort_unstable();
dedup.dedup();
for &pg in &dedup {
*incidence.entry(pg).or_default() += 1;
}
sketch.observe_write_set(&dedup);
}
let exact_f2: u128 = incidence
.values()
.map(|&c| u128::from(c) * u128::from(c))
.sum();
let f2_hat = sketch.f2_hat();
let ratio = if exact_f2 > 0 {
f2_hat as f64 / exact_f2 as f64
} else {
1.0
};
assert!(
(0.1..=10.0).contains(&ratio),
"bead_id={BEAD_ID} f2_hat={f2_hat} exact_f2={exact_f2} ratio={ratio}"
);
}
#[test]
fn test_ams_sketch_deterministic_replay() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 42,
regime_id: 1,
window_id: 7,
};
let run = || {
let mut sketch = AmsSketch::new(&config);
for txn_id in 0u64..100 {
let pages: Vec<u64> = (0..5).map(|i| (txn_id * 31 + i * 17) % 1000).collect();
sketch.observe_write_set(&pages);
}
sketch.f2_hat()
};
let f2_a = run();
let f2_b = run();
assert_eq!(
f2_a, f2_b,
"bead_id={BEAD_ID} deterministic_replay: {f2_a} != {f2_b}"
);
}
#[test]
fn test_ams_sketch_sign_hash_deterministic() {
let seed = 0xDEAD_BEEF_CAFE_BABEu64;
for pgno in 0u64..1000 {
let s1 = ams_sign(seed, pgno);
let s2 = ams_sign(seed, pgno);
assert_eq!(s1, s2, "bead_id={BEAD_ID} sign_deterministic pgno={pgno}");
assert!(
s1 == 1 || s1 == -1,
"bead_id={BEAD_ID} sign_range pgno={pgno} sign={s1}"
);
}
}
#[test]
fn test_ams_sketch_overflow_protection() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let mut sketch = AmsSketch::new(&config);
for _ in 0..1_000_000u64 {
sketch.observe_write_set(&[42]);
}
let f2 = sketch.f2_hat();
let expected = 1_000_000u128 * 1_000_000;
assert_eq!(
f2, expected,
"bead_id={BEAD_ID} overflow_protection: f2={f2} expected={expected}"
);
}
#[test]
fn test_ams_sketch_memory_bound() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let sketch = AmsSketch::new(&config);
let mem = sketch.memory_bytes();
assert!(
mem <= 16 * 1024,
"bead_id={BEAD_ID} memory_bound: {mem} bytes > 16 KiB"
);
assert_eq!(mem, 296, "bead_id={BEAD_ID} memory_exact");
}
#[test]
fn test_m2_hat_zero_txn_count() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let sketch = AmsSketch::new(&config);
assert_eq!(sketch.m2_hat(), None, "bead_id={BEAD_ID} m2_hat_zero_txn");
assert!(
sketch.p_eff_hat().is_infinite(),
"bead_id={BEAD_ID} p_eff_hat_infinity"
);
assert_eq!(
exact_m2(&[1, 2, 3], 0),
None,
"bead_id={BEAD_ID} exact_m2_zero"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_m2_hat_tracks_skew() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 99,
regime_id: 0,
window_id: 0,
};
let mut sketch = AmsSketch::new(&config);
let mut incidence: HashMap<u64, u64> = HashMap::new();
let num_pages = 1000u64;
let txn_count = 500u64;
for txn_id in 0..txn_count {
let mut pages = Vec::new();
for i in 0u64..5 {
let h = mix64(txn_id.wrapping_mul(1337).wrapping_add(i));
let rank = (h % num_pages) + 1;
let page = num_pages / rank; pages.push(page);
}
pages.sort_unstable();
pages.dedup();
for &pg in &pages {
*incidence.entry(pg).or_default() += 1;
}
sketch.observe_write_set(&pages);
}
let m2_hat = sketch.m2_hat().expect("non-zero txn_count");
let exact_f2: u128 = incidence
.values()
.map(|&c| u128::from(c) * u128::from(c))
.sum();
let exact_m2_val = exact_f2 as f64 / (txn_count as f64 * txn_count as f64);
let uniform_m2 = 25.0 / 1000.0;
assert!(
exact_m2_val > uniform_m2 * 2.0,
"bead_id={BEAD_ID} skew_exact_m2={exact_m2_val} uniform={uniform_m2}"
);
let ratio = m2_hat / exact_m2_val;
assert!(
(0.1..=10.0).contains(&ratio),
"bead_id={BEAD_ID} m2_hat={m2_hat} exact_m2={exact_m2_val} ratio={ratio}"
);
}
#[test]
fn test_birthday_paradox_with_m2() {
let n: u64 = 10;
let w: u64 = 100;
let p: u64 = 1_000_000;
let m2_uniform = (w * w) as f64 / p as f64;
let prob_uniform = birthday_conflict_probability_uniform(n, w, p);
let prob_m2 = birthday_conflict_probability_m2(n, m2_uniform);
assert!(
(prob_uniform - prob_m2).abs() < 1e-10,
"bead_id={BEAD_ID} birthday_m2 uniform={prob_uniform} m2={prob_m2}"
);
}
#[test]
fn test_mix64_splitmix_golden() {
let r0 = mix64(0);
assert_eq!(r0, mix64(0), "bead_id={BEAD_ID} mix64_deterministic_0");
let r1 = mix64(1);
assert_ne!(r0, r1, "bead_id={BEAD_ID} mix64_avalanche_0_1");
assert_eq!(
r0, 0xE220_A839_7B1D_CDAF,
"bead_id={BEAD_ID} mix64_golden_0: got {r0:#018X}"
);
let expected_1 = mix64(1);
assert_eq!(r1, expected_1, "bead_id={BEAD_ID} mix64_golden_1");
let r_max = mix64(u64::MAX);
assert_eq!(
r_max,
mix64(u64::MAX),
"bead_id={BEAD_ID} mix64_deterministic_max"
);
}
#[test]
fn test_ams_sign_hash_deterministic() {
let seed = 0xDEAD_BEEF_CAFE_BABEu64;
for pgno in 0u64..10_000 {
let lhs = ams_sign(seed, pgno);
let rhs = ams_sign(seed, pgno);
assert_eq!(
lhs, rhs,
"bead_id={BEAD_ID_26BE} case=ams_sign_hash_deterministic pgno={pgno}"
);
}
}
#[test]
fn test_ams_sign_hash_balance() {
let seed = 0xABCD_EF01_0203_0405u64;
let mut pos = 0i64;
let mut neg = 0i64;
for pgno in 0u64..10_000 {
if ams_sign(seed, pgno) > 0 {
pos += 1;
} else {
neg += 1;
}
}
let imbalance = (pos - neg).abs();
assert!(
imbalance <= 250,
"bead_id={BEAD_ID_26BE} case=ams_sign_hash_balance pos={pos} neg={neg} imbalance={imbalance}"
);
}
#[test]
fn test_mix64_golden_vectors() {
assert_eq!(
mix64(0),
0xE220_A839_7B1D_CDAF,
"bead_id={BEAD_ID_26BE} case=mix64_golden_0"
);
assert_eq!(
mix64(1),
0x910A_2DEC_8902_5CC1,
"bead_id={BEAD_ID_26BE} case=mix64_golden_1"
);
assert_eq!(
mix64(u64::MAX),
0xE4D9_7177_1B65_2C20,
"bead_id={BEAD_ID_26BE} case=mix64_golden_max"
);
}
#[test]
fn test_ams_update_single_page() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 7,
regime_id: 9,
window_id: 11,
};
let mut sketch = AmsSketch::new(&config);
sketch.observe_write_set(&[42]);
assert_eq!(sketch.txn_count(), 1);
for (idx, &acc) in sketch.accumulators().iter().enumerate() {
let seed = sketch
.seed_for_index(idx)
.expect("seed must exist for accumulator");
let expected = i128::from(ams_sign(seed, 42));
assert_eq!(
acc, expected,
"bead_id={BEAD_ID_26BE} case=single_page_update idx={idx} acc={acc} expected={expected}"
);
}
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_ams_f2_hat_exact_small() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 1,
regime_id: 1,
window_id: 1,
};
let mut sketch = AmsSketch::new(&config);
let mut exact_counts: HashMap<u64, u64> = HashMap::new();
for txn in 0u64..10 {
let pages = dedup_write_set(&[txn % 5, (txn + 1) % 5, (txn * 3 + 2) % 5]);
for &pgno in &pages {
*exact_counts.entry(pgno).or_default() += 1;
}
sketch.observe_write_set(&pages);
}
let exact_f2: u128 = exact_counts
.values()
.map(|&count| u128::from(count) * u128::from(count))
.sum();
let f2_hat = sketch.f2_hat();
let ratio = f2_hat as f64 / exact_f2 as f64;
assert!(
(0.2..=5.0).contains(&ratio),
"bead_id={BEAD_ID_26BE} case=f2_exact_small f2_hat={f2_hat} exact_f2={exact_f2} ratio={ratio}"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_ams_f2_hat_uniform_convergence() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 17,
regime_id: 3,
window_id: 5,
};
let mut sketch = AmsSketch::new(&config);
let mut exact_counts: HashMap<u64, u64> = HashMap::new();
let pages = 1000u64;
for txn in 0u64..1000 {
let mut write_set = Vec::with_capacity(5);
for i in 0u64..5 {
let h = mix64(txn.wrapping_mul(97).wrapping_add(i));
write_set.push(h % pages);
}
let write_set = dedup_write_set(&write_set);
for &pgno in &write_set {
*exact_counts.entry(pgno).or_default() += 1;
}
sketch.observe_write_set(&write_set);
}
let exact_f2: u128 = exact_counts
.values()
.map(|&count| u128::from(count) * u128::from(count))
.sum();
let f2_hat = sketch.f2_hat();
let relative_error = ((f2_hat as f64 - exact_f2 as f64) / exact_f2 as f64).abs();
assert!(
relative_error <= 0.30,
"bead_id={BEAD_ID_26BE} case=uniform_convergence f2_hat={f2_hat} exact_f2={exact_f2} rel_error={relative_error}"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_ams_f2_hat_skewed_convergence() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 19,
regime_id: 4,
window_id: 6,
};
let mut sketch = AmsSketch::new(&config);
let mut exact_counts: HashMap<u64, u64> = HashMap::new();
let pages = 1000u64;
for txn in 0u64..1000 {
let mut write_set = Vec::with_capacity(5);
for i in 0u64..5 {
let rank = (mix64(txn.wrapping_mul(131).wrapping_add(i)) % pages) + 1;
write_set.push(pages / rank);
}
let write_set = dedup_write_set(&write_set);
for &pgno in &write_set {
*exact_counts.entry(pgno).or_default() += 1;
}
sketch.observe_write_set(&write_set);
}
let exact_f2: u128 = exact_counts
.values()
.map(|&count| u128::from(count) * u128::from(count))
.sum();
let f2_hat = sketch.f2_hat();
let relative_error = ((f2_hat as f64 - exact_f2 as f64) / exact_f2 as f64).abs();
assert!(
relative_error <= 0.30,
"bead_id={BEAD_ID_26BE} case=skewed_convergence f2_hat={f2_hat} exact_f2={exact_f2} rel_error={relative_error}"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_m2_hat_computation() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let mut sketch = AmsSketch::new(&config);
for _ in 0..10 {
sketch.observe_write_set(&[7]);
}
let m2_hat = sketch.m2_hat().expect("non-zero txn_count");
assert!(
(m2_hat - 1.0).abs() < 1e-9,
"bead_id={BEAD_ID_26BE} case=m2_hat_computation m2_hat={m2_hat}"
);
}
#[test]
fn test_peff_hat_guard_zero_txn() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let sketch = AmsSketch::new(&config);
assert_eq!(
sketch.m2_hat(),
None,
"bead_id={BEAD_ID_26BE} case=zero_txn_m2_none"
);
assert!(
sketch.p_eff_hat().is_infinite(),
"bead_id={BEAD_ID_26BE} case=zero_txn_peff_inf"
);
}
#[test]
fn test_peff_hat_guard_zero_m2() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let mut sketch = AmsSketch::new(&config);
sketch.observe_write_set(&[]);
assert_eq!(
sketch.m2_hat(),
Some(0.0),
"bead_id={BEAD_ID_26BE} case=zero_m2_guard"
);
assert!(
sketch.p_eff_hat().is_infinite(),
"bead_id={BEAD_ID_26BE} case=zero_m2_peff_inf"
);
}
#[test]
fn test_window_deterministic_lab() {
let config = AmsWindowCollectorConfig {
r: DEFAULT_AMS_R,
db_epoch: 42,
regime_id: 5,
window_width_ticks: 10,
track_exact_m2: true,
track_heavy_hitters: false,
heavy_hitter_k: DEFAULT_HEAVY_HITTER_K,
estimate_zipf: false,
};
let trace = vec![
(0u64, vec![1u64, 2, 2, 7]),
(3, vec![7, 9, 9, 9]),
(11, vec![1, 8, 8]),
(18, vec![3, 3, 3, 4]),
(26, vec![5, 6, 6, 6]),
];
let run = || {
let mut collector = AmsWindowCollector::new(config, 0);
let mut closed = Vec::new();
for (tick, write_set) in &trace {
closed.extend(collector.observe_commit_attempt(*tick, write_set));
}
closed.push(collector.force_flush(30));
closed
};
let lhs = run();
let rhs = run();
assert_eq!(
lhs.len(),
rhs.len(),
"bead_id={BEAD_ID_26BE} case=window_deterministic_len"
);
for (index, (left, right)) in lhs.iter().zip(rhs.iter()).enumerate() {
assert_eq!(left.window_id, right.window_id);
assert_eq!(left.window_start_tick, right.window_start_tick);
assert_eq!(left.window_end_tick, right.window_end_tick);
assert_eq!(left.txn_count, right.txn_count);
assert_eq!(left.f2_hat, right.f2_hat);
assert_eq!(left.close_reason, right.close_reason);
assert_eq!(
left.m2_hat.map(f64::to_bits),
right.m2_hat.map(f64::to_bits),
"bead_id={BEAD_ID_26BE} case=window_m2_bits index={index}"
);
assert_eq!(
left.exact_m2.map(f64::to_bits),
right.exact_m2.map(f64::to_bits),
"bead_id={BEAD_ID_26BE} case=window_exact_m2_bits index={index}"
);
}
}
#[test]
fn test_ams_accumulator_no_overflow() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let mut sketch = AmsSketch::new(&config);
for _ in 0..100_000 {
sketch.observe_write_set(&[42]);
}
assert!(
!sketch.would_overflow_for_txn(1000),
"bead_id={BEAD_ID_26BE} case=no_overflow_realistic_bound"
);
let mut near_limit = AmsSketch::new(&config);
let mut values = vec![0i128; near_limit.r()];
values[0] = i128::MAX - 200;
near_limit.set_accumulators_for_test(&values);
assert!(
near_limit.would_overflow_for_txn(1000),
"bead_id={BEAD_ID_26BE} case=overflow_guard_detects_near_limit"
);
}
#[test]
fn test_data_collection_dedup() {
let config = AmsWindowCollectorConfig {
r: DEFAULT_AMS_R,
db_epoch: 1,
regime_id: 1,
window_width_ticks: 10,
track_exact_m2: true,
track_heavy_hitters: false,
heavy_hitter_k: DEFAULT_HEAVY_HITTER_K,
estimate_zipf: false,
};
let mut collector = AmsWindowCollector::new(config, 0);
let _closed = collector.observe_commit_attempt(0, &[5, 5, 7, 7, 7]);
assert_eq!(collector.exact_count_for_page(5), Some(1));
assert_eq!(collector.exact_count_for_page(7), Some(1));
}
#[test]
fn test_median_computation() {
let config = AmsSketchConfig {
r: 8,
db_epoch: 0,
regime_id: 0,
window_id: 0,
};
let mut sketch = AmsSketch::new(&config);
sketch.set_accumulators_for_test(&[1, 2, 3, 4, 5, 6, 7, 8]);
assert_eq!(
sketch.f2_hat(),
16,
"bead_id={BEAD_ID_26BE} case=median_even_lower"
);
}
#[test]
fn test_seed_derivation_blake3() {
let config = AmsSketchConfig {
r: DEFAULT_AMS_R,
db_epoch: 11,
regime_id: 22,
window_id: 33,
};
let idx = 7usize;
let mut hasher = blake3::Hasher::new();
hasher.update(AMS_SKETCH_VERSION.as_bytes());
hasher.update(&config.db_epoch.to_le_bytes());
hasher.update(&config.regime_id.to_le_bytes());
hasher.update(&config.window_id.to_le_bytes());
hasher.update(&(idx as u64).to_le_bytes());
let digest = hasher.finalize();
let expected = u64::from_le_bytes(digest.as_bytes()[..8].try_into().expect("8 bytes"));
assert_eq!(
config.seed_for_index(idx),
expected,
"bead_id={BEAD_ID_26BE} case=seed_derivation"
);
}
#[test]
fn test_spacesaving_insert_new() {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
for pgno in 0..MIN_HEAVY_HITTER_K as u64 {
summary.observe_incidence(pgno);
}
assert_eq!(
summary.len(),
MIN_HEAVY_HITTER_K,
"bead_id={BEAD_ID_3U2V} case=spacesaving_insert_new_len"
);
for pgno in 0..MIN_HEAVY_HITTER_K as u64 {
let entry = summary
.entry_for(pgno)
.expect("all inserted entries must be present");
assert_eq!(entry.count_hat, 1);
assert_eq!(entry.err, 0);
}
}
#[test]
fn test_spacesaving_increment_existing() {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
summary.observe_incidence(7);
summary.observe_incidence(7);
summary.observe_incidence(7);
let entry = summary
.entry_for(7)
.expect("entry must exist after repeated updates");
assert_eq!(entry.count_hat, 3);
assert_eq!(entry.err, 0);
}
#[test]
fn test_spacesaving_evict_min() {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
for pgno in 0..MIN_HEAVY_HITTER_K as u64 {
summary.observe_incidence(pgno);
}
for pgno in 0..MIN_HEAVY_HITTER_K as u64 {
if pgno != 5 {
summary.observe_incidence(pgno);
}
}
summary.observe_incidence(10_000);
assert!(
summary.entry_for(5).is_none(),
"bead_id={BEAD_ID_3U2V} case=spacesaving_evict_min expected_pgno=5_evicted"
);
let replacement = summary
.entry_for(10_000)
.expect("replacement entry must exist");
assert_eq!(replacement.count_hat, 2);
assert_eq!(replacement.err, 1);
}
#[test]
fn test_spacesaving_tiebreak_min_pgno() {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
for pgno in 0..MIN_HEAVY_HITTER_K as u64 {
summary.observe_incidence(pgno);
}
summary.observe_incidence(99_999);
assert!(
summary.entry_for(0).is_none(),
"bead_id={BEAD_ID_3U2V} case=spacesaving_tiebreak expected_pgno=0_evicted"
);
let replacement = summary
.entry_for(99_999)
.expect("replacement entry must exist");
assert_eq!(replacement.count_hat, 2);
assert_eq!(replacement.err, 1);
}
#[test]
fn test_spacesaving_count_bounds() {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
let mut exact = HashMap::<u64, u64>::new();
for step in 0u64..5_000 {
let pgno = mix64(step) % 200;
summary.observe_incidence(pgno);
*exact.entry(pgno).or_default() += 1;
}
for entry in summary.entries_sorted() {
let exact_count = *exact.get(&entry.pgno).unwrap_or(&0);
let lower = entry.count_lower_bound();
assert!(
lower <= exact_count && exact_count <= entry.count_hat,
"bead_id={BEAD_ID_3U2V} case=spacesaving_bounds pgno={} lower={} exact={} upper={}",
entry.pgno,
lower,
exact_count,
entry.count_hat
);
}
}
#[test]
fn test_spacesaving_k_capacity() {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
for step in 0u64..10_000 {
summary.observe_incidence(step);
}
assert!(
summary.len() <= MIN_HEAVY_HITTER_K,
"bead_id={BEAD_ID_3U2V} case=spacesaving_capacity len={} k={}",
summary.len(),
MIN_HEAVY_HITTER_K
);
}
#[test]
fn test_spacesaving_deterministic() {
let trace = (0u64..2000)
.map(|value| mix64(value) % 128)
.collect::<Vec<_>>();
let run = || {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
for &pgno in &trace {
summary.observe_incidence(pgno);
}
summary.entries_sorted()
};
assert_eq!(
run(),
run(),
"bead_id={BEAD_ID_3U2V} case=spacesaving_deterministic"
);
}
#[test]
fn test_head_tail_decomposition_exact() {
let mut summary = SpaceSavingSummary::new(MIN_HEAVY_HITTER_K);
for _ in 0..4 {
summary.observe_incidence(10);
}
for _ in 0..2 {
summary.observe_incidence(11);
}
let entries = summary.entries_sorted();
let exact_f2 = square_u64(4) + square_u64(2);
let decomposition = compute_head_tail_decomposition(&entries, exact_f2, 6);
assert_eq!(decomposition.f2_head_lower, exact_f2);
assert_eq!(decomposition.f2_head_upper, exact_f2);
assert_eq!(decomposition.f2_tail_hat, 0);
}
#[test]
fn test_head_tail_conservative() {
let entries = vec![
SpaceSavingEntry {
pgno: 1,
count_hat: 10,
err: 8,
},
SpaceSavingEntry {
pgno: 2,
count_hat: 9,
err: 7,
},
];
let decomposition = compute_head_tail_decomposition(&entries, 5, 10);
assert_eq!(decomposition.f2_tail_hat, 0);
assert!(decomposition.tail_contrib_hat >= 0.0);
assert!(decomposition.f2_head_lower <= decomposition.f2_head_upper);
}
#[test]
fn test_collision_mass_contrib() {
let entries = vec![
SpaceSavingEntry {
pgno: 1,
count_hat: 3,
err: 0,
},
SpaceSavingEntry {
pgno: 2,
count_hat: 2,
err: 0,
},
];
let decomposition = compute_head_tail_decomposition(&entries, 13, 5);
assert!(
(decomposition.head_contrib_upper - (13.0 / 25.0)).abs() < 1e-12,
"bead_id={BEAD_ID_3U2V} case=collision_mass_contrib got={}",
decomposition.head_contrib_upper
);
}
#[test]
fn test_zipf_mle_pure_zipf() {
let counts = synthetic_zipf_counts(64, 1.0, 100_000.0);
let result = zipf_mle_from_ranked_counts(&counts).expect("zipf mle should be defined");
assert!(
(result.s_hat - 1.0).abs() <= 0.12,
"bead_id={BEAD_ID_3U2V} case=zipf_mle_pure_zipf s_hat={}",
result.s_hat
);
}
#[test]
fn test_zipf_mle_clamp() {
let mut counts = vec![1_000_000_u64];
counts.extend(std::iter::repeat_n(1_u64, 63));
let result = zipf_mle_from_ranked_counts(&counts).expect("zipf mle should be defined");
assert!(
(ZIPF_S_MIN..=ZIPF_S_MAX).contains(&result.s_hat),
"bead_id={BEAD_ID_3U2V} case=zipf_mle_clamp s_hat={}",
result.s_hat
);
}
#[test]
fn test_zipf_mle_few_iterations() {
let counts = synthetic_zipf_counts(64, 0.9, 50_000.0);
let result = zipf_mle_from_ranked_counts(&counts).expect("zipf mle should be defined");
assert!(
result.iterations <= DEFAULT_ZIPF_MAX_ITERS,
"bead_id={BEAD_ID_3U2V} case=zipf_mle_iteration_budget iterations={}",
result.iterations
);
}
#[test]
fn test_evidence_ledger_fields() {
let config = AmsWindowCollectorConfig {
r: DEFAULT_AMS_R,
db_epoch: 3,
regime_id: 9,
window_width_ticks: 16,
track_exact_m2: true,
track_heavy_hitters: true,
heavy_hitter_k: DEFAULT_HEAVY_HITTER_K,
estimate_zipf: true,
};
let mut collector = AmsWindowCollector::new(config, 0);
for _ in 0..3 {
let _closed = collector.observe_commit_attempt(0, &[1]);
let _closed = collector.observe_commit_attempt(0, &[2]);
}
let _closed = collector.observe_commit_attempt(0, &[9]);
let estimate = collector.force_flush(1);
let ledger = estimate.to_evidence_ledger();
assert_eq!(ledger.txn_count, estimate.txn_count);
assert_eq!(ledger.regime_id, estimate.regime_id);
assert_eq!(ledger.f2_hat, estimate.f2_hat);
assert_eq!(
ledger.m2_hat.map(f64::to_bits),
estimate.m2_hat.map(f64::to_bits)
);
assert_eq!(ledger.p_eff_hat.to_bits(), estimate.p_eff_hat.to_bits());
assert_eq!(ledger.sketch_r, DEFAULT_AMS_R);
assert_eq!(ledger.sketch_version, AMS_SKETCH_VERSION);
assert_eq!(ledger.heavy_hitter_k, Some(DEFAULT_HEAVY_HITTER_K));
assert!(
!ledger.heavy_hitters.is_empty(),
"bead_id={BEAD_ID_3U2V} case=evidence_ledger_fields expected_non_empty_heavy_hitters"
);
assert!(ledger.head_contrib_lower.is_some());
assert!(ledger.head_contrib_upper.is_some());
assert!(ledger.tail_contrib_hat.is_some());
}
#[test]
fn test_evidence_ledger_sort_order() {
let config = AmsWindowCollectorConfig {
r: DEFAULT_AMS_R,
db_epoch: 1,
regime_id: 1,
window_width_ticks: 10,
track_exact_m2: false,
track_heavy_hitters: true,
heavy_hitter_k: DEFAULT_HEAVY_HITTER_K,
estimate_zipf: false,
};
let mut collector = AmsWindowCollector::new(config, 0);
for _ in 0..3 {
let _closed = collector.observe_commit_attempt(0, &[1]);
let _closed = collector.observe_commit_attempt(0, &[2]);
}
let _closed = collector.observe_commit_attempt(0, &[9]);
let estimate = collector.force_flush(1);
let ledger = estimate.to_evidence_ledger();
let mut previous: Option<HeavyHitterLedgerEntry> = None;
for entry in ledger.heavy_hitters {
if let Some(prev) = previous {
assert!(
prev.count_hat > entry.count_hat
|| (prev.count_hat == entry.count_hat && prev.pgno <= entry.pgno),
"bead_id={BEAD_ID_3U2V} case=evidence_sort_order prev={prev:?} entry={entry:?}"
);
}
previous = Some(entry);
}
}
#[test]
fn test_zipf_not_used_as_policy_input() {
let m2_hat = Some(0.25);
let policy_a = policy_collision_mass_input(m2_hat, Some(0.3));
let policy_b = policy_collision_mass_input(m2_hat, Some(1.9));
assert_eq!(policy_a, policy_b);
assert_eq!(policy_a, m2_hat);
}
const BEAD_ID_25Q8: &str = "bd-25q8";
#[test]
fn test_root_split_effective_w() {
let w = effective_w_root_split(3, 4);
assert_eq!(
w, 6,
"bead_id={BEAD_ID_25Q8} case=root_split_effective_w w={w}"
);
let w_min = effective_w_root_split(1, 0);
assert_eq!(
w_min, 4,
"bead_id={BEAD_ID_25Q8} case=root_split_min_clamp w={w_min}"
);
}
#[test]
fn test_leaf_split_effective_w() {
let w = effective_w_leaf_split(5, 3);
assert_eq!(
w, 7,
"bead_id={BEAD_ID_25Q8} case=leaf_split_effective_w w={w}"
);
let w_min = effective_w_leaf_split(2, 1);
assert_eq!(
w_min, 4,
"bead_id={BEAD_ID_25Q8} case=leaf_split_min_clamp w={w_min}"
);
}
#[test]
fn test_index_maintenance_w_multiplier() {
let w = effective_w_index_multiplier(1, 5, 1);
assert_eq!(
w, 6,
"bead_id={BEAD_ID_25Q8} case=index_w_multiplier_k5 w={w}"
);
let w_split = effective_w_index_multiplier(1, 5, 2);
assert_eq!(
w_split, 12,
"bead_id={BEAD_ID_25Q8} case=index_w_multiplier_split w={w_split}"
);
}
#[test]
fn test_instrumentation_conflicts_detected() {
let mut counters = InstrumentationCounters::default();
counters.record_conflict();
counters.record_conflict();
counters.record_conflict();
assert_eq!(
counters.conflicts_detected, 3,
"bead_id={BEAD_ID_25Q8} case=conflicts_detected"
);
}
#[test]
fn test_instrumentation_merge_rung_counts() {
let mut counters = InstrumentationCounters::default();
counters.record_conflict();
counters.record_merge_rebase();
counters.record_conflict();
counters.record_merge_structured();
counters.record_conflict();
counters.record_abort();
assert_eq!(
counters.conflicts_detected, 3,
"bead_id={BEAD_ID_25Q8} case=merge_rung_detected"
);
assert_eq!(
counters.conflicts_merged_rebase, 1,
"bead_id={BEAD_ID_25Q8} case=merge_rung_rebase"
);
assert_eq!(
counters.conflicts_merged_structured, 1,
"bead_id={BEAD_ID_25Q8} case=merge_rung_structured"
);
assert_eq!(
counters.conflicts_aborted, 1,
"bead_id={BEAD_ID_25Q8} case=merge_rung_aborted"
);
}
#[test]
fn test_instrumentation_pages_per_commit_histogram() {
let mut counters = InstrumentationCounters::default();
for w in [3, 5, 3, 7, 5, 3, 10, 5, 3, 3] {
counters.record_commit(w, 4);
}
assert_eq!(
counters.total_commits, 10,
"bead_id={BEAD_ID_25Q8} case=histogram_total"
);
assert_eq!(
counters
.pages_per_commit_histogram
.get(3)
.copied()
.unwrap_or(0),
5,
"bead_id={BEAD_ID_25Q8} case=histogram_bin_3"
);
assert_eq!(
counters
.pages_per_commit_histogram
.get(5)
.copied()
.unwrap_or(0),
3,
"bead_id={BEAD_ID_25Q8} case=histogram_bin_5"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_pages_per_commit_m2_derivation() {
let mut counters = InstrumentationCounters::default();
for _ in 0..5 {
counters.record_commit(2, 1);
}
for _ in 0..3 {
counters.record_commit(4, 1);
}
for _ in 0..2 {
counters.record_commit(6, 1);
}
let m2 = counters
.pages_per_commit_m2()
.expect("non-zero total_commits");
assert!(
(m2 - 14.0).abs() < 1e-9,
"bead_id={BEAD_ID_25Q8} case=m2_derivation m2={m2} expected=14.0"
);
}
#[test]
fn test_p_drift_formula() {
let pd = p_drift(8, 0.025);
let expected = 1.0 - (-7.0 * 0.025_f64).exp();
let rel_error = ((pd - expected) / expected).abs();
assert!(
rel_error < 0.01,
"bead_id={BEAD_ID_25Q8} case=p_drift pd={pd} expected={expected} rel_error={rel_error}"
);
}
#[test]
fn test_p_abort_attempt_formula() {
let pa = p_abort_attempt(0.16, 0.40);
let expected = 0.096;
let abs_error = (pa - expected).abs();
assert!(
abs_error < 0.001,
"bead_id={BEAD_ID_25Q8} case=p_abort pa={pa} expected={expected}"
);
}
#[test]
fn test_tps_formula() {
let tps = tps_estimate(8, 0.10, 0.01);
let expected = 720.0;
let rel_error = ((tps - expected) / expected).abs();
assert!(
rel_error < 0.01,
"bead_id={BEAD_ID_25Q8} case=tps tps={tps} expected={expected}"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_validation_uniform_within_10pct() {
let n: u64 = 8;
let w: u64 = 50;
let p: u64 = 10_000;
let predicted = birthday_conflict_probability_uniform(n, w, p);
let analytical = 1.0 - (-7.0_f64).exp();
let rel_error = ((predicted - analytical) / analytical).abs();
assert!(
rel_error < 0.01,
"bead_id={BEAD_ID_25Q8} case=uniform_10pct predicted={predicted} analytical={analytical}"
);
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn test_validation_skewed_within_20pct() {
let w: f64 = 50.0;
let p: f64 = 10_000.0;
let n: u64 = 8;
let m2_uniform = (w * w) / p;
let m2_skewed = m2_uniform * 3.0;
let pred_uniform = birthday_conflict_probability_m2(n, m2_uniform);
let pred_skewed = birthday_conflict_probability_m2(n, m2_skewed);
assert!(
pred_skewed >= pred_uniform,
"bead_id={BEAD_ID_25Q8} case=skewed_20pct skewed={pred_skewed} uniform={pred_uniform}"
);
}
#[test]
fn test_f_merge_computation() {
let mut counters = InstrumentationCounters::default();
for _ in 0..10 {
counters.record_conflict();
}
for _ in 0..4 {
counters.record_merge_rebase();
}
for _ in 0..2 {
counters.record_merge_structured();
}
for _ in 0..4 {
counters.record_abort();
}
let f = counters.f_merge().expect("non-zero conflicts");
assert!(
(f - 0.6).abs() < 1e-9,
"bead_id={BEAD_ID_25Q8} case=f_merge f={f} expected=0.6"
);
}
}