// moonpool_explorer/each_buckets.rs
//! Per-value bucketed exploration infrastructure for `assert_sometimes_each!`.
//!
//! Each unique combination of identity key values creates one bucket in shared
//! memory. On first discovery, a fork is triggered. Optional quality watermarks
//! allow re-forking when the packed quality score improves.
//!
//! # Memory Layout
//!
//! ```text
//! [next_bucket: u32, _pad: u32, buckets: [EachBucket; MAX_EACH_BUCKETS]]
//! ```
//!
//! The `next_bucket` counter is incremented atomically (via `AtomicU32::fetch_add`)
//! to allocate new buckets safely across fork boundaries.
15
16use std::sync::atomic::{AtomicI64, AtomicU8, AtomicU32, Ordering};
17
/// Maximum number of `EachBucket` slots in shared memory.
pub const MAX_EACH_BUCKETS: usize = 256;

/// Maximum number of identity keys stored per bucket.
pub const MAX_EACH_KEYS: usize = 6;

/// Maximum length in bytes (including the trailing NUL) of the assertion
/// message stored in a bucket.
const EACH_MSG_LEN: usize = 32;

/// Total shared memory size for the EachBucket region: an 8-byte header
/// (`next_bucket: u32` + 4 bytes padding) followed by the bucket array.
pub const EACH_BUCKET_MEM_SIZE: usize = 8 + MAX_EACH_BUCKETS * std::mem::size_of::<EachBucket>();
29
/// One bucket's state in MAP_SHARED memory for per-value bucketed assertions.
///
/// Each unique combination of identity key values creates one bucket.
/// Optional quality watermark (`has_quality != 0`): re-forks when `best_score` improves.
///
/// `#[repr(C)]` keeps the size and field order stable for the shared-memory
/// layout (see `EACH_BUCKET_MEM_SIZE` and the size test in `tests`). The
/// mutable fields (`split_triggered`, `pass_count`, `best_score`) are accessed
/// via atomic reinterpretation casts in `assertion_sometimes_each`.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct EachBucket {
    /// FNV-1a hash of the assertion message string (see `msg_hash`).
    pub site_hash: u32,
    /// Hash of (site_hash + identity key values) — uniquely identifies this bucket.
    pub bucket_hash: u32,
    /// CAS guard: 0 = no fork yet, 1 = first fork triggered.
    pub split_triggered: u8,
    /// Number of identity keys stored in `key_values` (capped at MAX_EACH_KEYS).
    pub num_keys: u8,
    /// Number of quality keys (0-4). 0 means no quality tracking.
    pub has_quality: u8,
    /// Alignment padding (keeps `pass_count` 4-byte aligned).
    pub _pad: u8,
    /// Number of times this bucket has been hit (atomic increment).
    pub pass_count: u32,
    /// Best quality watermark score (atomic CAS for improvement detection).
    /// Initialized to `i64::MIN` so any real score registers as an improvement.
    pub best_score: i64,
    /// Identity key values for display/debugging (key names are not stored).
    pub key_values: [i64; MAX_EACH_KEYS],
    /// Assertion message string (null-terminated C-style, truncated to
    /// EACH_MSG_LEN - 1 content bytes).
    pub msg: [u8; EACH_MSG_LEN],
}
58
59impl EachBucket {
60    /// Get the assertion message as a string slice.
61    pub fn msg_str(&self) -> &str {
62        let len = self
63            .msg
64            .iter()
65            .position(|&b| b == 0)
66            .unwrap_or(EACH_MSG_LEN);
67        std::str::from_utf8(&self.msg[..len]).unwrap_or("???")
68    }
69}
70
/// FNV-1a hash of a message string to a stable u32.
///
/// Uses the standard 32-bit FNV offset basis (0x811c9dc5) and prime
/// (0x01000193), so the same message hashes identically across runs and
/// across forked processes.
fn msg_hash(msg: &str) -> u32 {
    msg.bytes().fold(0x811c9dc5u32, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(0x01000193)
    })
}
80
/// Find an existing bucket or allocate a new one by (site_hash, bucket_hash).
///
/// Returns a pointer to the bucket, or null if the table is full.
///
/// NOTE(review): the linear search and the `fetch_add` allocation are not a
/// single atomic step, so two processes discovering the same bucket
/// concurrently could each allocate a slot for it — presumably acceptable
/// given the parent-waits-on-child fork model; confirm with the split loop.
///
/// # Safety
///
/// `ptr` must point to a valid EachBucket shared memory region of at least
/// `EACH_BUCKET_MEM_SIZE` bytes.
unsafe fn find_or_alloc_each_bucket(
    ptr: *mut u8,
    site_hash: u32,
    bucket_hash: u32,
    keys: &[(&str, i64)],
    msg: &str,
    has_quality: u8,
) -> *mut EachBucket {
    unsafe {
        // The first 4 bytes of the region are the `next_bucket` counter.
        let next_atomic = &*(ptr as *const AtomicU32);
        let count = next_atomic.load(Ordering::Relaxed) as usize;
        // Bucket array starts after the 8-byte header.
        let base = ptr.add(8) as *mut EachBucket;

        // Search existing buckets. `count` is capped because overflowing
        // allocations can transiently push the raw counter past capacity.
        for i in 0..count.min(MAX_EACH_BUCKETS) {
            let bucket = base.add(i);
            if (*bucket).site_hash == site_hash && (*bucket).bucket_hash == bucket_hash {
                return bucket;
            }
        }

        // Allocate new bucket atomically.
        let new_idx = next_atomic.fetch_add(1, Ordering::Relaxed) as usize;
        if new_idx >= MAX_EACH_BUCKETS {
            // Table full: undo this call's own increment and report failure.
            next_atomic.fetch_sub(1, Ordering::Relaxed);
            return std::ptr::null_mut();
        }

        let bucket = base.add(new_idx);
        // Copy the message, truncated so the final buffer byte is always NUL.
        let mut msg_buf = [0u8; EACH_MSG_LEN];
        let n = msg.len().min(EACH_MSG_LEN - 1);
        msg_buf[..n].copy_from_slice(&msg.as_bytes()[..n]);

        // Record identity key values; key names are dropped (values only).
        let mut key_values = [0i64; MAX_EACH_KEYS];
        let num_keys = keys.len().min(MAX_EACH_KEYS);
        for (i, &(_, v)) in keys.iter().take(num_keys).enumerate() {
            key_values[i] = v;
        }

        // Initialize the slot in place. best_score starts at i64::MIN so any
        // real quality score registers as an improvement on first update.
        std::ptr::write(
            bucket,
            EachBucket {
                site_hash,
                bucket_hash,
                split_triggered: 0,
                num_keys: num_keys as u8,
                has_quality,
                _pad: 0,
                pass_count: 0,
                best_score: i64::MIN,
                key_values,
                msg: msg_buf,
            },
        );
        bucket
    }
}
146
147/// Compute the 0-based array index of a bucket from its pointer.
148fn compute_each_bucket_index(base_ptr: *mut u8, bucket: *const EachBucket) -> usize {
149    if base_ptr.is_null() {
150        return 0;
151    }
152    let buckets_base = unsafe { base_ptr.add(8) } as usize;
153    let offset = (bucket as usize).saturating_sub(buckets_base);
154    offset / std::mem::size_of::<EachBucket>()
155}
156
/// Pack up to 4 quality key values into a single i64 for lexicographic comparison.
///
/// The first key occupies the highest 16 bits (highest priority). Each value
/// is truncated to its low 16 bits before packing; keys beyond the 4th are
/// ignored.
fn pack_quality(quality: &[(&str, i64)]) -> i64 {
    quality
        .iter()
        .take(4)
        .enumerate()
        .fold(0i64, |packed, (slot, &(_, value))| {
            let shift = (3 - slot) * 16;
            packed | (i64::from(value as u16) << shift)
        })
}
169
/// Unpack a quality i64 (produced by `pack_quality`) back into individual
/// values for display.
///
/// Returns `n` values; the first corresponds to the highest 16 bits. Only 4
/// slots are ever packed, so requested slots beyond the 4th yield 0.
/// (Previously `n > 4` made the `3 - i` shift computation underflow and
/// panic; callers passing `has_quality` were safe only because it is capped
/// at 4 elsewhere.)
pub fn unpack_quality(packed: i64, n: u8) -> Vec<i64> {
    (0..n as usize)
        .map(|i| {
            if i < 4 {
                let shift = (3 - i) * 16;
                ((packed >> shift) as u16) as i64
            } else {
                // Slots beyond the 4 packed fields were never stored.
                0
            }
        })
        .collect()
}
179
/// Backing function for per-value bucketed assertions.
///
/// Each unique combination of identity key values creates one bucket.
/// Forks on first discovery. Optional quality keys re-fork when the packed
/// quality score improves (CAS loop on `best_score`).
///
/// * `msg` — assertion site message; its FNV-1a hash identifies the site.
/// * `keys` — identity (name, value) pairs; the values define the bucket.
/// * `quality` — watermark (name, value) pairs; up to 4 are packed into one
///   i64 score, first key in the highest 16 bits (see `pack_quality`).
///
/// This is a no-op if EachBucket shared memory is not initialized.
pub fn assertion_sometimes_each(msg: &str, keys: &[(&str, i64)], quality: &[(&str, i64)]) {
    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
    if ptr.is_null() {
        return;
    }

    // Compute bucket hash: site_hash mixed with identity key values only via FNV-1a.
    // Quality values are NOT included — they're watermarks, not identity keys.
    let site_hash = msg_hash(msg);
    let mut bucket_hash = site_hash;
    for &(_, val) in keys {
        for b in val.to_le_bytes() {
            bucket_hash ^= b as u32;
            bucket_hash = bucket_hash.wrapping_mul(0x01000193);
        }
    }

    // Mark coverage bitmap for adaptive yield detection.
    // Different identity key combinations produce different bucket_hash values,
    // so the bitmap distinguishes e.g. floor-1 from floor-2 assertions.
    let bm_ptr = crate::context::COVERAGE_BITMAP_PTR.with(|c| c.get());
    if !bm_ptr.is_null() {
        // Safety: bm_ptr is non-null (checked above) and was set to a valid
        // alloc_shared() pointer of COVERAGE_MAP_SIZE bytes during init().
        let bm = unsafe { crate::coverage::CoverageBitmap::new(bm_ptr) };
        bm.set_bit(bucket_hash as usize);
    }

    // has_quality doubles as the count of packed quality keys (0 = untracked).
    let has_quality = quality.len().min(4) as u8;
    let score = if has_quality > 0 {
        pack_quality(quality)
    } else {
        0
    };

    // Safety: ptr was allocated during init() with EACH_BUCKET_MEM_SIZE bytes.
    let bucket =
        unsafe { find_or_alloc_each_bucket(ptr, site_hash, bucket_hash, keys, msg, has_quality) };
    if bucket.is_null() {
        // Bucket table full — silently drop this observation.
        return;
    }

    // Safety: bucket points to valid MAP_SHARED memory. Atomic operations are used
    // for cross-fork safety (parent waits on child via waitpid, but atomics ensure
    // correct visibility for recursive fork scenarios).
    unsafe {
        // Increment pass count.
        let count_atomic = &*((&(*bucket).pass_count) as *const u32 as *const AtomicU32);
        count_atomic.fetch_add(1, Ordering::Relaxed);

        // Fork on first discovery: CAS split_triggered from 0 → 1.
        // Exactly one caller wins the CAS and performs the first-discovery work.
        let ft = &*((&(*bucket).split_triggered) as *const u8 as *const AtomicU8);
        let first_discovery = ft
            .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();

        if first_discovery {
            // On first discovery, initialize best_score if quality-tracked.
            if has_quality > 0 {
                let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
                bs_atomic.store(score, Ordering::Relaxed);
            }

            // Map the bucket to an assertion slot (modulo wraps when there are
            // more buckets than slots) and trigger the fork.
            let bucket_index = compute_each_bucket_index(ptr, bucket);
            crate::split_loop::dispatch_split(
                msg,
                bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
            );
        } else if has_quality > 0 {
            // Not first discovery: check quality watermark improvement.
            // CAS loop on best_score — re-fork when score improves.
            let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
            let mut current = bs_atomic.load(Ordering::Relaxed);
            loop {
                if score <= current {
                    // No improvement over the recorded watermark — done.
                    break;
                }
                match bs_atomic.compare_exchange_weak(
                    current,
                    score,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // This caller raised the watermark — re-fork.
                        let bucket_index = compute_each_bucket_index(ptr, bucket);
                        crate::split_loop::dispatch_split(
                            msg,
                            bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
                        );
                        break;
                    }
                    // Lost the race (or spurious failure): retry with the
                    // freshly observed value.
                    Err(actual) => current = actual,
                }
            }
        }
    }
}
284
285/// Read all recorded EachBucket entries from shared memory.
286///
287/// Returns an empty vector if EachBucket shared memory is not initialized.
288pub fn each_bucket_read_all() -> Vec<EachBucket> {
289    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
290    if ptr.is_null() {
291        return Vec::new();
292    }
293    // Safety: ptr was allocated during init() with EACH_BUCKET_MEM_SIZE bytes.
294    // - The first 4 bytes hold the bucket count (u32), capped at MAX_EACH_BUCKETS.
295    // - base = ptr + 8 is the start of the EachBucket array.
296    // - Loop bound 0..count ensures base.add(i) stays within the allocated region.
297    // - EachBucket is #[repr(C)] + Copy, so ptr::read is valid for initialized slots.
298    unsafe {
299        let count = (*(ptr as *const u32)) as usize;
300        let count = count.min(MAX_EACH_BUCKETS);
301        let base = ptr.add(8) as *const EachBucket;
302        (0..count).map(|i| std::ptr::read(base.add(i))).collect()
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Same input must always produce the same hash (cross-process stability).
    #[test]
    fn test_msg_hash_deterministic() {
        let h1 = msg_hash("test_assertion");
        let h2 = msg_hash("test_assertion");
        assert_eq!(h1, h2);
    }

    /// Distinct messages should (for these inputs) produce distinct hashes.
    #[test]
    fn test_msg_hash_different_inputs() {
        let h1 = msg_hash("alpha");
        let h2 = msg_hash("beta");
        let h3 = msg_hash("gamma");
        assert_ne!(h1, h2);
        assert_ne!(h2, h3);
        assert_ne!(h1, h3);
    }

    /// pack_quality followed by unpack_quality recovers the original values
    /// (values fit in 16 bits, so no truncation occurs here).
    #[test]
    fn test_pack_unpack_quality_roundtrip() {
        let quality = &[("health", 100i64), ("armor", 50i64), ("mana", 200i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 3);
        assert_eq!(unpacked, vec![100, 50, 200]);
    }

    /// Round-trip with a single quality key.
    #[test]
    fn test_pack_quality_single() {
        let quality = &[("health", 42i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 1);
        assert_eq!(unpacked, vec![42]);
    }

    #[test]
    fn test_each_bucket_size_stable() {
        // EachBucket must have a stable size for shared memory layout.
        // 4+4+1+1+1+1+4+8+6*8+32 = 104 bytes
        assert_eq!(std::mem::size_of::<EachBucket>(), 104);
    }

    /// Reading with no shared memory initialized yields no entries.
    #[test]
    fn test_each_bucket_read_all_when_inactive() {
        // Should return empty when not initialized.
        let buckets = each_bucket_read_all();
        assert!(buckets.is_empty());
    }

    /// The assertion entry point must be a safe no-op without initialization.
    #[test]
    fn test_assertion_sometimes_each_noop_when_inactive() {
        // Should not panic when EachBucket memory is not initialized.
        assertion_sometimes_each("test", &[("key", 1)], &[]);
    }
}