// maniac_runtime/runtime/summary.rs

1use super::waker::WorkerWaker;
2use crate::utils::bits;
3use crate::utils::CachePadded;
4use std::marker::PhantomData;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Condvar, Mutex};
7use std::time::Duration;
8
/// Number of task slots represented by one signal word — one slot per bit of a u64 (64).
const TASK_SLOTS_PER_SIGNAL: usize = u64::BITS as usize;
10
/// Single-level summary tree for task work-stealing.
///
/// This tree tracks ONLY task signals - no yield or worker state.
/// Each leaf represents a set of task signal words.
///
/// The Summary coordinates with SignalWakers to notify partition owners
/// when their assigned leafs become active/inactive.
///
/// # Thread Safety
/// This struct is designed for high-concurrency scenarios with the following guarantees:
/// - All operations use atomic primitives for lock-free operation
/// - Raw pointers are used for performance but are guaranteed safe by WorkerService lifetime
/// - Implements `Send` and `Sync` for cross-thread usage
///
/// # Memory Layout
/// - `leaf_words`: Atomic bitfields tracking which signals have pending tasks
/// - `task_reservations`: Atomic bitfields for task slot reservations within each signal
/// - Round-robin cursors distribute load evenly across leaves and signals
/// - Partition mapping enables work-stealing between worker threads
pub struct Summary {
    // Owned heap allocations
    // One summary word per leaf; bit i set ⇔ signal i of that leaf has work.
    pub(crate) leaf_words: Box<[AtomicU64]>, // Pub for Worker access
    // One reservation word per (leaf, signal) pair, flattened row-major
    // (see `reservation_index`).
    task_reservations: Box<[AtomicU64]>,

    // Configuration (set once in `new`, never written afterwards)
    leaf_count: usize,
    signals_per_leaf: usize,
    // Low `signals_per_leaf` bits set; masks out invalid bits of a leaf word.
    leaf_summary_mask: u64,

    // Round-robin cursors for allocation
    // CachePadded to prevent false sharing between CPU cores
    next_partition: CachePadded<AtomicUsize>,

    // Partition owner notification
    // Raw pointer to WorkerService.wakers array (lifetime guaranteed by WorkerService ownership)
    wakers: *const Arc<WorkerWaker>,
    wakers_len: usize,
    // Shared reference to worker_count - keeps it alive
    worker_count: Arc<AtomicUsize>,

