use std::sync::atomic::{AtomicI64, AtomicU8, AtomicU32, Ordering};

/// Maximum number of distinct buckets tracked in the shared region.
pub const MAX_EACH_BUCKETS: usize = 256;

/// Maximum number of key values recorded per bucket.
pub const MAX_EACH_KEYS: usize = 6;

/// Size of the inline, NUL-terminated message buffer in each bucket.
const EACH_MSG_LEN: usize = 32;

/// Total footprint of the shared region: an 8-byte header (the atomic
/// bucket count plus padding for alignment) followed by the bucket array.
pub const EACH_BUCKET_MEM_SIZE: usize = 8 + MAX_EACH_BUCKETS * std::mem::size_of::<EachBucket>();
#[repr(C)]
#[derive(Clone, Copy)]
pub struct EachBucket {
    /// Hash of the assertion message, identifying the call site.
    pub site_hash: u32,
    /// Hash of the message combined with the key values, identifying this bucket.
    pub bucket_hash: u32,
    /// Set to 1 once a split has been dispatched for this bucket.
    pub split_triggered: u8,
    /// Number of valid entries in `key_values`.
    pub num_keys: u8,
    /// Number of quality values packed into `best_score` (at most 4).
    pub has_quality: u8,
    /// Explicit padding to keep the layout stable.
    pub _pad: u8,
    /// Number of times this bucket has been hit; updated atomically.
    pub pass_count: u32,
    /// Best packed quality score seen so far; updated atomically.
    pub best_score: i64,
    /// Key values captured when the bucket was first allocated.
    pub key_values: [i64; MAX_EACH_KEYS],
    /// NUL-terminated copy of the assertion message, truncated to fit.
    pub msg: [u8; EACH_MSG_LEN],
}

impl EachBucket {
    /// Returns the stored message, stopping at the first NUL byte.
    pub fn msg_str(&self) -> &str {
        let len = self
            .msg
            .iter()
            .position(|&b| b == 0)
            .unwrap_or(EACH_MSG_LEN);
        std::str::from_utf8(&self.msg[..len]).unwrap_or("???")
    }
}

/// 32-bit FNV-1a hash of the message bytes.
fn msg_hash(msg: &str) -> u32 {
    let mut h: u32 = 0x811c9dc5;
    for b in msg.bytes() {
        h ^= b as u32;
        h = h.wrapping_mul(0x01000193);
    }
    h
}

/// Finds the bucket matching `(site_hash, bucket_hash)` in the shared
/// region, or allocates a fresh one if no match exists.
///
/// Returns a null pointer once all `MAX_EACH_BUCKETS` slots are in use.
///
/// # Safety
///
/// `ptr` must point to a zeroed, 8-byte-aligned region of at least
/// `EACH_BUCKET_MEM_SIZE` bytes. The linear scan is not synchronized with
/// concurrent writers, so racing first calls can allocate duplicate buckets
/// for the same hash pair.
unsafe fn find_or_alloc_each_bucket(
    ptr: *mut u8,
    site_hash: u32,
    bucket_hash: u32,
    keys: &[(&str, i64)],
    msg: &str,
    has_quality: u8,
) -> *mut EachBucket {
    unsafe {
        // The first four bytes of the region hold the bucket count; the
        // bucket array starts after the 8-byte header.
        let next_atomic = &*(ptr as *const AtomicU32);
        let count = next_atomic.load(Ordering::Relaxed) as usize;
        let base = ptr.add(8) as *mut EachBucket;

        for i in 0..count.min(MAX_EACH_BUCKETS) {
            let bucket = base.add(i);
            if (*bucket).site_hash == site_hash && (*bucket).bucket_hash == bucket_hash {
                return bucket;
            }
        }

        // Reserve a slot; back out the reservation if the table is full.
        let new_idx = next_atomic.fetch_add(1, Ordering::Relaxed) as usize;
        if new_idx >= MAX_EACH_BUCKETS {
            next_atomic.fetch_sub(1, Ordering::Relaxed);
            return std::ptr::null_mut();
        }

        let bucket = base.add(new_idx);
        // Copy the message, truncating so the final byte is always NUL.
        let mut msg_buf = [0u8; EACH_MSG_LEN];
        let n = msg.len().min(EACH_MSG_LEN - 1);
        msg_buf[..n].copy_from_slice(&msg.as_bytes()[..n]);

        let mut key_values = [0i64; MAX_EACH_KEYS];
        let num_keys = keys.len().min(MAX_EACH_KEYS);
        for (i, &(_, v)) in keys.iter().take(num_keys).enumerate() {
            key_values[i] = v;
        }

        std::ptr::write(
            bucket,
            EachBucket {
                site_hash,
                bucket_hash,
                split_triggered: 0,
                num_keys: num_keys as u8,
                has_quality,
                _pad: 0,
                pass_count: 0,
                best_score: i64::MIN,
                key_values,
                msg: msg_buf,
            },
        );
        bucket
    }
}

/// Converts a raw bucket pointer back into its index within the region.
///
/// Returns 0 when `base_ptr` is null or the bucket precedes the array.
fn compute_each_bucket_index(base_ptr: *mut u8, bucket: *const EachBucket) -> usize {
    if base_ptr.is_null() {
        return 0;
    }
    let buckets_base = unsafe { base_ptr.add(8) } as usize;
    let offset = (bucket as usize).saturating_sub(buckets_base);
    offset / std::mem::size_of::<EachBucket>()
}

/// Packs up to four quality values into a single `i64` score.
///
/// Each value is truncated to its low 16 bits; the first value occupies the
/// most significant 16 bits, so earlier quality keys dominate comparisons
/// between packed scores.
fn pack_quality(quality: &[(&str, i64)]) -> i64 {
    let mut packed: i64 = 0;
    let n = quality.len().min(4);
    for (i, &(_, v)) in quality.iter().take(n).enumerate() {
        let shift = (3 - i) * 16;
        packed |= ((v as u16) as i64) << shift;
    }
    packed
}

/// Unpacks the first `n` quality values from a packed score.
///
/// `n` is clamped to 4, the most `pack_quality` can store; without the
/// clamp the shift below would underflow.
pub fn unpack_quality(packed: i64, n: u8) -> Vec<i64> {
    (0..(n as usize).min(4))
        .map(|i| {
            let shift = (3 - i) * 16;
            ((packed >> shift) as u16) as i64
        })
        .collect()
}

/// Records a hit for the `sometimes_each` assertion identified by `msg`,
/// bucketed by the given key values.
///
/// No-op when no shared bucket region is attached. On the first hit of a
/// bucket, and whenever a later hit improves the packed quality score, a
/// split is dispatched for the bucket's assertion slot.
pub fn assertion_sometimes_each(msg: &str, keys: &[(&str, i64)], quality: &[(&str, i64)]) {
    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
    if ptr.is_null() {
        return;
    }

    // The site hash identifies the call site; the bucket hash folds in the
    // key values so each distinct key combination gets its own bucket.
    let site_hash = msg_hash(msg);
    let mut bucket_hash = site_hash;
    for &(_, val) in keys {
        for b in val.to_le_bytes() {
            bucket_hash ^= b as u32;
            bucket_hash = bucket_hash.wrapping_mul(0x01000193);
        }
    }

    // Mark the bucket in the coverage bitmap if one is attached.
    let bm_ptr = crate::context::COVERAGE_BITMAP_PTR.with(|c| c.get());
    if !bm_ptr.is_null() {
        let bm = unsafe { crate::coverage::CoverageBitmap::new(bm_ptr) };
        bm.set_bit(bucket_hash as usize);
    }

    let has_quality = quality.len().min(4) as u8;
    let score = if has_quality > 0 {
        pack_quality(quality)
    } else {
        0
    };

    let bucket =
        unsafe { find_or_alloc_each_bucket(ptr, site_hash, bucket_hash, keys, msg, has_quality) };
    if bucket.is_null() {
        return;
    }

    unsafe {
        // Bucket counters live in shared memory, so all updates go through
        // atomic views over the plain fields.
        let count_atomic = &*((&(*bucket).pass_count) as *const u32 as *const AtomicU32);
        count_atomic.fetch_add(1, Ordering::Relaxed);

        // Exactly one thread wins the 0 -> 1 transition and handles discovery.
        let ft = &*((&(*bucket).split_triggered) as *const u8 as *const AtomicU8);
        let first_discovery = ft
            .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();

        if first_discovery {
            if has_quality > 0 {
                let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
                bs_atomic.store(score, Ordering::Relaxed);
            }

            let bucket_index = compute_each_bucket_index(ptr, bucket);
            crate::split_loop::dispatch_split(
                msg,
                bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
            );
        } else if has_quality > 0 {
            // Later hits dispatch only when they strictly improve the best
            // score; the CAS loop resolves races between concurrent improvers.
            let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
            let mut current = bs_atomic.load(Ordering::Relaxed);
            loop {
                if score <= current {
                    break;
                }
                match bs_atomic.compare_exchange_weak(
                    current,
                    score,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        let bucket_index = compute_each_bucket_index(ptr, bucket);
                        crate::split_loop::dispatch_split(
                            msg,
                            bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
                        );
                        break;
                    }
                    Err(actual) => current = actual,
                }
            }
        }
    }
}

/// Snapshots every allocated bucket from the shared region.
///
/// Returns an empty vector when no region is attached.
pub fn each_bucket_read_all() -> Vec<EachBucket> {
    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
    if ptr.is_null() {
        return Vec::new();
    }
    unsafe {
        // Load the header count atomically; writers update it concurrently
        // via fetch_add, so a plain read here would be a data race.
        let count = (*(ptr as *const AtomicU32)).load(Ordering::Relaxed) as usize;
        let count = count.min(MAX_EACH_BUCKETS);
        let base = ptr.add(8) as *const EachBucket;
        (0..count).map(|i| std::ptr::read(base.add(i))).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_msg_hash_deterministic() {
        let h1 = msg_hash("test_assertion");
        let h2 = msg_hash("test_assertion");
        assert_eq!(h1, h2);
    }

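    // Sanity check against the published 32-bit FNV-1a test vector for the
    // single byte "a" (offset basis 0x811c9dc5, prime 0x01000193).
    #[test]
    fn test_msg_hash_matches_fnv1a_vector() {
        assert_eq!(msg_hash("a"), 0xe40c292c);
    }
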
    #[test]
    fn test_msg_hash_different_inputs() {
        let h1 = msg_hash("alpha");
        let h2 = msg_hash("beta");
        let h3 = msg_hash("gamma");
        assert_ne!(h1, h2);
        assert_ne!(h2, h3);
        assert_ne!(h1, h3);
    }

    #[test]
    fn test_pack_unpack_quality_roundtrip() {
        let quality = &[("health", 100i64), ("armor", 50i64), ("mana", 200i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 3);
        assert_eq!(unpacked, vec![100, 50, 200]);
    }

    #[test]
    fn test_pack_quality_single() {
        let quality = &[("health", 42i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 1);
        assert_eq!(unpacked, vec![42]);
    }

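    // Exercises the packing layout itself: values truncate to their low 16
    // bits and the first key lands in the most significant 16 bits, so it
    // dominates comparisons between packed scores.
    #[test]
    fn test_pack_quality_layout_and_truncation() {
        assert_eq!(pack_quality(&[("a", 1i64)]), 1i64 << 48);
        // Only the low 16 bits of a value survive packing.
        assert_eq!(pack_quality(&[("a", 0x1_0005i64)]), 5i64 << 48);
        // A larger first value outweighs any later value.
        let hi = pack_quality(&[("a", 2), ("b", 0)]);
        let lo = pack_quality(&[("a", 1), ("b", 0xffff)]);
        assert!(hi > lo);
    }
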
    #[test]
    fn test_each_bucket_size_stable() {
        // The layout is shared between processes, so the size must not
        // drift: 16 header bytes + 8 (score) + 48 (keys) + 32 (message).
        assert_eq!(std::mem::size_of::<EachBucket>(), 104);
    }

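    // A sketch of the allocator contract, run against a zeroed heap buffer
    // standing in for the shared-memory region the runtime normally attaches.
    #[test]
    fn test_find_or_alloc_each_bucket_reuses_matching_bucket() {
        let mut buf = vec![0u64; EACH_BUCKET_MEM_SIZE / 8 + 1];
        let ptr = buf.as_mut_ptr() as *mut u8;
        unsafe {
            let a = find_or_alloc_each_bucket(ptr, 1, 2, &[("k", 7)], "hello", 0);
            let b = find_or_alloc_each_bucket(ptr, 1, 2, &[("k", 7)], "hello", 0);
            assert!(!a.is_null());
            // The same (site_hash, bucket_hash) pair maps to one bucket.
            assert_eq!(a, b);
            assert_eq!((*a).num_keys, 1);
            assert_eq!((*a).key_values[0], 7);
            assert_eq!((*a).msg_str(), "hello");
            assert_eq!((*a).best_score, i64::MIN);
        }
    }
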
    #[test]
    fn test_each_bucket_read_all_when_inactive() {
        // With no shared region attached, reads return nothing.
        let buckets = each_bucket_read_all();
        assert!(buckets.is_empty());
    }

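    // Filling all MAX_EACH_BUCKETS slots should make the next distinct
    // bucket come back null rather than writing past the region; same
    // heap-buffer stand-in as above.
    #[test]
    fn test_find_or_alloc_each_bucket_exhaustion() {
        let mut buf = vec![0u64; EACH_BUCKET_MEM_SIZE / 8 + 1];
        let ptr = buf.as_mut_ptr() as *mut u8;
        unsafe {
            for i in 0..MAX_EACH_BUCKETS {
                let b = find_or_alloc_each_bucket(ptr, i as u32, i as u32, &[], "m", 0);
                assert!(!b.is_null());
            }
            let overflow = find_or_alloc_each_bucket(ptr, u32::MAX, u32::MAX, &[], "m", 0);
            assert!(overflow.is_null());
        }
    }
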
    #[test]
    fn test_assertion_sometimes_each_noop_when_inactive() {
        // Must return silently when no shared region is attached.
        assertion_sometimes_each("test", &[("key", 1)], &[]);
    }
}