1use std::sync::atomic::{AtomicI64, AtomicU8, AtomicU32, Ordering};
17
/// Maximum number of distinct buckets the shared region can hold.
pub const MAX_EACH_BUCKETS: usize = 256;

/// Maximum number of key/value pairs captured per bucket.
pub const MAX_EACH_KEYS: usize = 6;

/// Fixed byte length of the inline, NUL-padded message buffer.
const EACH_MSG_LEN: usize = 32;

/// Total footprint of the shared region: an 8-byte header (the first 4
/// bytes hold the AtomicU32 slot count) followed by the bucket array.
pub const EACH_BUCKET_MEM_SIZE: usize = 8 + MAX_EACH_BUCKETS * std::mem::size_of::<EachBucket>();
29
/// One `sometimes_each` bucket record, stored in raw shared memory.
///
/// `#[repr(C)]` fixes the layout so the struct can be read and written
/// through raw pointers (see `find_or_alloc_each_bucket`). Keep the field
/// order and the explicit `_pad` byte in sync with `EACH_BUCKET_MEM_SIZE`
/// and the 104-byte size test below.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct EachBucket {
    // FNV-1a hash of the assertion message; identifies the call site.
    pub site_hash: u32,
    // site_hash further mixed with the key values; identifies the bucket.
    pub bucket_hash: u32,
    // 0 until the first hit; flipped to 1 via CAS on first discovery.
    pub split_triggered: u8,
    // Number of valid entries in `key_values` (<= MAX_EACH_KEYS).
    pub num_keys: u8,
    // Number of quality values packed into `best_score` (0..=4).
    pub has_quality: u8,
    // Explicit padding so `pass_count` stays 4-byte aligned.
    pub _pad: u8,
    // Times this bucket has been hit; updated atomically.
    pub pass_count: u32,
    // Best packed quality score seen; i64::MIN sentinel until first hit.
    pub best_score: i64,
    // Key values captured when the slot was first allocated.
    pub key_values: [i64; MAX_EACH_KEYS],
    // NUL-terminated copy of the assertion message (truncated to fit).
    pub msg: [u8; EACH_MSG_LEN],
}
58
59impl EachBucket {
60 pub fn msg_str(&self) -> &str {
62 let len = self
63 .msg
64 .iter()
65 .position(|&b| b == 0)
66 .unwrap_or(EACH_MSG_LEN);
67 std::str::from_utf8(&self.msg[..len]).unwrap_or("???")
68 }
69}
70
/// 32-bit FNV-1a hash of `msg`; used as the assertion-site identifier.
fn msg_hash(msg: &str) -> u32 {
    const FNV_OFFSET_BASIS: u32 = 0x811c9dc5;
    const FNV_PRIME: u32 = 0x01000193;
    msg.bytes()
        .fold(FNV_OFFSET_BASIS, |h, b| {
            (h ^ u32::from(b)).wrapping_mul(FNV_PRIME)
        })
}
80
/// Finds the bucket matching (`site_hash`, `bucket_hash`) in the region at
/// `ptr`, or allocates and initializes a fresh slot for it.
///
/// Region layout: an 8-byte header whose first 4 bytes are the `AtomicU32`
/// slot count, followed by `MAX_EACH_BUCKETS` `EachBucket` slots. Returns
/// null when the table is full. Only the *values* of `keys` are stored.
///
/// # Safety
/// `ptr` must be non-null, valid for reads and writes of
/// `EACH_BUCKET_MEM_SIZE` bytes, and laid out as described above.
///
/// NOTE(review): concurrent callers can race between the scan and the
/// reservation, so the same (site, bucket) pair may occasionally occupy
/// two slots — looks tolerated by the design; confirm.
unsafe fn find_or_alloc_each_bucket(
    ptr: *mut u8,
    site_hash: u32,
    bucket_hash: u32,
    keys: &[(&str, i64)],
    msg: &str,
    has_quality: u8,
) -> *mut EachBucket {
    unsafe {
        // First 4 header bytes: number of allocated slots.
        let next_atomic = &*(ptr as *const AtomicU32);
        let count = next_atomic.load(Ordering::Relaxed) as usize;
        let base = ptr.add(8) as *mut EachBucket;

        // Linear scan of existing slots for an exact hash match.
        for i in 0..count.min(MAX_EACH_BUCKETS) {
            let bucket = base.add(i);
            if (*bucket).site_hash == site_hash && (*bucket).bucket_hash == bucket_hash {
                return bucket;
            }
        }

        // Reserve a fresh slot; undo the reservation if the table is full.
        let new_idx = next_atomic.fetch_add(1, Ordering::Relaxed) as usize;
        if new_idx >= MAX_EACH_BUCKETS {
            next_atomic.fetch_sub(1, Ordering::Relaxed);
            return std::ptr::null_mut();
        }

        let bucket = base.add(new_idx);
        // Copy the message, truncating so at least one NUL terminator
        // remains. Truncation is byte-wise and may split a multi-byte
        // UTF-8 character, in which case msg_str() later yields "???".
        let mut msg_buf = [0u8; EACH_MSG_LEN];
        let n = msg.len().min(EACH_MSG_LEN - 1);
        msg_buf[..n].copy_from_slice(&msg.as_bytes()[..n]);

        // Capture up to MAX_EACH_KEYS key values (names are discarded).
        let mut key_values = [0i64; MAX_EACH_KEYS];
        let num_keys = keys.len().min(MAX_EACH_KEYS);
        for (i, &(_, v)) in keys.iter().take(num_keys).enumerate() {
            key_values[i] = v;
        }

        std::ptr::write(
            bucket,
            EachBucket {
                site_hash,
                bucket_hash,
                split_triggered: 0,
                num_keys: num_keys as u8,
                has_quality,
                _pad: 0,
                pass_count: 0,
                // Sentinel: any real packed score replaces i64::MIN.
                best_score: i64::MIN,
                key_values,
                msg: msg_buf,
            },
        );
        bucket
    }
}
146
147fn compute_each_bucket_index(base_ptr: *mut u8, bucket: *const EachBucket) -> usize {
149 if base_ptr.is_null() {
150 return 0;
151 }
152 let buckets_base = unsafe { base_ptr.add(8) } as usize;
153 let offset = (bucket as usize).saturating_sub(buckets_base);
154 offset / std::mem::size_of::<EachBucket>()
155}
156
/// Packs up to four quality values into one i64, 16 bits apiece, first
/// value in the most significant position. Each value is truncated to its
/// low 16 bits; entries beyond the fourth are ignored.
fn pack_quality(quality: &[(&str, i64)]) -> i64 {
    quality
        .iter()
        .take(4)
        .enumerate()
        .fold(0i64, |acc, (i, &(_, v))| {
            acc | (i64::from(v as u16) << ((3 - i) * 16))
        })
}
169
/// Inverse of `pack_quality`: extracts `n` 16-bit values from `packed`,
/// most significant first.
///
/// `n` is clamped to 4 — the pack format holds at most four values, and a
/// larger `n` would previously make `3 - i` underflow `usize` and panic.
pub fn unpack_quality(packed: i64, n: u8) -> Vec<i64> {
    (0..(n as usize).min(4))
        .map(|i| {
            let shift = (3 - i) * 16;
            ((packed >> shift) as u16) as i64
        })
        .collect()
}
179
/// Records one hit of a `sometimes_each`-style assertion.
///
/// No-op unless the shared bucket region is mapped (EACH_BUCKET_PTR is
/// non-null). Hashes `msg` to identify the call site, mixes the key values
/// into the hash to identify the bucket, marks coverage, then finds or
/// allocates the bucket and updates its counters. A split is dispatched on
/// a bucket's first discovery, and again whenever this call strictly
/// improves the bucket's best quality score.
pub fn assertion_sometimes_each(msg: &str, keys: &[(&str, i64)], quality: &[(&str, i64)]) {
    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
    if ptr.is_null() {
        return;
    }

    // FNV-1a over the message identifies the site; folding each key
    // value's little-endian bytes into it identifies the concrete bucket.
    let site_hash = msg_hash(msg);
    let mut bucket_hash = site_hash;
    for &(_, val) in keys {
        for b in val.to_le_bytes() {
            bucket_hash ^= b as u32;
            bucket_hash = bucket_hash.wrapping_mul(0x01000193);
        }
    }

    // Best-effort coverage marking; skipped when the bitmap is unmapped.
    let bm_ptr = crate::context::COVERAGE_BITMAP_PTR.with(|c| c.get());
    if !bm_ptr.is_null() {
        // SAFETY: bm_ptr is non-null; CoverageBitmap::new's remaining
        // contract is assumed to hold for the mapped bitmap — see context.
        let bm = unsafe { crate::coverage::CoverageBitmap::new(bm_ptr) };
        bm.set_bit(bucket_hash as usize);
    }

    // At most 4 quality values fit into the packed i64 score.
    let has_quality = quality.len().min(4) as u8;
    let score = if has_quality > 0 {
        pack_quality(quality)
    } else {
        0
    };

    let bucket =
        unsafe { find_or_alloc_each_bucket(ptr, site_hash, bucket_hash, keys, msg, has_quality) };
    if bucket.is_null() {
        return; // bucket table is full
    }

    // SAFETY: `bucket` points at an initialized EachBucket inside the
    // shared region; individual fields are reinterpreted as atomics so
    // concurrent callers can update them without tearing.
    unsafe {
        let count_atomic = &*((&(*bucket).pass_count) as *const u32 as *const AtomicU32);
        count_atomic.fetch_add(1, Ordering::Relaxed);

        // CAS 0 -> 1 succeeds for exactly one caller: the first discovery.
        let ft = &*((&(*bucket).split_triggered) as *const u8 as *const AtomicU8);
        let first_discovery = ft
            .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();

        if first_discovery {
            if has_quality > 0 {
                // First score simply overwrites the i64::MIN sentinel.
                let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
                bs_atomic.store(score, Ordering::Relaxed);
            }

            let bucket_index = compute_each_bucket_index(ptr, bucket);
            crate::split_loop::dispatch_split(
                msg,
                bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
            );
        } else if has_quality > 0 {
            // Known bucket: raise best_score monotonically via a CAS loop,
            // dispatching a split only when this call actually improved it.
            let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
            let mut current = bs_atomic.load(Ordering::Relaxed);
            loop {
                if score <= current {
                    break;
                }
                match bs_atomic.compare_exchange_weak(
                    current,
                    score,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        let bucket_index = compute_each_bucket_index(ptr, bucket);
                        crate::split_loop::dispatch_split(
                            msg,
                            bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
                        );
                        break;
                    }
                    Err(actual) => current = actual,
                }
            }
        }
    }
}
284
285pub fn each_bucket_read_all() -> Vec<EachBucket> {
289 let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
290 if ptr.is_null() {
291 return Vec::new();
292 }
293 unsafe {
299 let count = (*(ptr as *const u32)) as usize;
300 let count = count.min(MAX_EACH_BUCKETS);
301 let base = ptr.add(8) as *const EachBucket;
302 (0..count).map(|i| std::ptr::read(base.add(i))).collect()
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    // Hashing the same message twice must give the same value.
    #[test]
    fn test_msg_hash_deterministic() {
        assert_eq!(msg_hash("test_assertion"), msg_hash("test_assertion"));
    }

    // Distinct messages should hash to distinct values.
    #[test]
    fn test_msg_hash_different_inputs() {
        let (a, b, c) = (msg_hash("alpha"), msg_hash("beta"), msg_hash("gamma"));
        assert_ne!(a, b);
        assert_ne!(b, c);
        assert_ne!(a, c);
    }

    // pack -> unpack preserves small values in order.
    #[test]
    fn test_pack_unpack_quality_roundtrip() {
        let packed = pack_quality(&[("health", 100i64), ("armor", 50i64), ("mana", 200i64)]);
        assert_eq!(unpack_quality(packed, 3), [100, 50, 200]);
    }

    #[test]
    fn test_pack_quality_single() {
        let packed = pack_quality(&[("health", 42i64)]);
        assert_eq!(unpack_quality(packed, 1), [42]);
    }

    // Layout guard: EACH_BUCKET_MEM_SIZE and the raw shared-memory readers
    // depend on this exact repr(C) size.
    #[test]
    fn test_each_bucket_size_stable() {
        assert_eq!(std::mem::size_of::<EachBucket>(), 104);
    }

    // With no shared region mapped, reads yield nothing.
    #[test]
    fn test_each_bucket_read_all_when_inactive() {
        assert!(each_bucket_read_all().is_empty());
    }

    // With no shared region mapped, recording is a silent no-op.
    #[test]
    fn test_assertion_sometimes_each_noop_when_inactive() {
        assertion_sometimes_each("test", &[("key", 1)], &[]);
    }
}