    // Marker documenting that `wakers` borrows WorkerWaker data owned elsewhere.
    // NOTE(review): the `'static` lifetime here is nominal — the real lifetime
    // contract is documented on `Summary::new`; `Send`/`Sync` are asserted manually below.
    _marker: PhantomData<&'static WorkerWaker>,
}
54
// SAFETY: all mutable state lives in atomics, and the raw `wakers` pointer is
// only ever read; `Summary::new`'s safety contract requires the wakers array
// (owned by WorkerService) to outlive this Summary, so sharing or sending the
// struct across threads cannot produce dangling accesses.
unsafe impl Send for Summary {}
unsafe impl Sync for Summary {}
57
58impl Summary {
59    /// Creates a new Summary with the specified dimensions.
60    ///
61    /// # Arguments
62    /// * `leaf_count` - Number of leaf nodes (typically matches worker partition count)
63    /// * `signals_per_leaf` - Number of task signal words per leaf (typically tasks_per_leaf / 64)
64    /// * `wakers` - Slice of SignalWakers for partition owner notification
65    /// * `worker_count` - Reference to WorkerService's worker_count atomic (single source of truth)
66    ///
67    /// # Safety
68    /// The wakers slice and worker_count reference must remain valid for the lifetime of this Summary.
69    /// This is guaranteed when Summary is owned by WorkerService which also owns the wakers and worker_count.
70    ///
71    /// # Memory Allocation
72    /// Allocates `leaf_count * signals_per_leaf * 8` bytes for reservations plus overhead for leaf words and cursors.
73    pub fn new(
74        leaf_count: usize,
75        signals_per_leaf: usize,
76        wakers: &[Arc<WorkerWaker>],
77        worker_count: &Arc<AtomicUsize>,
78    ) -> Self {
79        assert!(leaf_count > 0, "leaf_count must be > 0");
80        assert!(signals_per_leaf > 0, "signals_per_leaf must be > 0");
81        assert!(signals_per_leaf <= 64, "signals_per_leaf must be <= 64");
82        assert!(!wakers.is_empty(), "wakers must not be empty");
83
84        let task_word_count = leaf_count * signals_per_leaf;
85
86        // Initialize leaf words (all signals initially inactive)
87        let leaf_words = (0..leaf_count)
88            .map(|_| AtomicU64::new(0))
89            .collect::<Vec<_>>()
90            .into_boxed_slice();
91
92        // Initialize reservation bitmaps (all slots initially free)
93        let task_reservations = (0..task_word_count)
94            .map(|_| AtomicU64::new(0))
95            .collect::<Vec<_>>()
96            .into_boxed_slice();
97
98        // Create mask for valid signal bits in each leaf
99        let leaf_summary_mask = if signals_per_leaf >= 64 {
100            u64::MAX
101        } else {
102            (1u64 << signals_per_leaf) - 1
103        };
104
105        Self {
106            leaf_words,
107            task_reservations,
108            leaf_count,
109            signals_per_leaf,
110            leaf_summary_mask,
111            next_partition: CachePadded::new(AtomicUsize::new(0)),
112            wakers: wakers.as_ptr(),
113            wakers_len: wakers.len(),
114            worker_count: Arc::clone(worker_count),
115            _marker: PhantomData,
116        }
117    }
118
    /// Get the current worker count from WorkerService.
    /// Reads directly from the single source of truth.
    ///
    /// # Atomic Semantics
    /// Uses `Relaxed` ordering since this is only used for informational purposes
    /// (e.g. sizing the partition scan in `reserve_task`) and doesn't require
    /// synchronization with other memory operations.
    #[inline]
    pub fn get_worker_count(&self) -> usize {
        self.worker_count.load(Ordering::Relaxed)
    }
129
    /// Borrow the summary word for leaf `idx`.
    /// Panics if `idx >= leaf_count`; callers are expected to bounds-check first.
    #[inline(always)]
    fn leaf_word(&self, idx: usize) -> &AtomicU64 {
        &self.leaf_words[idx]
    }
134
    /// Flatten (`leaf_idx`, `signal_idx`) into an index into `task_reservations`
    /// (row-major: one row of `signals_per_leaf` words per leaf).
    #[inline(always)]
    fn reservation_index(&self, leaf_idx: usize, signal_idx: usize) -> usize {
        leaf_idx * self.signals_per_leaf + signal_idx
    }
139
    /// Borrow the reservation bitmap for (`leaf_idx`, `signal_idx`).
    /// Panics on out-of-range indices; callers are expected to bounds-check first.
    #[inline(always)]
    fn reservation_word(&self, leaf_idx: usize, signal_idx: usize) -> &AtomicU64 {
        &self.task_reservations[self.reservation_index(leaf_idx, signal_idx)]
    }
144
145
146    #[inline(always)]
147    fn try_reserve_in_leaf(&self, leaf_idx: usize) -> Option<(usize, usize, u8)> {
148        if self.signals_per_leaf == 0 {
149            return None;
150        }
151        let mut signal_mask = self.leaf_summary_mask;
152        while signal_mask != 0 {
153            let signal_idx = signal_mask.trailing_zeros() as usize;
154            if signal_idx >= self.signals_per_leaf {
155                break;
156            }
157            if let Some(bit) = self.reserve_task_in_leaf(leaf_idx, signal_idx) {
158                return Some((leaf_idx, signal_idx, bit));
159            }
160            signal_mask &= signal_mask - 1;
161        }
162        None
163    }
164
165    /// Notify the partition owner's SignalWaker that a leaf in their partition became active.
166    ///
167    /// # Atomic Semantics
168    /// This function reads the worker count with Acquire ordering to ensure visibility
169    /// of prior worker service state changes. It includes validation to handle the case
170    /// where worker count changes between loading and using it (TOCTOU race).
171    ///
172    /// # Race Condition Mitigation
173    /// - Validates worker_count against wakers_len to prevent out-of-bounds access
174    /// - Validates owner_id against current worker_count to handle worker shutdown
175    /// - These checks ensure safe operation even if worker configuration changes
176    #[inline(always)]
177    fn notify_partition_owner_active(&self, leaf_idx: usize) {
178        let worker_count = self.worker_count.load(Ordering::Acquire);
179        // Validate worker count to prevent out-of-bounds access
180        if worker_count == 0 || worker_count > self.wakers_len {
181            return;
182        }
183
184        let owner_id = self.compute_partition_owner(leaf_idx, worker_count);
185        // Additional validation: owner_id must be within current worker count
186        // This handles the case where worker count decreases after we loaded it
187        if owner_id >= worker_count {
188            return;
189        }
190
191        if owner_id < self.wakers_len {
192            // SAFETY: wakers pointer is valid for the lifetime of Summary
193            // because WorkerService owns both
194            let waker = unsafe { &*self.wakers.add(owner_id) };
195
196            // Compute local leaf index within owner's partition
197            if let Some(local_idx) = self.global_to_local_leaf_idx(leaf_idx, owner_id, worker_count)
198            {
199                waker.mark_partition_leaf_active(local_idx);
200            }
201        }
202    }
203
204    /// Notify the partition owner's SignalWaker that a leaf in their partition became inactive.
205    ///
206    /// # Atomic Semantics
207    /// This function reads the worker count with Acquire ordering to ensure visibility
208    /// of prior worker service state changes. It includes validation to handle the case
209    /// where worker count changes between loading and using it (TOCTOU race).
210    ///
211    /// # Race Condition Mitigation
212    /// - Validates worker_count against wakers_len to prevent out-of-bounds access
213    /// - Validates owner_id against current worker_count to handle worker shutdown
214    /// - These checks ensure safe operation even if worker configuration changes
215    #[inline(always)]
216    fn notify_partition_owner_inactive(&self, leaf_idx: usize) {
217        let worker_count = self.worker_count.load(Ordering::Acquire);
218        // Validate worker count to prevent out-of-bounds access
219        if worker_count == 0 || worker_count > self.wakers_len {
220            return;
221        }
222
223        let owner_id = self.compute_partition_owner(leaf_idx, worker_count);
224        // Additional validation: owner_id must be within current worker count
225        // This handles the case where worker count decreases after we loaded it
226        if owner_id >= worker_count {
227            return;
228        }
229
230        if owner_id < self.wakers_len {
231            // SAFETY: wakers pointer is valid for the lifetime of Summary
232            // because WorkerService owns both
233            let waker = unsafe { &*self.wakers.add(owner_id) };
234
235            // Compute local leaf index within owner's partition
236            if let Some(local_idx) = self.global_to_local_leaf_idx(leaf_idx, owner_id, worker_count)
237            {
238                waker.clear_partition_leaf(local_idx);
239            }
240        }
241    }
242
243    #[inline(always)]
244    fn mark_leaf_bits(&self, leaf_idx: usize, mask: u64) -> bool {
245        if mask == 0 {
246            return false;
247        }
248        let leaf = self.leaf_word(leaf_idx);
249        // Atomic Read-Modify-Write: fetch_or returns previous value and sets new bits
250        // Using AcqRel ordering to both see previous changes and make new changes visible
251        let prev = leaf.fetch_or(mask, Ordering::AcqRel);
252
253        let was_empty = prev & self.leaf_summary_mask == 0;
254        // Check if we actually added new bits (not just setting already-set bits)
255        let any_new_bits = (prev & mask) != mask;
256
257        // Notify partition owner if any new signal bits were set
258        // Note: There's a small race window here where bits could be cleared
259        // before notification, but this is acceptable for performance
260        if any_new_bits {
261            self.notify_partition_owner_active(leaf_idx);
262        }
263
264        was_empty
265    }
266
267    #[inline(always)]
268    fn clear_leaf_bits(&self, leaf_idx: usize, mask: u64) -> bool {
269        if mask == 0 {
270            return false;
271        }
272        let leaf = self.leaf_word(leaf_idx);
273        // Atomic Read-Modify-Write: fetch_and returns previous value and clears bits
274        // Using AcqRel ordering to both see previous changes and make new changes visible
275        let prev = leaf.fetch_and(!mask, Ordering::AcqRel);
276        if prev & mask == 0 {
277            return false; // Bits were already cleared
278        }
279        // Bits were successfully cleared - return true
280        // Also notify partition owner if this leaf is now completely empty
281        // The check uses the atomic snapshot `prev` to avoid races
282        if (prev & !mask) & self.leaf_summary_mask == 0 {
283            self.notify_partition_owner_inactive(leaf_idx);
284        }
285        true
286    }
287
288    /// Sets the summary bit for a task signal.
289    ///
290    /// # Returns
291    /// `true` if the leaf was empty before setting this signal (useful for work-stealing decisions)
292    ///
293    /// # Atomic Semantics
294    /// Uses `fetch_or` with `AcqRel` ordering to atomically set bits and get previous state.
295    /// Notifies the partition owner if new bits were added.
296    pub fn mark_signal_active(&self, leaf_idx: usize, signal_idx: usize) -> bool {
297        if leaf_idx >= self.leaf_count || signal_idx >= self.signals_per_leaf {
298            return false;
299        }
300        debug_assert!(signal_idx < self.signals_per_leaf);
301        let mask = 1u64 << signal_idx;
302        self.mark_leaf_bits(leaf_idx, mask)
303    }
304
305    /// Clears the summary bit for a task signal.
306    ///
307    /// # Returns
308    /// `true` if bits were successfully cleared, `false` if indices are invalid or bits were already cleared
309    ///
310    /// # Atomic Semantics
311    /// Uses `fetch_and` with `AcqRel` ordering to atomically clear bits and get previous state.
312    /// Notifies the partition owner if the leaf became empty.
313    pub fn mark_signal_inactive(&self, leaf_idx: usize, signal_idx: usize) -> bool {
314        if leaf_idx >= self.leaf_count || signal_idx >= self.signals_per_leaf {
315            return false;
316        }
317        debug_assert!(signal_idx < self.signals_per_leaf);
318        let mask = 1u64 << signal_idx;
319        self.clear_leaf_bits(leaf_idx, mask)
320    }
321
    /// Attempts to reserve a task slot within (`leaf_idx`, `signal_idx`).
    ///
    /// # Returns
    /// The bit index (0-63) of the reserved slot, or `None` if all slots are taken
    /// or the indices are out of range.
    ///
    /// # Atomic Semantics
    /// Implements a lock-free reservation system using atomic CAS (Compare-And-Swap) loops.
    /// - Uses `Acquire` ordering for loads to see completed reservations
    /// - Uses `AcqRel` for successful CAS to make reservation visible to others
    /// - Uses `Acquire` for failed CAS to see the updated state
    ///
    /// # Algorithm
    /// 1. Load current reservation bitmap
    /// 2. Find the lowest free (zero) bit via `trailing_zeros` of the inverted word
    /// 3. Attempt to atomically set that bit with CAS
    /// 4. If CAS fails (another thread reserved it), retry with the updated value
    /// 5. Continue until success or no free bits remain
    pub fn reserve_task_in_leaf(&self, leaf_idx: usize, signal_idx: usize) -> Option<u8> {
        if leaf_idx >= self.leaf_count || signal_idx >= self.signals_per_leaf {
            return None;
        }
        let reservations = self.reservation_word(leaf_idx, signal_idx);
        let mut current = reservations.load(Ordering::Acquire);
        loop {
            // A set bit means "reserved", so the complement exposes free slots.
            let free = !current;
            if free == 0 {
                return None; // All bits reserved
            }
            let bit = free.trailing_zeros() as u8;
            let mask = 1u64 << bit;
            match reservations.compare_exchange(
                current,
                current | mask,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(bit),
                Err(updated) => current = updated,
            }
        }
    }
364
    /// Clears a previously reserved task slot.
    ///
    /// Out-of-range `leaf_idx`, `signal_idx`, or `bit` values are silently ignored.
    ///
    /// # Atomic Semantics
    /// Uses `fetch_and` with `AcqRel` ordering to atomically clear the reservation bit:
    /// the release half makes the freed slot visible to threads attempting
    /// reservations, the acquire half observes their prior updates.
    pub fn release_task_in_leaf(&self, leaf_idx: usize, signal_idx: usize, bit: usize) {
        if leaf_idx >= self.leaf_count
            || signal_idx >= self.signals_per_leaf
            || bit >= TASK_SLOTS_PER_SIGNAL
        {
            return;
        }
        let mask = !(1u64 << bit);
        self.reservation_word(leaf_idx, signal_idx)
            .fetch_and(mask, Ordering::AcqRel);
    }
381
    /// Convenience function: reserve the first available task slot across the arena.
    ///
    /// # Atomic Semantics
    /// The single partition cursor (`next_partition`) is advanced with a `SeqCst`
    /// `fetch_add` so concurrent callers start their scans at distinct partitions.
    /// The actual reservation uses the CAS loop in `reserve_task_in_leaf`, which
    /// provides the real synchronization.
    ///
    /// # Strategy
    /// Partitions are scanned exhaustively starting at the cursor; within a
    /// multi-leaf partition a random start leaf spreads contention, while
    /// single-leaf partitions skip the RNG call entirely.
    pub fn reserve_task(&self) -> Option<(usize, usize, u8)> {
        if self.leaf_count == 0 {
            return None;
        }
        if self.signals_per_leaf == 0 {
            return None;
        }

        // Exhaustively scan partitions one by one. Each partition represents a worker's slice of
        // leaves, so rotating by partition keeps contention localized while still guaranteeing we
        // eventually visit every leaf if the partition has no free slots.
        let worker_count = self.get_worker_count();
        let partition_count = worker_count.max(1);
        let start_partition =
            self.next_partition.fetch_add(1, Ordering::SeqCst) % partition_count;

        for partition_offset in 0..partition_count {
            let worker_id = (start_partition + partition_offset) % partition_count;
            let partition_start = self.partition_start_for_worker(worker_id, partition_count);
            let partition_end = self.partition_end_for_worker(worker_id, partition_count);
            if partition_end <= partition_start {
                continue; // Empty partition (more workers than leaves)
            }

            let partition_len = partition_end - partition_start;
            if partition_len == 1 {
                // Single-leaf partition: no randomization needed.
                if let Some(found) = self.try_reserve_in_leaf(partition_start) {
                    return Some(found);
                }
                continue;
            }

            // Use a simple random starting point within the partition to distribute load
            let random_seed = crate::utils::random_u64() as usize;
            let start_leaf_offset = random_seed % partition_len;

            for leaf_offset in 0..partition_len {
                let leaf_idx =
                    partition_start + (start_leaf_offset + leaf_offset) % partition_len;
                if let Some(found) = self.try_reserve_in_leaf(leaf_idx) {
                    return Some(found);
                }
            }
        }
        None
    }
434
    /// Clears the summary bit when the corresponding task signal appears empty.
    ///
    /// # ⚠️ CORRECTNESS ISSUE - TOCTOU Race Condition
    ///
    /// This function has an unfixed race condition between checking and clearing:
    ///
    /// **Race scenario:**
    /// 1. Thread A: `signal.load()` sees `0`
    /// 2. Thread B: Enqueues task, `signal` becomes `1`, calls `mark_signal_active()`
    /// 3. Thread A: Calls `mark_signal_inactive()`, clearing the bit Thread B just set
    /// 4. Result: `signal` has tasks but summary bit is cleared → **lost work notification**
    ///
    /// **Why this is problematic:**
    /// - Work-stealing threads rely on summary bits to find available work
    /// - Clearing the bit while tasks exist makes those tasks invisible to stealers
    /// - The task enqueue won't re-set the bit because it already set it in step 2
    ///
    /// **Proper fix requires one of:**
    /// 1. Caller-side synchronization ensuring signal cannot be modified during this call
    /// 2. API change: pass mutable/exclusive access to signal to perform atomic check-and-clear
    /// 3. Accepting false negatives: allow summary bit to remain set even when signal is empty
    ///    (wastes stealer cycles but is always safe)
    ///
    /// **Current mitigation:** None in this function — callers must guarantee the
    /// signal cannot transition from empty to non-empty for the duration of this call.
    ///
    /// # Atomic Semantics
    /// Uses `Acquire` ordering so all writes that preceded the signal becoming
    /// zero are visible before the summary bit is cleared.
    pub fn mark_signal_inactive_if_empty(
        &self,
        leaf_idx: usize,
        signal_idx: usize,
        signal: &AtomicU64,
    ) {
        // WARNING: This check-then-act pattern is inherently racy
        // See documentation above for details
        if signal.load(Ordering::Acquire) == 0 {
            self.mark_signal_inactive(leaf_idx, signal_idx);
        }
    }
474
    /// Total number of leaves in the summary.
    #[inline(always)]
    pub fn leaf_count(&self) -> usize {
        self.leaf_count
    }
479
    /// Number of task signal words tracked by each leaf (1..=64).
    #[inline(always)]
    pub fn signals_per_leaf(&self) -> usize {
        self.signals_per_leaf
    }
484
485    // ────────────────────────────────────────────────────────────────────────────
486    // PARTITION MANAGEMENT HELPERS
487    // ────────────────────────────────────────────────────────────────────────────
488    // These functions handle the mapping between global leaf indices and worker partitions.
489    // They implement a load-balancing algorithm that distributes leaves evenly across workers.
490
491    /// Compute which worker owns a given leaf based on partition assignments.
492    ///
493    /// This is the inverse of `Worker::compute_partition()`. Given a leaf index,
494    /// it determines which worker is responsible for processing tasks in that leaf.
495    ///
496    /// # Partition Algorithm
497    /// Uses a balanced distribution where:
498    /// - First `leaf_count % worker_count` workers get `(leaf_count / worker_count) + 1` leaves
499    /// - Remaining workers get `leaf_count / worker_count` leaves
500    /// This ensures leaves are distributed as evenly as possible.
501    ///
502    /// # Arguments
503    ///
504    /// * `leaf_idx` - The global leaf index (0..leaf_count)
505    /// * `worker_count` - Total number of active workers
506    ///
507    /// # Returns
508    ///
509    /// Worker ID (0..worker_count) that owns this leaf
510    ///
511    /// # Example
512    ///
513    /// ```ignore
514    /// let owner_id = summary_tree.compute_partition_owner(leaf_idx, worker_count);
515    /// let owner_waker = &service.wakers[owner_id];
516    /// owner_waker.mark_partition_leaf_active(local_idx);
517    /// ```
518    pub fn compute_partition_owner(&self, leaf_idx: usize, worker_count: usize) -> usize {
519        if worker_count == 0 {
520            return 0;
521        }
522
523        let base = self.leaf_count / worker_count;
524        let extra = self.leaf_count % worker_count;
525
526        // First 'extra' workers get (base + 1) leafs each
527        let boundary = extra * (base + 1);
528
529        if leaf_idx < boundary {
530            leaf_idx / (base + 1)
531        } else {
532            extra + (leaf_idx - boundary) / base
533        }
534    }
535
536    /// Compute the partition start index for a given worker.
537    ///
538    /// # Arguments
539    ///
540    /// * `worker_id` - Worker ID (0..worker_count)
541    /// * `worker_count` - Total number of active workers
542    ///
543    /// # Returns
544    ///
545    /// First leaf index in this worker's partition
546    pub fn partition_start_for_worker(&self, worker_id: usize, worker_count: usize) -> usize {
547        if worker_count == 0 {
548            return 0;
549        }
550
551        let base = self.leaf_count / worker_count;
552        let extra = self.leaf_count % worker_count;
553
554        if worker_id < extra {
555            worker_id * (base + 1)
556        } else {
557            extra * (base + 1) + (worker_id - extra) * base
558        }
559    }
560
561    /// Compute the partition end index for a given worker.
562    ///
563    /// # Arguments
564    ///
565    /// * `worker_id` - Worker ID (0..worker_count)
566    /// * `worker_count` - Total number of active workers
567    ///
568    /// # Returns
569    ///
570    /// One past the last leaf index in this worker's partition (exclusive)
571    pub fn partition_end_for_worker(&self, worker_id: usize, worker_count: usize) -> usize {
572        if worker_count == 0 {
573            return 0;
574        }
575
576        let start = self.partition_start_for_worker(worker_id, worker_count);
577        let base = self.leaf_count / worker_count;
578        let extra = self.leaf_count % worker_count;
579
580        let len = if worker_id < extra { base + 1 } else { base };
581
582        (start + len).min(self.leaf_count)
583    }
584
585    /// Convert a global leaf index to a local index within a worker's partition.
586    ///
587    /// # Arguments
588    ///
589    /// * `leaf_idx` - Global leaf index
590    /// * `worker_id` - Worker ID
591    /// * `worker_count` - Total number of workers
592    ///
593    /// # Returns
594    ///
595    /// Local leaf index (0..partition_size) for use with SignalWaker partition bitmap,
596    /// or None if the leaf is not in this worker's partition
597    pub fn global_to_local_leaf_idx(
598        &self,
599        leaf_idx: usize,
600        worker_id: usize,
601        worker_count: usize,
602    ) -> Option<usize> {
603        let partition_start = self.partition_start_for_worker(worker_id, worker_count);
604        let partition_end = self.partition_end_for_worker(worker_id, worker_count);
605
606        if leaf_idx >= partition_start && leaf_idx < partition_end {
607            Some(leaf_idx - partition_start)
608        } else {
609            None
610        }
611    }
612}
613
614#[cfg(test)]
615mod tests {
616    use crate::runtime::waker::WorkerWaker;
617
618    use super::*;
619    use std::collections::HashSet;
620    use std::sync::{Arc, Barrier, Mutex};
621    use std::thread;
622    use std::thread::yield_now;
623    use std::time::{Duration, Instant};
624
    /// Helper function to create a test Summary with dummy wakers.
    ///
    /// Returns the wakers and worker-count `Arc` alongside the tree so both
    /// outlive the Summary, which holds a raw pointer into the wakers array.
    fn setup_tree(leaf_count: usize, signals_per_leaf: usize) -> (Summary, Vec<Arc<WorkerWaker>>, Arc<AtomicUsize>) {
        // Create dummy wakers for testing
        // SAFETY: The returned Arc<AtomicUsize> must outlive the Summary instance
        // to prevent dangling pointer access via Summary.worker_count
        let wakers: Vec<Arc<WorkerWaker>> = (0..4).map(|_| Arc::new(WorkerWaker::new())).collect();
        let worker_count = Arc::new(AtomicUsize::new(4));
        let tree = Summary::new(leaf_count, signals_per_leaf, &wakers, &worker_count);
        (tree, wakers, worker_count)
    }
635
636    /// Test that marking a signal active updates the leaf word correctly
637    /// and that duplicate marking returns false (idempotent behavior)
638    #[test]
639    fn mark_signal_active_updates_root_and_leaf() {
640        let (tree, _wakers, _worker_count) = setup_tree(4, 4);
641        
642        // First activation should return true (leaf was empty)
643        assert!(tree.mark_signal_active(1, 1));
644        assert_eq!(tree.leaf_words[1].load(Ordering::Relaxed), 1u64 << 1);
645        
646        // Duplicate activation should return false (already active)
647        assert!(!tree.mark_signal_active(1, 1));
648        
649        // Clearing should work and return true (leaf became empty)
650        assert!(tree.mark_signal_inactive(1, 1));
651        assert_eq!(tree.leaf_words[1].load(Ordering::Relaxed), 0);
652    }
653
654    /// Test the conditional clearing functionality when signal is empty
655    /// This tests the race-prone mark_signal_inactive_if_empty function
656    #[test]
657    fn mark_signal_inactive_if_empty_clears_summary() {
658        let (tree, _wakers, _worker_count) = setup_tree(1, 2);
659        
660        // Activate a signal
661        assert!(tree.mark_signal_active(0, 1));
662        
663        // Create an empty signal for testing
664        let signal = AtomicU64::new(0);
665        
666        // Should clear the summary bit since signal is empty
667        tree.mark_signal_inactive_if_empty(0, 1, &signal);
668        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);
669    }
670
671    /// Test that task reservation exhausts all 64 bits correctly
672    /// Validates the CAS loop implementation and bit manipulation
673    #[test]
674    fn reserve_task_in_leaf_exhausts_all_bits() {
675        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
676        let mut bits = Vec::with_capacity(64);
677        
678        // Reserve all 64 bits
679        for _ in 0..64 {
680            let bit = tree.reserve_task_in_leaf(0, 0).expect("expected free bit");
681            bits.push(bit);
682        }
683        
684        // Verify we got all unique bits 0-63
685        bits.sort_unstable();
686        assert_eq!(bits, (0..64).collect::<Vec<_>>());
687        
688        // Should be exhausted now
689        assert!(
690            tree.reserve_task_in_leaf(0, 0).is_none(),
691            "all bits should be exhausted"
692        );
693        
694        // Release all bits for cleanup
695        for bit in bits {
696            tree.release_task_in_leaf(0, 0, bit as usize);
697        }
698    }
699
700    /// Test distribution across leaves
701    /// Validates that reserve_task can visit all leaves
702    #[test]
703    fn reserve_task_round_robin_visits_all_leaves() {
704        let (tree, _wakers, _worker_count) = setup_tree(4, 1);
705        let mut observed = Vec::with_capacity(16);
706        
707        // Reserve multiple tasks to ensure we visit different leaves
708        // With random starting points, we should eventually hit all leaves
709        for _ in 0..16 {
710            let (leaf, sig, bit) = tree.reserve_task().expect("reserve task");
711            observed.push(leaf);
712            tree.release_task_in_leaf(leaf, sig, bit as usize);
713        }
714        
715        // Should have visited all 4 leaves at least once
716        observed.sort_unstable();
717        observed.dedup();
718        assert_eq!(observed.len(), 4, "Should visit all 4 leaves");
719    }
720
    /// Stress test: concurrent reservations must be unique
    /// Tests atomicity of CAS loop under high contention
    ///
    /// 8 threads each take 8 reservations and hold them for the duration of
    /// the race; with 4 leaves × 64 bits = 256 slots, all 64 reservations can
    /// succeed without any thread spinning forever.
    #[test]
    fn concurrent_reservations_are_unique() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 1);
        let tree = Arc::new(tree);
        let threads = 8;
        let reservations_per_thread = 8;
        // Barrier maximizes contention by releasing all threads at once.
        let barrier = Arc::new(Barrier::new(threads));
        let handles = Arc::new(Mutex::new(Vec::with_capacity(
            threads * reservations_per_thread,
        )));

        let mut join_handles = Vec::with_capacity(threads);
        for _ in 0..threads {
            let tree_clone = Arc::clone(&tree);
            let barrier = Arc::clone(&barrier);
            let handles = Arc::clone(&handles);
            join_handles.push(thread::spawn(move || {
                barrier.wait();
                for _ in 0..reservations_per_thread {
                    // Spin (with yields) until a slot is obtained; slots are
                    // held, never released, until the test's verification phase.
                    loop {
                        if let Some(handle) = tree_clone.reserve_task() {
                            let mut guard = handles.lock().unwrap();
                            guard.push(handle);
                            break;
                        } else {
                            yield_now();
                        }
                    }
                }
            }));
        }

        for join in join_handles {
            join.join().expect("thread panicked");
        }

        // Verify all reservations are unique (no duplicates)
        let guard = handles.lock().unwrap();
        let mut unique = HashSet::new();
        for &(leaf, signal, bit) in guard.iter() {
            assert!(
                unique.insert((leaf, signal, bit)),
                "duplicate handle detected"
            );
        }
        assert_eq!(guard.len(), threads * reservations_per_thread);

        // Cleanup
        for &(leaf, signal, bit) in guard.iter() {
            tree.release_task_in_leaf(leaf, signal, bit as usize);
        }
    }
