use std::sync::atomic::{AtomicI64, AtomicU8, AtomicU32, Ordering};
/// Maximum number of distinct buckets the shared region can hold.
pub const MAX_EACH_BUCKETS: usize = 256;
/// Maximum number of key values recorded per bucket.
pub const MAX_EACH_KEYS: usize = 6;
/// Fixed-size message buffer; writes are truncated to `EACH_MSG_LEN - 1`
/// bytes so the buffer is always NUL-terminated.
const EACH_MSG_LEN: usize = 32;
/// Total region size: an 8-byte header (a u32 bucket count plus padding)
/// followed by the fixed-capacity bucket array.
pub const EACH_BUCKET_MEM_SIZE: usize = 8 + MAX_EACH_BUCKETS * std::mem::size_of::<EachBucket>();
/// One assertion bucket stored in a raw, externally provided memory region
/// (see `EACH_BUCket_MEM_SIZE` for the overall layout). `#[repr(C)]` keeps
/// the field layout stable because the region is addressed through raw
/// pointers and individual fields are reinterpreted as atomics.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct EachBucket {
    /// FNV-1a hash of the assertion message; identifies the call site.
    pub site_hash: u32,
    /// `site_hash` further mixed with the key values; identifies this bucket.
    pub bucket_hash: u32,
    /// 0 until the bucket's first hit; flipped to 1 via CAS as a
    /// first-discovery flag (accessed as an `AtomicU8`).
    pub split_triggered: u8,
    /// Number of valid entries in `key_values` (at most `MAX_EACH_KEYS`).
    pub num_keys: u8,
    /// Number of quality lanes packed into `best_score` (0..=4).
    pub has_quality: u8,
    /// Explicit padding so `pass_count` stays 4-byte aligned.
    pub _pad: u8,
    /// Number of times this bucket has been hit (accessed as an `AtomicU32`).
    pub pass_count: u32,
    /// Best packed quality score observed so far; `i64::MIN` until the first
    /// score is stored (accessed as an `AtomicI64`).
    pub best_score: i64,
    /// Key values captured when the bucket was first allocated.
    pub key_values: [i64; MAX_EACH_KEYS],
    /// NUL-terminated message bytes.
    pub msg: [u8; EACH_MSG_LEN],
}
impl EachBucket {
pub fn msg_str(&self) -> &str {
let len = self
.msg
.iter()
.position(|&b| b == 0)
.unwrap_or(EACH_MSG_LEN);
std::str::from_utf8(&self.msg[..len]).unwrap_or("???")
}
}
/// 32-bit FNV-1a hash of `msg`'s UTF-8 bytes.
fn msg_hash(msg: &str) -> u32 {
    // Start from the FNV offset basis; xor each byte in, then multiply by
    // the FNV prime (wrapping, as FNV is defined modulo 2^32).
    msg.bytes()
        .fold(0x811c9dc5u32, |acc, byte| {
            (acc ^ byte as u32).wrapping_mul(0x01000193)
        })
}
/// Finds the bucket matching (`site_hash`, `bucket_hash`) in the region at
/// `ptr`, or allocates and initializes a new one. Returns a null pointer
/// when the region is full.
///
/// Region layout: the first 4 bytes of the 8-byte header are an atomic
/// bucket count, followed by up to `MAX_EACH_BUCKETS` `EachBucket` records.
///
/// # Safety
/// `ptr` must be non-null, suitably aligned, valid for reads and writes of
/// `EACH_BUCKET_MEM_SIZE` bytes, and laid out as described above.
///
/// NOTE(review): the lookup scan and the slot allocation are two separate
/// atomic steps, so two threads racing on the same brand-new
/// (site_hash, bucket_hash) pair can each allocate a slot, producing a
/// duplicate bucket — presumably tolerated by readers; confirm.
unsafe fn find_or_alloc_each_bucket(
    ptr: *mut u8,
    site_hash: u32,
    bucket_hash: u32,
    keys: &[(&str, i64)],
    msg: &str,
    has_quality: u8,
) -> *mut EachBucket {
    unsafe {
        // The header's first word is the live bucket count / next free index.
        let next_atomic = &*(ptr as *const AtomicU32);
        let count = next_atomic.load(Ordering::Relaxed) as usize;
        let base = ptr.add(8) as *mut EachBucket;
        // Linear scan over the buckets allocated so far.
        for i in 0..count.min(MAX_EACH_BUCKETS) {
            let bucket = base.add(i);
            if (*bucket).site_hash == site_hash && (*bucket).bucket_hash == bucket_hash {
                return bucket;
            }
        }
        // Not found: claim the next free slot.
        let new_idx = next_atomic.fetch_add(1, Ordering::Relaxed) as usize;
        if new_idx >= MAX_EACH_BUCKETS {
            // Region full — undo our own increment and report failure.
            next_atomic.fetch_sub(1, Ordering::Relaxed);
            return std::ptr::null_mut();
        }
        let bucket = base.add(new_idx);
        // Copy the message, truncating so the buffer stays NUL-terminated.
        let mut msg_buf = [0u8; EACH_MSG_LEN];
        let n = msg.len().min(EACH_MSG_LEN - 1);
        msg_buf[..n].copy_from_slice(&msg.as_bytes()[..n]);
        // Record up to MAX_EACH_KEYS key values (key names are dropped).
        let mut key_values = [0i64; MAX_EACH_KEYS];
        let num_keys = keys.len().min(MAX_EACH_KEYS);
        for (i, &(_, v)) in keys.iter().take(num_keys).enumerate() {
            key_values[i] = v;
        }
        std::ptr::write(
            bucket,
            EachBucket {
                site_hash,
                bucket_hash,
                split_triggered: 0,
                num_keys: num_keys as u8,
                has_quality,
                _pad: 0,
                pass_count: 0,
                // i64::MIN so any real packed score replaces it.
                best_score: i64::MIN,
                key_values,
                msg: msg_buf,
            },
        );
        bucket
    }
}
/// Translates a bucket pointer back into its index within the bucket array
/// that starts 8 bytes past `base_ptr` (after the count header). Returns 0
/// when `base_ptr` is null.
fn compute_each_bucket_index(base_ptr: *mut u8, bucket: *const EachBucket) -> usize {
    if base_ptr.is_null() {
        return 0;
    }
    // The first bucket lives immediately after the 8-byte header.
    let first = unsafe { base_ptr.add(8) } as usize;
    // saturating_sub keeps a stray pointer before the array from wrapping.
    let byte_offset = (bucket as usize).saturating_sub(first);
    byte_offset / std::mem::size_of::<EachBucket>()
}
/// Packs up to four quality values into one i64: each value is truncated to
/// its low 16 bits and placed into a lane, first value in the highest lane.
fn pack_quality(quality: &[(&str, i64)]) -> i64 {
    quality
        .iter()
        .take(4)
        .enumerate()
        .fold(0i64, |packed, (lane, &(_, value))| {
            // Lane 0 occupies bits 48..64, lane 3 bits 0..16.
            packed | (((value as u16) as i64) << ((3 - lane) * 16))
        })
}
/// Unpacks `n` quality values from a packed i64 produced by `pack_quality`:
/// lane 0 comes from bits 48..64, lane 3 from bits 0..16.
///
/// `n` is clamped to 4 — only four 16-bit lanes exist; without the clamp,
/// `(3 - i)` underflows and panics for any `n > 4` on this public API.
pub fn unpack_quality(packed: i64, n: u8) -> Vec<i64> {
    let lanes = (n as usize).min(4);
    (0..lanes)
        .map(|i| {
            let shift = (3 - i) * 16;
            ((packed >> shift) as u16) as i64
        })
        .collect()
}
/// Records one hit of a "sometimes each"-style assertion identified by `msg`
/// and the concrete `keys` values, optionally tracking a packed quality
/// score (`quality`, at most 4 lanes used). No-op when no bucket region is
/// installed in the thread-local context.
pub fn assertion_sometimes_each(msg: &str, keys: &[(&str, i64)], quality: &[(&str, i64)]) {
    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
    if ptr.is_null() {
        return;
    }
    // The site is identified by the message alone; the bucket additionally
    // mixes in each key value (FNV-1a continuation over its LE bytes).
    let site_hash = msg_hash(msg);
    let mut bucket_hash = site_hash;
    for &(_, val) in keys {
        for b in val.to_le_bytes() {
            bucket_hash ^= b as u32;
            bucket_hash = bucket_hash.wrapping_mul(0x01000193);
        }
    }
    // Mark the bucket in the coverage bitmap, when one is installed.
    let bm_ptr = crate::context::COVERAGE_BITMAP_PTR.with(|c| c.get());
    if !bm_ptr.is_null() {
        let bm = unsafe { crate::coverage::CoverageBitmap::new(bm_ptr) };
        bm.set_bit(bucket_hash as usize);
    }
    // At most 4 quality lanes fit into the packed i64 score.
    let has_quality = quality.len().min(4) as u8;
    let score = if has_quality > 0 {
        pack_quality(quality)
    } else {
        0
    };
    let bucket =
        unsafe { find_or_alloc_each_bucket(ptr, site_hash, bucket_hash, keys, msg, has_quality) };
    if bucket.is_null() {
        // Bucket table is full; drop this observation.
        return;
    }
    unsafe {
        // Reinterpret the plain #[repr(C)] fields as atomics so concurrent
        // hits can update them lock-free.
        let count_atomic = &*((&(*bucket).pass_count) as *const u32 as *const AtomicU32);
        count_atomic.fetch_add(1, Ordering::Relaxed);
        // CAS 0 -> 1 succeeds for exactly one thread: the first discovery.
        let ft = &*((&(*bucket).split_triggered) as *const u8 as *const AtomicU8);
        let first_discovery = ft
            .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();
        if first_discovery {
            if has_quality > 0 {
                let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
                bs_atomic.store(score, Ordering::Relaxed);
            }
            let bucket_index = compute_each_bucket_index(ptr, bucket);
            crate::split_loop::dispatch_split(
                msg,
                bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
            );
        } else if has_quality > 0 {
            // Lock-free max-update of best_score; a split is dispatched only
            // when this hit strictly improves the score.
            let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
            let mut current = bs_atomic.load(Ordering::Relaxed);
            loop {
                if score <= current {
                    break;
                }
                match bs_atomic.compare_exchange_weak(
                    current,
                    score,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        let bucket_index = compute_each_bucket_index(ptr, bucket);
                        crate::split_loop::dispatch_split(
                            msg,
                            bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
                        );
                        break;
                    }
                    // Lost the race; retry against the value that won.
                    Err(actual) => current = actual,
                }
            }
        }
    }
}
pub fn each_bucket_read_all() -> Vec<EachBucket> {
let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
if ptr.is_null() {
return Vec::new();
}
unsafe {
let count = (*(ptr as *const u32)) as usize;
let count = count.min(MAX_EACH_BUCKETS);
let base = ptr.add(8) as *const EachBucket;
(0..count).map(|i| std::ptr::read(base.add(i))).collect()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // The hash must be a pure function of its input.
    #[test]
    fn test_msg_hash_deterministic() {
        let h1 = msg_hash("test_assertion");
        let h2 = msg_hash("test_assertion");
        assert_eq!(h1, h2);
    }
    // Distinct messages should map to distinct hashes (for these samples).
    #[test]
    fn test_msg_hash_different_inputs() {
        let h1 = msg_hash("alpha");
        let h2 = msg_hash("beta");
        let h3 = msg_hash("gamma");
        assert_ne!(h1, h2);
        assert_ne!(h2, h3);
        assert_ne!(h1, h3);
    }
    // pack/unpack are inverses for in-range (u16) values.
    #[test]
    fn test_pack_unpack_quality_roundtrip() {
        let quality = &[("health", 100i64), ("armor", 50i64), ("mana", 200i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 3);
        assert_eq!(unpacked, vec![100, 50, 200]);
    }
    #[test]
    fn test_pack_quality_single() {
        let quality = &[("health", 42i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 1);
        assert_eq!(unpacked, vec![42]);
    }
    // The #[repr(C)] layout is part of the shared-memory contract; changing
    // the struct size would break readers of the region.
    #[test]
    fn test_each_bucket_size_stable() {
        assert_eq!(std::mem::size_of::<EachBucket>(), 104);
    }
    // With no region installed, reads return nothing rather than crashing.
    #[test]
    fn test_each_bucket_read_all_when_inactive() {
        let buckets = each_bucket_read_all();
        assert!(buckets.is_empty());
    }
    // With no region installed, recording is a silent no-op.
    #[test]
    fn test_assertion_sometimes_each_noop_when_inactive() {
        assertion_sometimes_each("test", &[("key", 1)], &[]);
    }
}