moonpool_explorer/each_buckets.rs

//! Per-value bucketed exploration infrastructure for `assert_sometimes_each!`.
//!
//! Each unique combination of identity key values creates one bucket in shared
//! memory. On first discovery, a fork is triggered. Optional quality watermarks
//! allow re-forking when the packed quality score improves.
//!
//! # Memory Layout
//!
//! ```text
//! [next_bucket: u32, _pad: u32, buckets: [EachBucket; MAX_EACH_BUCKETS]]
//! ```
//!
//! The `next_bucket` counter is incremented atomically (via `AtomicU32::fetch_add`)
//! to allocate new buckets safely across fork boundaries.
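//!
//! # Example
//!
//! A hypothetical call site (key names are illustrative, not from this crate):
//! one bucket per (floor, direction) pair, with battery level as a quality
//! watermark that triggers a re-fork whenever it improves.
//!
//! ```ignore
//! assertion_sometimes_each(
//!     "elevator reached floor",
//!     &[("floor", 2), ("direction", 1)],
//!     &[("battery", 87)],
//! );
//! ```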

use std::sync::atomic::{AtomicI64, AtomicU8, AtomicU32, Ordering};

/// Maximum number of EachBucket slots in shared memory.
pub const MAX_EACH_BUCKETS: usize = 256;

/// Maximum number of identity keys per bucket.
pub const MAX_EACH_KEYS: usize = 6;

/// Maximum length of the assertion message stored in a bucket.
const EACH_MSG_LEN: usize = 32;

/// Total shared memory size for the EachBucket region.
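/// Eight header bytes (the `next_bucket` counter plus padding) followed by the bucket array.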
pub const EACH_BUCKET_MEM_SIZE: usize = 8 + MAX_EACH_BUCKETS * std::mem::size_of::<EachBucket>();

/// One bucket's state in MAP_SHARED memory for per-value bucketed assertions.
///
/// Each unique combination of identity key values creates one bucket.
/// Optional quality watermark (`has_quality != 0`): re-forks when `best_score` improves.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct EachBucket {
    /// FNV-1a hash of the assertion message string.
    pub site_hash: u32,
    /// Hash of (site_hash + identity key values); uniquely identifies this bucket.
    pub bucket_hash: u32,
    /// CAS guard: 0 = no fork yet, 1 = first fork triggered.
    pub split_triggered: u8,
    /// Number of identity keys stored in `key_values`.
    pub num_keys: u8,
    /// Number of quality keys (0-4). 0 means no quality tracking.
    pub has_quality: u8,
    /// Alignment padding.
    pub _pad: u8,
    /// Number of times this bucket has been hit (atomic increment).
    pub pass_count: u32,
    /// Best quality watermark score (atomic CAS for improvement detection).
    pub best_score: i64,
    /// Identity key values for display/debugging.
    pub key_values: [i64; MAX_EACH_KEYS],
    /// Assertion message string (null-terminated C-style).
    pub msg: [u8; EACH_MSG_LEN],
}

impl EachBucket {
    /// Get the assertion message as a string slice.
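    ///
    /// Returns `"???"` if the stored bytes are not valid UTF-8 (e.g. a
    /// multi-byte character split by truncation).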
    pub fn msg_str(&self) -> &str {
        let len = self
            .msg
            .iter()
            .position(|&b| b == 0)
            .unwrap_or(EACH_MSG_LEN);
        std::str::from_utf8(&self.msg[..len]).unwrap_or("???")
    }
}

/// FNV-1a hash of a message string to a stable u32.
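/// Uses the standard 32-bit FNV offset basis (0x811c9dc5) and prime (0x01000193).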
fn msg_hash(msg: &str) -> u32 {
    let mut h: u32 = 0x811c9dc5;
    for b in msg.bytes() {
        h ^= b as u32;
        h = h.wrapping_mul(0x01000193);
    }
    h
}

/// Find an existing bucket or allocate a new one by (site_hash, bucket_hash).
///
/// Returns a pointer to the bucket, or null if the table is full.
///
/// # Safety
///
/// `ptr` must point to a valid EachBucket shared memory region of at least
/// `EACH_BUCKET_MEM_SIZE` bytes.
unsafe fn find_or_alloc_each_bucket(
    ptr: *mut u8,
    site_hash: u32,
    bucket_hash: u32,
    keys: &[(&str, i64)],
    msg: &str,
    has_quality: u8,
) -> *mut EachBucket {
    unsafe {
        let next_atomic = &*(ptr as *const AtomicU32);
        let count = next_atomic.load(Ordering::Relaxed) as usize;
        let base = ptr.add(8) as *mut EachBucket;

        // Search existing buckets.
        for i in 0..count.min(MAX_EACH_BUCKETS) {
            let bucket = base.add(i);
            if (*bucket).site_hash == site_hash && (*bucket).bucket_hash == bucket_hash {
                return bucket;
            }
        }

        // Allocate new bucket atomically.
        let new_idx = next_atomic.fetch_add(1, Ordering::Relaxed) as usize;
        if new_idx >= MAX_EACH_BUCKETS {
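            // Table full: roll back our reservation and report failure via null.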
            next_atomic.fetch_sub(1, Ordering::Relaxed);
            return std::ptr::null_mut();
        }

        let bucket = base.add(new_idx);
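        // Copy the message, truncating so the final byte stays a NUL terminator.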
        let mut msg_buf = [0u8; EACH_MSG_LEN];
        let n = msg.len().min(EACH_MSG_LEN - 1);
        msg_buf[..n].copy_from_slice(&msg.as_bytes()[..n]);

        let mut key_values = [0i64; MAX_EACH_KEYS];
        let num_keys = keys.len().min(MAX_EACH_KEYS);
        for (i, &(_, v)) in keys.iter().take(num_keys).enumerate() {
            key_values[i] = v;
        }

        std::ptr::write(
            bucket,
            EachBucket {
                site_hash,
                bucket_hash,
                split_triggered: 0,
                num_keys: num_keys as u8,
                has_quality,
                _pad: 0,
                pass_count: 0,
                best_score: i64::MIN,
                key_values,
                msg: msg_buf,
            },
        );
        bucket
    }
}

/// Compute the 0-based array index of a bucket from its pointer.
fn compute_each_bucket_index(base_ptr: *mut u8, bucket: *const EachBucket) -> usize {
    if base_ptr.is_null() {
        return 0;
    }
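    // Skip the 8-byte header to reach the start of the bucket array.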
    let buckets_base = unsafe { base_ptr.add(8) } as usize;
    let offset = (bucket as usize).saturating_sub(buckets_base);
    offset / std::mem::size_of::<EachBucket>()
}

/// Pack up to 4 quality key values into a single i64 for lexicographic comparison.
///
/// First key gets the highest 16 bits (highest priority).
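/// Each value is truncated to its low 16 bits before packing.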
fn pack_quality(quality: &[(&str, i64)]) -> i64 {
    let mut packed: i64 = 0;
    let n = quality.len().min(4);
    for (i, &(_, v)) in quality.iter().take(n).enumerate() {
        let shift = (3 - i) * 16;
        packed |= ((v as u16) as i64) << shift;
    }
    packed
}

/// Unpack a quality i64 back into individual values for display.
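/// Inverse of `pack_quality`: each field comes back zero-extended from its low 16 bits.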
pub fn unpack_quality(packed: i64, n: u8) -> Vec<i64> {
    (0..n as usize)
        .map(|i| {
            let shift = (3 - i) * 16;
            ((packed >> shift) as u16) as i64
        })
        .collect()
}

/// Backing function for per-value bucketed assertions.
///
/// Each unique combination of identity key values creates one bucket.
/// Forks on first discovery. Optional quality keys re-fork when the packed
/// quality score improves (CAS loop on `best_score`).
///
/// This is a no-op if EachBucket shared memory is not initialized.
pub fn assertion_sometimes_each(msg: &str, keys: &[(&str, i64)], quality: &[(&str, i64)]) {
    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
    if ptr.is_null() {
        return;
    }

    // Compute the bucket hash: FNV-1a over the identity key values, seeded with site_hash.
    // Quality values are NOT included; they are watermarks, not identity keys.
    let site_hash = msg_hash(msg);
    let mut bucket_hash = site_hash;
    for &(_, val) in keys {
        for b in val.to_le_bytes() {
            bucket_hash ^= b as u32;
            bucket_hash = bucket_hash.wrapping_mul(0x01000193);
        }
    }

    // Mark coverage bitmap for adaptive yield detection.
    // Different identity key combinations produce different bucket_hash values,
    // so the bitmap distinguishes e.g. floor-1 from floor-2 assertions.
    let bm_ptr = crate::context::COVERAGE_BITMAP_PTR.with(|c| c.get());
    if !bm_ptr.is_null() {
        let bm = unsafe { crate::coverage::CoverageBitmap::new(bm_ptr) };
        bm.set_bit(bucket_hash as usize);
    }

    let has_quality = quality.len().min(4) as u8;
    let score = if has_quality > 0 {
        pack_quality(quality)
    } else {
        0
    };

    // Safety: ptr was allocated during init() with EACH_BUCKET_MEM_SIZE bytes.
    let bucket =
        unsafe { find_or_alloc_each_bucket(ptr, site_hash, bucket_hash, keys, msg, has_quality) };
    if bucket.is_null() {
        return;
    }

    // Safety: bucket points to valid MAP_SHARED memory. Atomic operations are used
    // for cross-fork safety (parent waits on child via waitpid, but atomics ensure
    // correct visibility for recursive fork scenarios).
    unsafe {
        // Increment pass count.
        let count_atomic = &*((&(*bucket).pass_count) as *const u32 as *const AtomicU32);
        count_atomic.fetch_add(1, Ordering::Relaxed);

        // Fork on first discovery: CAS split_triggered from 0 → 1.
        let ft = &*((&(*bucket).split_triggered) as *const u8 as *const AtomicU8);
        let first_discovery = ft
            .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();

        if first_discovery {
            // On first discovery, initialize best_score if quality-tracked.
            if has_quality > 0 {
                let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
                bs_atomic.store(score, Ordering::Relaxed);
            }

            let bucket_index = compute_each_bucket_index(ptr, bucket);
            crate::split_loop::dispatch_split(
                msg,
                bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
            );
        } else if has_quality > 0 {
            // Not first discovery: check quality watermark improvement.
            // CAS loop on best_score: re-fork whenever the score improves.
            let bs_atomic = &*((&(*bucket).best_score) as *const i64 as *const AtomicI64);
            let mut current = bs_atomic.load(Ordering::Relaxed);
            loop {
                if score <= current {
                    break;
                }
                match bs_atomic.compare_exchange_weak(
                    current,
                    score,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        let bucket_index = compute_each_bucket_index(ptr, bucket);
                        crate::split_loop::dispatch_split(
                            msg,
                            bucket_index % crate::assertion_slots::MAX_ASSERTION_SLOTS,
                        );
                        break;
                    }
                    Err(actual) => current = actual,
                }
            }
        }
    }
}

/// Read all recorded EachBucket entries from shared memory.
///
/// Returns an empty vector if EachBucket shared memory is not initialized.
pub fn each_bucket_read_all() -> Vec<EachBucket> {
    let ptr = crate::context::EACH_BUCKET_PTR.with(|c| c.get());
    if ptr.is_null() {
        return Vec::new();
    }
    unsafe {
        let count = (*(ptr as *const u32)) as usize;
        let count = count.min(MAX_EACH_BUCKETS);
        let base = ptr.add(8) as *const EachBucket;
        (0..count).map(|i| std::ptr::read(base.add(i))).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_msg_hash_deterministic() {
        let h1 = msg_hash("test_assertion");
        let h2 = msg_hash("test_assertion");
        assert_eq!(h1, h2);
    }

    #[test]
    fn test_msg_hash_different_inputs() {
        let h1 = msg_hash("alpha");
        let h2 = msg_hash("beta");
        let h3 = msg_hash("gamma");
        assert_ne!(h1, h2);
        assert_ne!(h2, h3);
        assert_ne!(h1, h3);
    }

    #[test]
    fn test_pack_unpack_quality_roundtrip() {
        let quality = &[("health", 100i64), ("armor", 50i64), ("mana", 200i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 3);
        assert_eq!(unpacked, vec![100, 50, 200]);
    }

    #[test]
    fn test_pack_quality_single() {
        let quality = &[("health", 42i64)];
        let packed = pack_quality(quality);
        let unpacked = unpack_quality(packed, 1);
        assert_eq!(unpacked, vec![42]);
    }

    #[test]
    fn test_each_bucket_size_stable() {
        // EachBucket must have a stable size for shared memory layout.
        // 4+4+1+1+1+1+4+8+6*8+32 = 104 bytes
        assert_eq!(std::mem::size_of::<EachBucket>(), 104);
    }

    #[test]
    fn test_each_bucket_read_all_when_inactive() {
        // Should return empty when not initialized.
        let buckets = each_bucket_read_all();
        assert!(buckets.is_empty());
    }

    #[test]
    fn test_assertion_sometimes_each_noop_when_inactive() {
        // Should not panic when EachBucket memory is not initialized.
        assertion_sometimes_each("test", &[("key", 1)], &[]);
    }
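
    #[test]
    fn test_pack_quality_truncates_to_low_16_bits() {
        // pack_quality keeps only the low 16 bits of each value, so a value
        // of 0x1_0001 unpacks as 1. Added here to document the truncation.
        let packed = pack_quality(&[("big", 0x1_0001i64)]);
        assert_eq!(unpack_quality(packed, 1), vec![1]);
    }

    #[test]
    fn test_msg_str_null_terminated() {
        // EachBucket is plain data, so a zeroed value is a valid starting
        // point; only the msg bytes matter for msg_str.
        let mut bucket: EachBucket = unsafe { std::mem::zeroed() };
        bucket.msg[..5].copy_from_slice(b"hello");
        assert_eq!(bucket.msg_str(), "hello");
    }

    #[test]
    fn test_compute_each_bucket_index_offsets() {
        // A minimal sketch using a heap buffer as a stand-in for the shared
        // memory region; the index math needs no real MAP_SHARED mapping.
        let mut buf = vec![0u8; EACH_BUCKET_MEM_SIZE];
        let base = buf.as_mut_ptr();
        let third = unsafe { (base.add(8) as *mut EachBucket).add(3) };
        assert_eq!(compute_each_bucket_index(base, third), 3);
        assert_eq!(compute_each_bucket_index(std::ptr::null_mut(), third), 0);
    }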
}