775
776    /// Test that reservation and release properly update the reservation bitmap
777    /// Validates atomic visibility of changes
778    #[test]
779    fn reserve_and_release_task_updates_reservations() {
780        let (tree, _wakers, _worker_count) = setup_tree(4, 1);
781        
782        // Reserve a task
783        let handle = tree.reserve_task().expect("task handle");
784        assert_eq!(handle.1, 0); // signal idx
785
786        // Verify reservation bitmap was updated
787        let reservation = tree
788            .task_reservations
789            .get(handle.0 * 1 + handle.1)
790            .unwrap()
791            .load(Ordering::Relaxed);
792        assert_ne!(reservation, 0);
793
794        // Release the task
795        tree.release_task_in_leaf(handle.0, handle.1, handle.2 as usize);
796        
797        // Verify reservation bitmap was cleared
798        let reservation = tree
799            .task_reservations
800            .get(handle.0 * 1 + handle.1)
801            .unwrap()
802            .load(Ordering::Relaxed);
803        assert_eq!(reservation, 0);
804    }
805
806    // ────────────────────────────────────────────────────────────────────────────
807    // ATOMIC BIT OPERATIONS TESTS
808    // ────────────────────────────────────────────────────────────────────────────
809
810    /// Test atomic bit operations with proper memory ordering semantics
811    /// Validates that fetch_or/fetch_and operations are truly atomic
812    #[test]
813    fn atomic_bit_operations_are_atomic() {
814        let (tree, _wakers, _worker_count) = setup_tree(2, 2);
815        let tree = Arc::new(tree);
816        let threads = 4;
817        let barrier = Arc::new(Barrier::new(threads));
818        
819        // Each thread will set different bits in the same leaf
820        let handles: Vec<_> = (0..threads)
821            .map(|i| {
822                let tree = Arc::clone(&tree);
823                let barrier = Arc::clone(&barrier);
824                thread::spawn(move || {
825                    barrier.wait();
826                    let signal_idx = i % 2;
827                    let mask = 1u64 << signal_idx;
828                    tree.mark_signal_active(0, signal_idx);
829                    
830                    // Verify the bit is set
831                    let leaf_word = tree.leaf_words[0].load(Ordering::Relaxed);
832                    assert_eq!(leaf_word & mask, mask, "Thread {}: bit {} should be set", i, signal_idx);
833                })
834            })
835            .collect();
836
837        for handle in handles {
838            handle.join().unwrap();
839        }
840
841        // All bits should be set
842        let final_word = tree.leaf_words[0].load(Ordering::Relaxed);
843        assert_eq!(final_word, 0b11);
844    }
845
846    /// Test that mark_signal_inactive properly clears only specified bits
847    #[test]
848    fn clear_leaf_bits_clears_specific_bits() {
849        let (tree, _wakers, _worker_count) = setup_tree(1, 3);
850        
851        // Set multiple bits
852        tree.mark_signal_active(0, 0);
853        tree.mark_signal_active(0, 1);
854        tree.mark_signal_active(0, 2);
855        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0b111);
856        
857        // Clear middle bit
858        assert!(tree.mark_signal_inactive(0, 1));
859        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0b101);
860        
861        // Clear first bit
862        assert!(tree.mark_signal_inactive(0, 0));
863        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0b100);
864        
865        // Clear last bit - should return true (leaf became empty)
866        assert!(tree.mark_signal_inactive(0, 2));
867        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);
868    }
869
870    /// Test idempotency of bit operations
871    #[test]
872    fn bit_operations_are_idempotent() {
873        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
874        
875        // First activation should return true
876        assert!(tree.mark_signal_active(0, 0));
877        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);
878        
879        // Duplicate activation should return false
880        assert!(!tree.mark_signal_active(0, 0));
881        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);
882        
883        // First deactivation should return true
884        assert!(tree.mark_signal_inactive(0, 0));
885        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);
886        
887        // Duplicate deactivation should return false
888        assert!(!tree.mark_signal_inactive(0, 0));
889        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);
890    }
891
892    // ────────────────────────────────────────────────────────────────────────────
893    // TASK RESERVATION SYSTEM TESTS
894    // ────────────────────────────────────────────────────────────────────────────
895
896    /// Test CAS loop correctness under concurrent access
897    /// Validates that the compare_exchange loop properly handles contention
898    #[test]
899    fn cas_loop_handles_concurrent_contention() {
900        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
901        let tree = Arc::new(tree);
902        let threads = 8;
903        let barrier = Arc::new(Barrier::new(threads));
904        let reservations = Arc::new(Mutex::new(Vec::new()));
905        
906        let handles: Vec<_> = (0..threads)
907            .map(|_| {
908                let tree = Arc::clone(&tree);
909                let barrier = Arc::clone(&barrier);
910                let reservations = Arc::clone(&reservations);
911                thread::spawn(move || {
912                    barrier.wait();
913                    // Each thread tries to reserve multiple times
914                    for _ in 0..4 {
915                        if let Some(handle) = tree.reserve_task_in_leaf(0, 0) {
916                            let mut guard = reservations.lock().unwrap();
917                            guard.push(handle);
918                        }
919                    }
920                })
921            })
922            .collect();
923
924        for handle in handles {
925            handle.join().unwrap();
926        }
927
928        let guard = reservations.lock().unwrap();
929        assert_eq!(guard.len(), 32); // 8 threads * 4 reservations each
930        
931        // All reservations should be unique
932        let mut unique = HashSet::new();
933        for &bit in guard.iter() {
934            assert!(unique.insert(bit), "Duplicate reservation detected: {:?}", bit);
935        }
936        
937        // Cleanup
938        for &bit in guard.iter() {
939            tree.release_task_in_leaf(0, 0, bit as usize);
940        }
941    }
942
943    /// Test reservation bitmap exhaustion and wraparound
944    #[test]
945    fn reservation_exhaustion_and_wraparound() {
946        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
947        
948        // Exhaust all 64 bits
949        let mut reservations = Vec::new();
950        for _ in 0..64 {
951            let bit = tree.reserve_task_in_leaf(0, 0).expect("Should get reservation");
952            reservations.push(bit);
953        }
954        
955        // Should be exhausted
956        assert!(tree.reserve_task_in_leaf(0, 0).is_none());
957        
958        // Release half the bits (at even indices: 0, 2, 4, ..., 62)
959        let released_bits: Vec<u8> = (0..32).map(|i| reservations[i * 2]).collect();
960        let still_reserved: HashSet<u8> = (0..32).map(|i| reservations[i * 2 + 1]).collect();
961        
962        for &bit in &released_bits {
963            tree.release_task_in_leaf(0, 0, bit as usize);
964        }
965        
966        // Should be able to reserve again - should get the released bits
967        let new_reservations: Vec<_> = (0..32)
968            .map(|_| tree.reserve_task_in_leaf(0, 0).expect("Should get reservation"))
969            .collect();
970        
971        // All new reservations should be unique
972        let mut seen = HashSet::new();
973        for &new_bit in &new_reservations {
974            assert!(seen.insert(new_bit), "Got duplicate reservation: {}", new_bit);
975        }
976        
977        // All new reservations should be from the released bits, not the still-reserved ones
978        for &new_bit in &new_reservations {
979            assert!(!still_reserved.contains(&new_bit), "Got reservation for still-reserved bit: {}", new_bit);
980        }
981        
982        // Cleanup
983        for &bit in &reservations {
984            tree.release_task_in_leaf(0, 0, bit as usize);
985        }
986        for &bit in &new_reservations {
987            tree.release_task_in_leaf(0, 0, bit as usize);
988        }
989    }
990
991    /// Test signal distribution across a leaf
992    #[test]
993    fn round_robin_signal_distribution() {
994        let (tree, _wakers, _worker_count) = setup_tree(1, 4);
995        let mut observed_signals = Vec::new();
996        
997        // Reserve tasks and track signal distribution
998        // With bit-ops selection, signals are chosen by trailing_zeros (lowest free bit)
999        for _ in 0..8 {
1000            let (leaf, signal, _) = tree.reserve_task().expect("reserve task");
1001            observed_signals.push(signal);
1002            tree.release_task_in_leaf(leaf, signal, 0); // Release immediately
1003        }
1004        
1005        // Should have visited all signals (though not necessarily round-robin)
1006        observed_signals.sort_unstable();
1007        let unique_signals: HashSet<_> = observed_signals.iter().collect();
1008        assert!(unique_signals.len() >= 1, "Should use at least 1 signal");
1009    }
1010
1011    // ────────────────────────────────────────────────────────────────────────────
1012    // PARTITION MANAGEMENT TESTS
1013    // ────────────────────────────────────────────────────────────────────────────
1014
1015    /// Test partition owner computation with various worker counts
1016    #[test]
1017    fn compute_partition_owner_distribution() {
1018        let (tree, _wakers, _worker_count) = setup_tree(10, 1);
1019        
1020        // Test with 3 workers
1021        for leaf_idx in 0..10 {
1022            let owner = tree.compute_partition_owner(leaf_idx, 3);
1023            assert!(owner < 3, "Owner {} should be < 3", owner);
1024        }
1025        
1026        // Test with 1 worker (all leaves belong to worker 0)
1027        for leaf_idx in 0..10 {
1028            let owner = tree.compute_partition_owner(leaf_idx, 1);
1029            assert_eq!(owner, 0, "All leaves should belong to worker 0");
1030        }
1031        
1032        // Test with equal workers and leaves (perfect distribution)
1033        // Create a tree with 5 leaves for this test
1034        let (tree5, _, _) = setup_tree(5, 1);
1035        for leaf_idx in 0..5 {
1036            let owner = tree5.compute_partition_owner(leaf_idx, 5);
1037            assert_eq!(owner, leaf_idx, "Each leaf should have unique owner");
1038        }
1039    }
1040
1041    /// Test partition boundaries are correct
1042    #[test]
1043    fn partition_boundaries_are_correct() {
1044        let (tree, _wakers, _worker_count) = setup_tree(7, 1); // 7 leaves, 3 workers
1045        
1046        // Worker 0: leaves 0, 1, 2 (3 leaves) - gets extra leaf
1047        assert_eq!(tree.partition_start_for_worker(0, 3), 0);
1048        assert_eq!(tree.partition_end_for_worker(0, 3), 3);
1049        
1050        // Worker 1: leaves 3, 4 (2 leaves)  
1051        assert_eq!(tree.partition_start_for_worker(1, 3), 3);
1052        assert_eq!(tree.partition_end_for_worker(1, 3), 5);
1053        
1054        // Worker 2: leaves 5, 6 (2 leaves)
1055        assert_eq!(tree.partition_start_for_worker(2, 3), 5);
1056        assert_eq!(tree.partition_end_for_worker(2, 3), 7);
1057    }
1058
1059    /// Test global to local leaf index conversion
1060    #[test]
1061    fn global_to_local_leaf_conversion() {
1062        let (tree, _wakers, _worker_count) = setup_tree(6, 2); // 6 leaves, 2 workers
1063        
1064        // Worker 0: leaves 0, 1, 2
1065        assert_eq!(tree.global_to_local_leaf_idx(0, 0, 2), Some(0));
1066        assert_eq!(tree.global_to_local_leaf_idx(1, 0, 2), Some(1));
1067        assert_eq!(tree.global_to_local_leaf_idx(2, 0, 2), Some(2));
1068        assert_eq!(tree.global_to_local_leaf_idx(3, 0, 2), None); // Not in partition
1069        
1070        // Worker 1: leaves 3, 4, 5
1071        assert_eq!(tree.global_to_local_leaf_idx(3, 1, 2), Some(0));
1072        assert_eq!(tree.global_to_local_leaf_idx(4, 1, 2), Some(1));
1073        assert_eq!(tree.global_to_local_leaf_idx(5, 1, 2), Some(2));
1074        assert_eq!(tree.global_to_local_leaf_idx(2, 1, 2), None); // Not in partition
1075    }
1076
    /// Test partition computation with edge cases
    ///
    /// Exercises the degenerate configurations: a 1-leaf/1-worker tree and a
    /// worker count of zero. The zero-worker calls are asserted to return
    /// owner 0 and the empty partition range [0, 0) rather than panicking.
    #[test]
    fn partition_computation_edge_cases() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 1);

        // Single leaf, single worker: the identity mapping everywhere.
        assert_eq!(tree.compute_partition_owner(0, 1), 0);
        assert_eq!(tree.partition_start_for_worker(0, 1), 0);
        assert_eq!(tree.partition_end_for_worker(0, 1), 1);
        assert_eq!(tree.global_to_local_leaf_idx(0, 0, 1), Some(0));

        // Zero workers: owner defaults to 0, partition collapses to empty.
        assert_eq!(tree.compute_partition_owner(0, 0), 0);
        assert_eq!(tree.partition_start_for_worker(0, 0), 0);
        assert_eq!(tree.partition_end_for_worker(0, 0), 0);
    }
1093
1094    // ────────────────────────────────────────────────────────────────────────────
1095    // MEMORY ORDERING TESTS
1096    // ────────────────────────────────────────────────────────────────────────────
1097
    /// Test that Acquire ordering ensures visibility of prior writes
    ///
    /// NOTE(review): `join()` itself establishes a happens-before edge for
    /// everything the spawned thread did, so this test cannot fail even with
    /// weaker orderings; it documents the intended Release/Acquire pairing
    /// rather than proving it.
    #[test]
    fn acquire_ordering_ensures_visibility() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
        let tree = Arc::new(tree);
        let tree_ = Arc::clone(&tree);
        let flag = Arc::new(AtomicU64::new(0));
        let flag_ = Arc::clone(&flag);

        let handle = thread::spawn(move || {
            // Set the flag with Release ordering
            flag_.store(1, Ordering::Release);

            // Mark signal active (should be visible after flag is set)
            tree_.mark_signal_active(0, 0);
        });

        handle.join().unwrap();

        // Read with Acquire ordering (should see the flag)
        let observed_flag = flag.load(Ordering::Acquire);
        assert_eq!(observed_flag, 1);

        // Signal should be active
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);
    }
1124
1125    /// Test that Relaxed ordering doesn't provide synchronization
1126    #[test]
1127    fn relaxed_ordering_no_synchronization() {
1128        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
1129        let tree = Arc::new(tree);
1130        let tree_ = Arc::clone(&tree);
1131        let data = Arc::new(AtomicU64::new(0));
1132        let data_ = Arc::clone(&data);
1133        let flag = Arc::new(AtomicU64::new(0));
1134        let flag_ = Arc::clone(&flag);
1135        
1136        let handle = thread::spawn(move || {
1137            // Writer thread
1138            data_.store(42, Ordering::Relaxed);
1139            flag_.store(1, Ordering::Release);
1140            tree_.mark_signal_active(0, 0);
1141        });
1142        
1143        handle.join().unwrap();
1144        
1145        // Reader might see flag but not data due to relaxed ordering
1146        let observed_flag = flag.load(Ordering::Acquire);
1147        let observed_data = data.load(Ordering::Relaxed);
1148        
1149        // Flag should be visible
1150        assert_eq!(observed_flag, 1);
1151        
1152        // Data might not be visible (this test documents the behavior)
1153        // In practice, the signal activation provides some ordering
1154        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);
1155    }
1156
1157    // ────────────────────────────────────────────────────────────────────────────
1158    // EDGE CASES AND ERROR HANDLING
1159    // ────────────────────────────────────────────────────────────────────────────
1160
1161    /// Test boundary conditions for indices
1162    #[test]
1163    fn boundary_conditions_for_indices() {
1164        let (tree, _wakers, _worker_count) = setup_tree(2, 3);
1165        
1166        // Valid indices should work
1167        assert!(tree.mark_signal_active(0, 0));
1168        assert!(tree.mark_signal_active(1, 2));
1169        assert!(tree.reserve_task_in_leaf(0, 0).is_some());
1170        assert!(tree.reserve_task_in_leaf(1, 2).is_some());
1171        
1172        // Invalid leaf index should return None/false
1173        assert!(!tree.mark_signal_active(2, 0)); // leaf_idx >= leaf_count
1174        assert!(!tree.mark_signal_inactive(2, 0));
1175        assert!(tree.reserve_task_in_leaf(2, 0).is_none());
1176        
1177        // Invalid signal index should return None/false
1178        assert!(!tree.mark_signal_active(0, 3)); // signal_idx >= signals_per_leaf
1179        assert!(!tree.mark_signal_inactive(0, 3));
1180        assert!(tree.reserve_task_in_leaf(0, 3).is_none());
1181    }
1182
1183    /// Test zero signals per leaf
1184    #[test]
1185    fn zero_signals_per_leaf_handling() {
1186        // This should panic during creation
1187        let result = std::panic::catch_unwind(|| {
1188            let wakers: Vec<Arc<WorkerWaker>> = (0..2).map(|_| Arc::new(WorkerWaker::new())).collect();
1189            let worker_count = Arc::new(AtomicUsize::new(2));
1190            Summary::new(2, 0, &wakers, &worker_count);
1191        });
1192        
1193        assert!(result.is_err());
1194    }
1195
1196    /// Test empty wakers array
1197    #[test]
1198    fn empty_wakers_array_handling() {
1199        // This should panic during creation
1200        let result = std::panic::catch_unwind(|| {
1201            let wakers: Vec<Arc<WorkerWaker>> = Vec::new();
1202            let worker_count = Arc::new(AtomicUsize::new(0));
1203            Summary::new(2, 2, &wakers, &worker_count);
1204        });
1205        
1206        assert!(result.is_err());
1207    }
1208
1209    /// Test release_task with various configurations
1210    #[test]
1211    fn reserve_task_various_configurations() {
1212        // Empty tree - should panic during creation, not during reserve_task
1213        let result = std::panic::catch_unwind(|| {
1214            let wakers: Vec<Arc<WorkerWaker>> = (0..2).map(|_| Arc::new(WorkerWaker::new())).collect();
1215            let worker_count = Arc::new(AtomicUsize::new(2));
1216            Summary::new(0, 1, &wakers, &worker_count)
1217        });
1218        assert!(result.is_err(), "Empty tree should panic during creation");
1219        
1220        // Tree with zero signals per leaf
1221        let result = std::panic::catch_unwind(|| {
1222            let wakers: Vec<Arc<WorkerWaker>> = (0..2).map(|_| Arc::new(WorkerWaker::new())).collect();
1223            let worker_count = Arc::new(AtomicUsize::new(2));
1224            Summary::new(2, 0, &wakers, &worker_count)
1225        });
1226        assert!(result.is_err());
1227    }
1228
1229    // ────────────────────────────────────────────────────────────────────────────
1230    // CONCURRENCY STRESS TESTS
1231    // ────────────────────────────────────────────────────────────────────────────
1232
    /// High contention stress test for signal activation/deactivation
    ///
    /// The activate/deactivate split is deterministic by thread-id parity
    /// (NOT random): even thread ids only activate (and, since
    /// signal_idx = thread_id % 2, always signal 0), odd thread ids only
    /// deactivate signal 1. The final assertion checks the leaf words never
    /// escape the valid 2-signal mask.
    #[test]
    fn stress_test_signal_activation_deactivation() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 2);
        let tree = Arc::new(tree);
        let threads = 12;
        let iterations = 100;
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|thread_id| {
                let tree = Arc::clone(&tree);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..iterations {
                        // Each thread repeatedly targets the same (leaf, signal).
                        let leaf_idx = thread_id % 4;
                        let signal_idx = thread_id % 2;

                        // Deterministic parity split: even ids activate,
                        // odd ids deactivate.
                        if thread_id % 2 == 0 {
                            tree.mark_signal_active(leaf_idx, signal_idx);
                        } else {
                            tree.mark_signal_inactive(leaf_idx, signal_idx);
                        }
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Tree should still be in valid state: with 2 signals per leaf only
        // the low two bits of each word may ever be set.
        for i in 0..4 {
            let word = tree.leaf_words[i].load(Ordering::Relaxed);
            assert!(word <= 0b11, "Leaf {} has invalid word: {}", i, word);
        }
    }
1273
1274    /// Stress test for partition owner computation with changing worker count
1275    #[test]
1276    fn stress_test_partition_owner_with_changing_workers() {
1277        let (tree, _wakers, worker_count) = setup_tree(20, 1);
1278        let tree = Arc::new(tree);
1279        
1280        let handles: Vec<_> = (0..8)
1281            .map(|i| {
1282                let tree = Arc::clone(&tree);
1283                let worker_count = Arc::clone(&worker_count);
1284                thread::spawn(move || {
1285                    // Each thread uses different worker counts
1286                    let test_counts = [1, 2, 4, 5, 10];
1287                    for &count in &test_counts {
1288                        worker_count.store(count, Ordering::Relaxed);
1289                        
1290                        // Compute partition owners for all leaves
1291                        for leaf_idx in 0..20 {
1292                            let owner = tree.compute_partition_owner(leaf_idx, count);
1293                            assert!(owner < count, "Invalid owner {} for count {}", owner, count);
1294                        }
1295                    }
1296                })
1297            })
1298            .collect();
1299
1300        for handle in handles {
1301            handle.join().unwrap();
1302        }
1303    }
1304
    /// Stress test: every successful reservation must be immediately visible
    /// in its reservation bitmap, and after all threads finish every bitmap
    /// must have drained back to zero (no leaked reservations).
    #[test]
    fn stress_test_task_reservation_system() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 4); // 4 leaves x 4 signals spreads the load
        let tree = Arc::new(tree);
        let threads = 16;
        let iterations = 200;
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|thread_id| {
                let tree = Arc::clone(&tree);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for i in 0..iterations {
                        // Try to reserve - may fail if all slots are taken
                        if let Some((leaf, signal, bit)) = tree.reserve_task() {
                            // Verify the bit is actually set in the reservation bitmap
                            let reservation_word = tree.reservation_word(leaf, signal);
                            let current = reservation_word.load(Ordering::SeqCst);
                            let mask = 1u64 << bit;

                            // The bit should be set
                            assert!(current & mask != 0, 
                                   "Thread {} iteration {}: Bit {} not set in reservation bitmap for ({}, {})", 
                                   thread_id, i, bit, leaf, signal);

                            // Hold the slot briefly so threads overlap and contend
                            thread::sleep(Duration::from_micros(100));

                            // Release the reservation
                            tree.release_task_in_leaf(leaf, signal, bit as usize);

                            // Note: We don't verify the bit is cleared here because under high contention,
                            // another thread may immediately re-reserve the same bit. The final check at
                            // the end of the test verifies all bits are eventually released.
                        }
                    }
                })
            })
            .collect();

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }

        // Verify all reservation bitmaps are empty
        for leaf in 0..tree.leaf_count {
            for signal in 0..tree.signals_per_leaf {
                let reservation_word = tree.reservation_word(leaf, signal);
                let current = reservation_word.load(Ordering::SeqCst);
                assert_eq!(current, 0, 
                          "Reservation bitmap not empty for leaf {}, signal {}: {:b}", 
                          leaf, signal, current);
            }
        }

        println!("Successfully completed {} iterations with {} threads", iterations, threads);
    }
1366
1367    /// Test TOCTOU race condition mitigation in notification system
1368    #[test]
1369    fn test_toctou_race_mitigation() {
1370        let (tree, wakers_, worker_count) = setup_tree(4, 2);
1371        let tree = Arc::new(tree);
1372        let tree_ = Arc::clone(&tree);
1373        
1374        // Start with 2 workers
1375        worker_count.store(2, Ordering::Relaxed);
1376        
1377        let handle = thread::spawn(move || {
1378            let tree = tree_;
1379            // Simulate rapid worker count changes
1380            for count in [1, 3, 0, 2, 4] {
1381                worker_count.store(count, Ordering::Relaxed);
1382                
1383                // Try to trigger notifications - should not panic
1384                for leaf_idx in 0..4 {
1385                    // These calls should handle the changing worker count safely
1386                    tree.notify_partition_owner_active(leaf_idx);
1387                    tree.notify_partition_owner_inactive(leaf_idx);
1388                }
1389                
1390                thread::yield_now();
1391            }
1392        });
1393        
1394        handle.join().unwrap();
1395        
1396        // Tree should still be functional
1397        assert!(tree.mark_signal_active(0, 0));
1398        assert!(tree.reserve_task().is_some());
1399    }
1400}