// maniac_runtime/runtime/summary.rs
1use super::waker::WorkerWaker;
2use crate::utils::bits;
3use crate::utils::CachePadded;
4use std::marker::PhantomData;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Condvar, Mutex};
7use std::time::Duration;
8
/// Number of task slots tracked per signal word — one bit per slot in a `u64`.
const TASK_SLOTS_PER_SIGNAL: usize = u64::BITS as usize;
10
/// Single-level summary tree for task work-stealing.
///
/// This tree tracks ONLY task signals - no yield or worker state.
/// Each leaf represents a set of task signal words.
///
/// The Summary coordinates with SignalWakers to notify partition owners
/// when their assigned leafs become active/inactive.
///
/// # Thread Safety
/// This struct is designed for high-concurrency scenarios with the following guarantees:
/// - All operations use atomic primitives for lock-free operation
/// - Raw pointers are used for performance but are only sound while the owning
///   WorkerService (which also owns the waker array) is alive
/// - Implements `Send` and `Sync` for cross-thread usage
///
/// # Memory Layout
/// - `leaf_words`: Atomic bitfields tracking which signals have pending tasks
/// - `task_reservations`: Atomic bitfields for task slot reservations within each signal
/// - `next_partition`: round-robin cursor distributing allocations across partitions
/// - Partition mapping enables work-stealing between worker threads
pub struct Summary {
    // Owned heap allocations
    pub(crate) leaf_words: Box<[AtomicU64]>, // Pub for Worker access
    // One reservation word per (leaf, signal) pair; bit set == slot taken.
    task_reservations: Box<[AtomicU64]>,

    // Configuration (immutable after construction)
    leaf_count: usize,
    signals_per_leaf: usize,
    // Mask with a 1 for every valid signal bit of a leaf word.
    leaf_summary_mask: u64,

    // Round-robin cursor for allocation.
    // CachePadded to prevent false sharing between CPU cores.
    next_partition: CachePadded<AtomicUsize>,

    // Partition owner notification.
    // Raw pointer to WorkerService.wakers array (lifetime guaranteed by WorkerService ownership).
    wakers: *const Arc<WorkerWaker>,
    wakers_len: usize,
    // Shared reference to worker_count - keeps it alive.
    worker_count: Arc<AtomicUsize>,

    // Provenance tracking for the raw `wakers` pointer.
    _marker: PhantomData<&'static WorkerWaker>,
}
54
// SAFETY: all mutable state lives behind atomics; the raw `wakers` pointer is
// only ever read, and its target outlives this Summary (see struct docs), so
// moving a Summary across threads is sound.
unsafe impl Send for Summary {}
// SAFETY: shared access performs only atomic operations and reads through the
// immutable `wakers` pointer, so `&Summary` may be used from multiple threads.
unsafe impl Sync for Summary {}
57
58impl Summary {
59 /// Creates a new Summary with the specified dimensions.
60 ///
61 /// # Arguments
62 /// * `leaf_count` - Number of leaf nodes (typically matches worker partition count)
63 /// * `signals_per_leaf` - Number of task signal words per leaf (typically tasks_per_leaf / 64)
64 /// * `wakers` - Slice of SignalWakers for partition owner notification
65 /// * `worker_count` - Reference to WorkerService's worker_count atomic (single source of truth)
66 ///
67 /// # Safety
68 /// The wakers slice and worker_count reference must remain valid for the lifetime of this Summary.
69 /// This is guaranteed when Summary is owned by WorkerService which also owns the wakers and worker_count.
70 ///
71 /// # Memory Allocation
72 /// Allocates `leaf_count * signals_per_leaf * 8` bytes for reservations plus overhead for leaf words and cursors.
73 pub fn new(
74 leaf_count: usize,
75 signals_per_leaf: usize,
76 wakers: &[Arc<WorkerWaker>],
77 worker_count: &Arc<AtomicUsize>,
78 ) -> Self {
79 assert!(leaf_count > 0, "leaf_count must be > 0");
80 assert!(signals_per_leaf > 0, "signals_per_leaf must be > 0");
81 assert!(signals_per_leaf <= 64, "signals_per_leaf must be <= 64");
82 assert!(!wakers.is_empty(), "wakers must not be empty");
83
84 let task_word_count = leaf_count * signals_per_leaf;
85
86 // Initialize leaf words (all signals initially inactive)
87 let leaf_words = (0..leaf_count)
88 .map(|_| AtomicU64::new(0))
89 .collect::<Vec<_>>()
90 .into_boxed_slice();
91
92 // Initialize reservation bitmaps (all slots initially free)
93 let task_reservations = (0..task_word_count)
94 .map(|_| AtomicU64::new(0))
95 .collect::<Vec<_>>()
96 .into_boxed_slice();
97
98 // Create mask for valid signal bits in each leaf
99 let leaf_summary_mask = if signals_per_leaf >= 64 {
100 u64::MAX
101 } else {
102 (1u64 << signals_per_leaf) - 1
103 };
104
105 Self {
106 leaf_words,
107 task_reservations,
108 leaf_count,
109 signals_per_leaf,
110 leaf_summary_mask,
111 next_partition: CachePadded::new(AtomicUsize::new(0)),
112 wakers: wakers.as_ptr(),
113 wakers_len: wakers.len(),
114 worker_count: Arc::clone(worker_count),
115 _marker: PhantomData,
116 }
117 }
118
    /// Get the current worker count from WorkerService.
    /// Reads directly from the single source of truth (the shared atomic).
    ///
    /// # Atomic Semantics
    /// Uses `Relaxed` ordering: the value is informational only and requires no
    /// synchronization with other memory operations.
    #[inline]
    pub fn get_worker_count(&self) -> usize {
        self.worker_count.load(Ordering::Relaxed)
    }
129
    /// Summary word for leaf `idx`. Panics if `idx >= leaf_count`.
    #[inline(always)]
    fn leaf_word(&self, idx: usize) -> &AtomicU64 {
        &self.leaf_words[idx]
    }

    /// Flat index into `task_reservations` for a (leaf, signal) pair.
    #[inline(always)]
    fn reservation_index(&self, leaf_idx: usize, signal_idx: usize) -> usize {
        leaf_idx * self.signals_per_leaf + signal_idx
    }

    /// Reservation bitmap word for a (leaf, signal) pair.
    /// Panics on out-of-range indices.
    #[inline(always)]
    fn reservation_word(&self, leaf_idx: usize, signal_idx: usize) -> &AtomicU64 {
        &self.task_reservations[self.reservation_index(leaf_idx, signal_idx)]
    }
144
145
146 #[inline(always)]
147 fn try_reserve_in_leaf(&self, leaf_idx: usize) -> Option<(usize, usize, u8)> {
148 if self.signals_per_leaf == 0 {
149 return None;
150 }
151 let mut signal_mask = self.leaf_summary_mask;
152 while signal_mask != 0 {
153 let signal_idx = signal_mask.trailing_zeros() as usize;
154 if signal_idx >= self.signals_per_leaf {
155 break;
156 }
157 if let Some(bit) = self.reserve_task_in_leaf(leaf_idx, signal_idx) {
158 return Some((leaf_idx, signal_idx, bit));
159 }
160 signal_mask &= signal_mask - 1;
161 }
162 None
163 }
164
165 /// Notify the partition owner's SignalWaker that a leaf in their partition became active.
166 ///
167 /// # Atomic Semantics
168 /// This function reads the worker count with Acquire ordering to ensure visibility
169 /// of prior worker service state changes. It includes validation to handle the case
170 /// where worker count changes between loading and using it (TOCTOU race).
171 ///
172 /// # Race Condition Mitigation
173 /// - Validates worker_count against wakers_len to prevent out-of-bounds access
174 /// - Validates owner_id against current worker_count to handle worker shutdown
175 /// - These checks ensure safe operation even if worker configuration changes
176 #[inline(always)]
177 fn notify_partition_owner_active(&self, leaf_idx: usize) {
178 let worker_count = self.worker_count.load(Ordering::Acquire);
179 // Validate worker count to prevent out-of-bounds access
180 if worker_count == 0 || worker_count > self.wakers_len {
181 return;
182 }
183
184 let owner_id = self.compute_partition_owner(leaf_idx, worker_count);
185 // Additional validation: owner_id must be within current worker count
186 // This handles the case where worker count decreases after we loaded it
187 if owner_id >= worker_count {
188 return;
189 }
190
191 if owner_id < self.wakers_len {
192 // SAFETY: wakers pointer is valid for the lifetime of Summary
193 // because WorkerService owns both
194 let waker = unsafe { &*self.wakers.add(owner_id) };
195
196 // Compute local leaf index within owner's partition
197 if let Some(local_idx) = self.global_to_local_leaf_idx(leaf_idx, owner_id, worker_count)
198 {
199 waker.mark_partition_leaf_active(local_idx);
200 }
201 }
202 }
203
204 /// Notify the partition owner's SignalWaker that a leaf in their partition became inactive.
205 ///
206 /// # Atomic Semantics
207 /// This function reads the worker count with Acquire ordering to ensure visibility
208 /// of prior worker service state changes. It includes validation to handle the case
209 /// where worker count changes between loading and using it (TOCTOU race).
210 ///
211 /// # Race Condition Mitigation
212 /// - Validates worker_count against wakers_len to prevent out-of-bounds access
213 /// - Validates owner_id against current worker_count to handle worker shutdown
214 /// - These checks ensure safe operation even if worker configuration changes
215 #[inline(always)]
216 fn notify_partition_owner_inactive(&self, leaf_idx: usize) {
217 let worker_count = self.worker_count.load(Ordering::Acquire);
218 // Validate worker count to prevent out-of-bounds access
219 if worker_count == 0 || worker_count > self.wakers_len {
220 return;
221 }
222
223 let owner_id = self.compute_partition_owner(leaf_idx, worker_count);
224 // Additional validation: owner_id must be within current worker count
225 // This handles the case where worker count decreases after we loaded it
226 if owner_id >= worker_count {
227 return;
228 }
229
230 if owner_id < self.wakers_len {
231 // SAFETY: wakers pointer is valid for the lifetime of Summary
232 // because WorkerService owns both
233 let waker = unsafe { &*self.wakers.add(owner_id) };
234
235 // Compute local leaf index within owner's partition
236 if let Some(local_idx) = self.global_to_local_leaf_idx(leaf_idx, owner_id, worker_count)
237 {
238 waker.clear_partition_leaf(local_idx);
239 }
240 }
241 }
242
243 #[inline(always)]
244 fn mark_leaf_bits(&self, leaf_idx: usize, mask: u64) -> bool {
245 if mask == 0 {
246 return false;
247 }
248 let leaf = self.leaf_word(leaf_idx);
249 // Atomic Read-Modify-Write: fetch_or returns previous value and sets new bits
250 // Using AcqRel ordering to both see previous changes and make new changes visible
251 let prev = leaf.fetch_or(mask, Ordering::AcqRel);
252
253 let was_empty = prev & self.leaf_summary_mask == 0;
254 // Check if we actually added new bits (not just setting already-set bits)
255 let any_new_bits = (prev & mask) != mask;
256
257 // Notify partition owner if any new signal bits were set
258 // Note: There's a small race window here where bits could be cleared
259 // before notification, but this is acceptable for performance
260 if any_new_bits {
261 self.notify_partition_owner_active(leaf_idx);
262 }
263
264 was_empty
265 }
266
267 #[inline(always)]
268 fn clear_leaf_bits(&self, leaf_idx: usize, mask: u64) -> bool {
269 if mask == 0 {
270 return false;
271 }
272 let leaf = self.leaf_word(leaf_idx);
273 // Atomic Read-Modify-Write: fetch_and returns previous value and clears bits
274 // Using AcqRel ordering to both see previous changes and make new changes visible
275 let prev = leaf.fetch_and(!mask, Ordering::AcqRel);
276 if prev & mask == 0 {
277 return false; // Bits were already cleared
278 }
279 // Bits were successfully cleared - return true
280 // Also notify partition owner if this leaf is now completely empty
281 // The check uses the atomic snapshot `prev` to avoid races
282 if (prev & !mask) & self.leaf_summary_mask == 0 {
283 self.notify_partition_owner_inactive(leaf_idx);
284 }
285 true
286 }
287
288 /// Sets the summary bit for a task signal.
289 ///
290 /// # Returns
291 /// `true` if the leaf was empty before setting this signal (useful for work-stealing decisions)
292 ///
293 /// # Atomic Semantics
294 /// Uses `fetch_or` with `AcqRel` ordering to atomically set bits and get previous state.
295 /// Notifies the partition owner if new bits were added.
296 pub fn mark_signal_active(&self, leaf_idx: usize, signal_idx: usize) -> bool {
297 if leaf_idx >= self.leaf_count || signal_idx >= self.signals_per_leaf {
298 return false;
299 }
300 debug_assert!(signal_idx < self.signals_per_leaf);
301 let mask = 1u64 << signal_idx;
302 self.mark_leaf_bits(leaf_idx, mask)
303 }
304
305 /// Clears the summary bit for a task signal.
306 ///
307 /// # Returns
308 /// `true` if bits were successfully cleared, `false` if indices are invalid or bits were already cleared
309 ///
310 /// # Atomic Semantics
311 /// Uses `fetch_and` with `AcqRel` ordering to atomically clear bits and get previous state.
312 /// Notifies the partition owner if the leaf became empty.
313 pub fn mark_signal_inactive(&self, leaf_idx: usize, signal_idx: usize) -> bool {
314 if leaf_idx >= self.leaf_count || signal_idx >= self.signals_per_leaf {
315 return false;
316 }
317 debug_assert!(signal_idx < self.signals_per_leaf);
318 let mask = 1u64 << signal_idx;
319 self.clear_leaf_bits(leaf_idx, mask)
320 }
321
322 /// Attempts to reserve a task slot within (`leaf_idx`, `signal_idx`).
323 ///
324 /// # Returns
325 /// The bit index (0-63) of the reserved slot, or `None` if all slots are taken
326 ///
327 /// # Atomic Semantics
328 /// Implements a lock-free reservation system using atomic CAS (Compare-And-Swap) loops.
329 /// - Uses `Acquire` ordering for loads to see completed reservations
330 /// - Uses `AcqRel` for successful CAS to make reservation visible to others
331 /// - Uses `Acquire` for failed CAS to see the updated state
332 ///
333 /// # Algorithm
334 /// 1. Load current reservation bitmap
335 /// 2. Use a per-signal round-robin cursor to pick a starting bit
336 /// 3. Rotate the bitmap so trailing_zeros finds the next free slot after the cursor
337 /// 4. Attempt to atomically set that bit with CAS
338 /// 5. If CAS fails (another thread reserved it), retry with updated value
339 /// 6. Continue until success or no free bits remain
340 pub fn reserve_task_in_leaf(&self, leaf_idx: usize, signal_idx: usize) -> Option<u8> {
341 if leaf_idx >= self.leaf_count || signal_idx >= self.signals_per_leaf {
342 return None;
343 }
344 let reservations = self.reservation_word(leaf_idx, signal_idx);
345 let mut current = reservations.load(Ordering::Acquire);
346 loop {
347 let free = !current;
348 if free == 0 {
349 return None; // All bits reserved
350 }
351 let bit = free.trailing_zeros() as u8;
352 let mask = 1u64 << bit;
353 match reservations.compare_exchange(
354 current,
355 current | mask,
356 Ordering::AcqRel,
357 Ordering::Acquire,
358 ) {
359 Ok(_) => return Some(bit),
360 Err(updated) => current = updated,
361 }
362 }
363 }
364
    /// Clears a previously reserved task slot.
    ///
    /// Out-of-range `leaf_idx`, `signal_idx`, or `bit` values are ignored.
    ///
    /// # Atomic Semantics
    /// Uses `fetch_and` with `AcqRel` ordering to atomically clear the
    /// reservation bit: the release is published to threads attempting
    /// reservations while also observing their prior updates.
    /// (Earlier docs claimed `SeqCst`; the code has always used `AcqRel`.)
    pub fn release_task_in_leaf(&self, leaf_idx: usize, signal_idx: usize, bit: usize) {
        if leaf_idx >= self.leaf_count
            || signal_idx >= self.signals_per_leaf
            || bit >= TASK_SLOTS_PER_SIGNAL
        {
            return;
        }
        let mask = !(1u64 << bit);
        self.reservation_word(leaf_idx, signal_idx)
            .fetch_and(mask, Ordering::AcqRel);
    }
381
382 /// Convenience function: reserve the first available task slot across the arena.
383 ///
384 /// # Atomic Semantics
385 /// Uses round-robin cursors with `SeqCst` ordering to ensure proper synchronization
386 /// between threads when selecting partitions, leaves, and signals. The actual reservation
387 /// uses the CAS loop in `reserve_task_in_leaf` which provides proper synchronization.
388 pub fn reserve_task(&self) -> Option<(usize, usize, u8)> {
389 if self.leaf_count == 0 {
390 return None;
391 }
392 if self.signals_per_leaf == 0 {
393 return None;
394 }
395
396 // Exhaustively scan partitions one by one. Each partition represents a worker's slice of
397 // leaves, so rotating by partition keeps contention localized while still guaranteeing we
398 // eventually visit every leaf if the partition has no free slots.
399 let worker_count = self.get_worker_count();
400 let partition_count = worker_count.max(1);
401 let start_partition =
402 self.next_partition.fetch_add(1, Ordering::SeqCst) % partition_count;
403
404 for partition_offset in 0..partition_count {
405 let worker_id = (start_partition + partition_offset) % partition_count;
406 let partition_start = self.partition_start_for_worker(worker_id, partition_count);
407 let partition_end = self.partition_end_for_worker(worker_id, partition_count);
408 if partition_end <= partition_start {
409 continue; // Empty partition (more workers than leaves)
410 }
411
412 let partition_len = partition_end - partition_start;
413 if partition_len == 1 {
414 if let Some(found) = self.try_reserve_in_leaf(partition_start) {
415 return Some(found);
416 }
417 continue;
418 }
419
420 // Use a simple random starting point within the partition to distribute load
421 let random_seed = crate::utils::random_u64() as usize;
422 let start_leaf_offset = random_seed % partition_len;
423
424 for leaf_offset in 0..partition_len {
425 let leaf_idx =
426 partition_start + (start_leaf_offset + leaf_offset) % partition_len;
427 if let Some(found) = self.try_reserve_in_leaf(leaf_idx) {
428 return Some(found);
429 }
430 }
431 }
432 None
433 }
434
    /// Clears the summary bit when the corresponding task signal becomes empty.
    ///
    /// # ⚠️ CORRECTNESS ISSUE - TOCTOU Race Condition
    ///
    /// This function has an unfixed race condition between checking and clearing:
    ///
    /// **Race scenario:**
    /// 1. Thread A: `signal.load()` sees `0`
    /// 2. Thread B: Enqueues task, `signal` becomes `1`, calls `mark_signal_active()`
    /// 3. Thread A: Calls `mark_signal_inactive()`, clearing the bit Thread B just set
    /// 4. Result: `signal` has tasks but the summary bit is cleared → **lost work notification**
    ///
    /// **Why this is problematic:**
    /// - Work-stealing threads rely on summary bits to find available work
    /// - Clearing the bit while tasks exist makes those tasks invisible to stealers
    /// - The enqueue won't re-set the bit because it already set it in step 2
    ///
    /// **Proper fix requires one of:**
    /// 1. Caller-side synchronization ensuring the signal cannot change during this call
    /// 2. API change: take exclusive access to the signal for an atomic check-and-clear
    /// 3. Accepting false negatives: leave the summary bit set even when the signal is
    ///    empty (wastes stealer cycles but is always safe)
    ///
    /// **Current mitigation:** None — callers must ensure signal stability externally.
    ///
    /// # Atomic Semantics
    /// Uses `Acquire` ordering so all writes that preceded the signal reaching
    /// zero are visible before the summary bit is cleared.
    pub fn mark_signal_inactive_if_empty(
        &self,
        leaf_idx: usize,
        signal_idx: usize,
        signal: &AtomicU64,
    ) {
        // WARNING: This check-then-act pattern is inherently racy.
        // See the function documentation above for details.
        if signal.load(Ordering::Acquire) == 0 {
            self.mark_signal_inactive(leaf_idx, signal_idx);
        }
    }
474
    /// Number of leaves in the summary.
    #[inline(always)]
    pub fn leaf_count(&self) -> usize {
        self.leaf_count
    }

    /// Number of task signal words tracked per leaf.
    #[inline(always)]
    pub fn signals_per_leaf(&self) -> usize {
        self.signals_per_leaf
    }
484
485 // ────────────────────────────────────────────────────────────────────────────
486 // PARTITION MANAGEMENT HELPERS
487 // ────────────────────────────────────────────────────────────────────────────
488 // These functions handle the mapping between global leaf indices and worker partitions.
489 // They implement a load-balancing algorithm that distributes leaves evenly across workers.
490
491 /// Compute which worker owns a given leaf based on partition assignments.
492 ///
493 /// This is the inverse of `Worker::compute_partition()`. Given a leaf index,
494 /// it determines which worker is responsible for processing tasks in that leaf.
495 ///
496 /// # Partition Algorithm
497 /// Uses a balanced distribution where:
498 /// - First `leaf_count % worker_count` workers get `(leaf_count / worker_count) + 1` leaves
499 /// - Remaining workers get `leaf_count / worker_count` leaves
500 /// This ensures leaves are distributed as evenly as possible.
501 ///
502 /// # Arguments
503 ///
504 /// * `leaf_idx` - The global leaf index (0..leaf_count)
505 /// * `worker_count` - Total number of active workers
506 ///
507 /// # Returns
508 ///
509 /// Worker ID (0..worker_count) that owns this leaf
510 ///
511 /// # Example
512 ///
513 /// ```ignore
514 /// let owner_id = summary_tree.compute_partition_owner(leaf_idx, worker_count);
515 /// let owner_waker = &service.wakers[owner_id];
516 /// owner_waker.mark_partition_leaf_active(local_idx);
517 /// ```
518 pub fn compute_partition_owner(&self, leaf_idx: usize, worker_count: usize) -> usize {
519 if worker_count == 0 {
520 return 0;
521 }
522
523 let base = self.leaf_count / worker_count;
524 let extra = self.leaf_count % worker_count;
525
526 // First 'extra' workers get (base + 1) leafs each
527 let boundary = extra * (base + 1);
528
529 if leaf_idx < boundary {
530 leaf_idx / (base + 1)
531 } else {
532 extra + (leaf_idx - boundary) / base
533 }
534 }
535
536 /// Compute the partition start index for a given worker.
537 ///
538 /// # Arguments
539 ///
540 /// * `worker_id` - Worker ID (0..worker_count)
541 /// * `worker_count` - Total number of active workers
542 ///
543 /// # Returns
544 ///
545 /// First leaf index in this worker's partition
546 pub fn partition_start_for_worker(&self, worker_id: usize, worker_count: usize) -> usize {
547 if worker_count == 0 {
548 return 0;
549 }
550
551 let base = self.leaf_count / worker_count;
552 let extra = self.leaf_count % worker_count;
553
554 if worker_id < extra {
555 worker_id * (base + 1)
556 } else {
557 extra * (base + 1) + (worker_id - extra) * base
558 }
559 }
560
561 /// Compute the partition end index for a given worker.
562 ///
563 /// # Arguments
564 ///
565 /// * `worker_id` - Worker ID (0..worker_count)
566 /// * `worker_count` - Total number of active workers
567 ///
568 /// # Returns
569 ///
570 /// One past the last leaf index in this worker's partition (exclusive)
571 pub fn partition_end_for_worker(&self, worker_id: usize, worker_count: usize) -> usize {
572 if worker_count == 0 {
573 return 0;
574 }
575
576 let start = self.partition_start_for_worker(worker_id, worker_count);
577 let base = self.leaf_count / worker_count;
578 let extra = self.leaf_count % worker_count;
579
580 let len = if worker_id < extra { base + 1 } else { base };
581
582 (start + len).min(self.leaf_count)
583 }
584
585 /// Convert a global leaf index to a local index within a worker's partition.
586 ///
587 /// # Arguments
588 ///
589 /// * `leaf_idx` - Global leaf index
590 /// * `worker_id` - Worker ID
591 /// * `worker_count` - Total number of workers
592 ///
593 /// # Returns
594 ///
595 /// Local leaf index (0..partition_size) for use with SignalWaker partition bitmap,
596 /// or None if the leaf is not in this worker's partition
597 pub fn global_to_local_leaf_idx(
598 &self,
599 leaf_idx: usize,
600 worker_id: usize,
601 worker_count: usize,
602 ) -> Option<usize> {
603 let partition_start = self.partition_start_for_worker(worker_id, worker_count);
604 let partition_end = self.partition_end_for_worker(worker_id, worker_count);
605
606 if leaf_idx >= partition_start && leaf_idx < partition_end {
607 Some(leaf_idx - partition_start)
608 } else {
609 None
610 }
611 }
612}
613
614#[cfg(test)]
615mod tests {
616 use crate::runtime::waker::WorkerWaker;
617
618 use super::*;
619 use std::collections::HashSet;
620 use std::sync::{Arc, Barrier, Mutex};
621 use std::thread;
622 use std::thread::yield_now;
623 use std::time::{Duration, Instant};
624
    /// Helper function to create a test Summary with dummy wakers.
    ///
    /// Returns the tree together with the wakers and worker-count Arc: both
    /// must outlive the Summary, since it keeps a raw pointer into the waker
    /// slice (see `Summary::new`'s safety contract).
    fn setup_tree(leaf_count: usize, signals_per_leaf: usize) -> (Summary, Vec<Arc<WorkerWaker>>, Arc<AtomicUsize>) {
        // Create dummy wakers for testing
        // SAFETY: The returned Vec and Arc<AtomicUsize> must outlive the Summary
        // to prevent dangling pointer access via Summary.wakers / worker_count
        let wakers: Vec<Arc<WorkerWaker>> = (0..4).map(|_| Arc::new(WorkerWaker::new())).collect();
        let worker_count = Arc::new(AtomicUsize::new(4));
        let tree = Summary::new(leaf_count, signals_per_leaf, &wakers, &worker_count);
        (tree, wakers, worker_count)
    }
635
    /// Test that marking a signal active updates the leaf word correctly
    /// and that duplicate marking returns false (idempotent behavior)
    #[test]
    fn mark_signal_active_updates_root_and_leaf() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 4);

        // First activation should return true (leaf was empty)
        assert!(tree.mark_signal_active(1, 1));
        assert_eq!(tree.leaf_words[1].load(Ordering::Relaxed), 1u64 << 1);

        // Duplicate activation should return false (already active)
        assert!(!tree.mark_signal_active(1, 1));

        // Clearing should work and return true (leaf became empty)
        assert!(tree.mark_signal_inactive(1, 1));
        assert_eq!(tree.leaf_words[1].load(Ordering::Relaxed), 0);
    }

    /// Test the conditional clearing functionality when signal is empty.
    /// This exercises the race-prone mark_signal_inactive_if_empty function
    /// in the single-threaded (race-free) case only.
    #[test]
    fn mark_signal_inactive_if_empty_clears_summary() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 2);

        // Activate a signal
        assert!(tree.mark_signal_active(0, 1));

        // Create an empty signal for testing
        let signal = AtomicU64::new(0);

        // Should clear the summary bit since signal is empty
        tree.mark_signal_inactive_if_empty(0, 1, &signal);
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);
    }

    /// Test that task reservation exhausts all 64 bits correctly.
    /// Validates the CAS loop implementation and bit manipulation.
    #[test]
    fn reserve_task_in_leaf_exhausts_all_bits() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
        let mut bits = Vec::with_capacity(64);

        // Reserve all 64 bits
        for _ in 0..64 {
            let bit = tree.reserve_task_in_leaf(0, 0).expect("expected free bit");
            bits.push(bit);
        }

        // Verify we got all unique bits 0-63
        bits.sort_unstable();
        assert_eq!(bits, (0..64).collect::<Vec<_>>());

        // Should be exhausted now
        assert!(
            tree.reserve_task_in_leaf(0, 0).is_none(),
            "all bits should be exhausted"
        );

        // Release all bits for cleanup
        for bit in bits {
            tree.release_task_in_leaf(0, 0, bit as usize);
        }
    }

    /// Test distribution across leaves.
    /// Validates that reserve_task can visit all leaves; the partition cursor
    /// advances on every call, so 16 reservations cover all 4 partitions.
    #[test]
    fn reserve_task_round_robin_visits_all_leaves() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 1);
        let mut observed = Vec::with_capacity(16);

        // Reserve multiple tasks to ensure we visit different leaves
        // With random starting points, we should eventually hit all leaves
        for _ in 0..16 {
            let (leaf, sig, bit) = tree.reserve_task().expect("reserve task");
            observed.push(leaf);
            tree.release_task_in_leaf(leaf, sig, bit as usize);
        }

        // Should have visited all 4 leaves at least once
        observed.sort_unstable();
        observed.dedup();
        assert_eq!(observed.len(), 4, "Should visit all 4 leaves");
    }
720
    /// Stress test: concurrent reservations must be unique.
    /// Tests atomicity of the CAS loop under high contention.
    #[test]
    fn concurrent_reservations_are_unique() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 1);
        let tree = Arc::new(tree);
        let threads = 8;
        let reservations_per_thread = 8;
        // 8 * 8 = 64 reservations fit exactly in 4 leaves x 64 slots.
        let barrier = Arc::new(Barrier::new(threads));
        let handles = Arc::new(Mutex::new(Vec::with_capacity(
            threads * reservations_per_thread,
        )));

        let mut join_handles = Vec::with_capacity(threads);
        for _ in 0..threads {
            let tree_clone = Arc::clone(&tree);
            let barrier = Arc::clone(&barrier);
            let handles = Arc::clone(&handles);
            join_handles.push(thread::spawn(move || {
                barrier.wait();
                for _ in 0..reservations_per_thread {
                    // Spin (with yield) until a slot frees up or we succeed.
                    loop {
                        if let Some(handle) = tree_clone.reserve_task() {
                            let mut guard = handles.lock().unwrap();
                            guard.push(handle);
                            break;
                        } else {
                            yield_now();
                        }
                    }
                }
            }));
        }

        for join in join_handles {
            join.join().expect("thread panicked");
        }

        // Verify all reservations are unique (no duplicates)
        let guard = handles.lock().unwrap();
        let mut unique = HashSet::new();
        for &(leaf, signal, bit) in guard.iter() {
            assert!(
                unique.insert((leaf, signal, bit)),
                "duplicate handle detected"
            );
        }
        assert_eq!(guard.len(), threads * reservations_per_thread);

        // Cleanup
        for &(leaf, signal, bit) in guard.iter() {
            tree.release_task_in_leaf(leaf, signal, bit as usize);
        }
    }

    /// Test that reservation and release properly update the reservation bitmap.
    /// Validates atomic visibility of changes.
    #[test]
    fn reserve_and_release_task_updates_reservations() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 1);

        // Reserve a task
        let handle = tree.reserve_task().expect("task handle");
        assert_eq!(handle.1, 0); // signal idx (only one signal per leaf here)

        // Verify reservation bitmap was updated
        // (flat index is leaf * signals_per_leaf + signal; signals_per_leaf == 1)
        let reservation = tree
            .task_reservations
            .get(handle.0 * 1 + handle.1)
            .unwrap()
            .load(Ordering::Relaxed);
        assert_ne!(reservation, 0);

        // Release the task
        tree.release_task_in_leaf(handle.0, handle.1, handle.2 as usize);

        // Verify reservation bitmap was cleared
        let reservation = tree
            .task_reservations
            .get(handle.0 * 1 + handle.1)
            .unwrap()
            .load(Ordering::Relaxed);
        assert_eq!(reservation, 0);
    }
805
    // ────────────────────────────────────────────────────────────────────────────
    // ATOMIC BIT OPERATIONS TESTS
    // ────────────────────────────────────────────────────────────────────────────

    /// Test atomic bit operations with proper memory ordering semantics.
    /// Validates that fetch_or/fetch_and operations are truly atomic.
    #[test]
    fn atomic_bit_operations_are_atomic() {
        let (tree, _wakers, _worker_count) = setup_tree(2, 2);
        let tree = Arc::new(tree);
        let threads = 4;
        let barrier = Arc::new(Barrier::new(threads));

        // Each thread will set different bits in the same leaf
        // (threads pair up on signal 0 and signal 1 via i % 2)
        let handles: Vec<_> = (0..threads)
            .map(|i| {
                let tree = Arc::clone(&tree);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    let signal_idx = i % 2;
                    let mask = 1u64 << signal_idx;
                    tree.mark_signal_active(0, signal_idx);

                    // Verify the bit is set
                    let leaf_word = tree.leaf_words[0].load(Ordering::Relaxed);
                    assert_eq!(leaf_word & mask, mask, "Thread {}: bit {} should be set", i, signal_idx);
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // All bits should be set
        let final_word = tree.leaf_words[0].load(Ordering::Relaxed);
        assert_eq!(final_word, 0b11);
    }

    /// Test that mark_signal_inactive properly clears only specified bits
    #[test]
    fn clear_leaf_bits_clears_specific_bits() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 3);

        // Set multiple bits
        tree.mark_signal_active(0, 0);
        tree.mark_signal_active(0, 1);
        tree.mark_signal_active(0, 2);
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0b111);

        // Clear middle bit
        assert!(tree.mark_signal_inactive(0, 1));
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0b101);

        // Clear first bit
        assert!(tree.mark_signal_inactive(0, 0));
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0b100);

        // Clear last bit - should return true (leaf became empty)
        assert!(tree.mark_signal_inactive(0, 2));
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);
    }

    /// Test idempotency of bit operations
    #[test]
    fn bit_operations_are_idempotent() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 1);

        // First activation should return true
        assert!(tree.mark_signal_active(0, 0));
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);

        // Duplicate activation should return false
        assert!(!tree.mark_signal_active(0, 0));
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);

        // First deactivation should return true
        assert!(tree.mark_signal_inactive(0, 0));
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);

        // Duplicate deactivation should return false
        assert!(!tree.mark_signal_inactive(0, 0));
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 0);
    }
891
892 // ────────────────────────────────────────────────────────────────────────────
893 // TASK RESERVATION SYSTEM TESTS
894 // ────────────────────────────────────────────────────────────────────────────
895
896 /// Test CAS loop correctness under concurrent access
897 /// Validates that the compare_exchange loop properly handles contention
898 #[test]
899 fn cas_loop_handles_concurrent_contention() {
900 let (tree, _wakers, _worker_count) = setup_tree(1, 1);
901 let tree = Arc::new(tree);
902 let threads = 8;
903 let barrier = Arc::new(Barrier::new(threads));
904 let reservations = Arc::new(Mutex::new(Vec::new()));
905
906 let handles: Vec<_> = (0..threads)
907 .map(|_| {
908 let tree = Arc::clone(&tree);
909 let barrier = Arc::clone(&barrier);
910 let reservations = Arc::clone(&reservations);
911 thread::spawn(move || {
912 barrier.wait();
913 // Each thread tries to reserve multiple times
914 for _ in 0..4 {
915 if let Some(handle) = tree.reserve_task_in_leaf(0, 0) {
916 let mut guard = reservations.lock().unwrap();
917 guard.push(handle);
918 }
919 }
920 })
921 })
922 .collect();
923
924 for handle in handles {
925 handle.join().unwrap();
926 }
927
928 let guard = reservations.lock().unwrap();
929 assert_eq!(guard.len(), 32); // 8 threads * 4 reservations each
930
931 // All reservations should be unique
932 let mut unique = HashSet::new();
933 for &bit in guard.iter() {
934 assert!(unique.insert(bit), "Duplicate reservation detected: {:?}", bit);
935 }
936
937 // Cleanup
938 for &bit in guard.iter() {
939 tree.release_task_in_leaf(0, 0, bit as usize);
940 }
941 }
942
943 /// Test reservation bitmap exhaustion and wraparound
944 #[test]
945 fn reservation_exhaustion_and_wraparound() {
946 let (tree, _wakers, _worker_count) = setup_tree(1, 1);
947
948 // Exhaust all 64 bits
949 let mut reservations = Vec::new();
950 for _ in 0..64 {
951 let bit = tree.reserve_task_in_leaf(0, 0).expect("Should get reservation");
952 reservations.push(bit);
953 }
954
955 // Should be exhausted
956 assert!(tree.reserve_task_in_leaf(0, 0).is_none());
957
958 // Release half the bits (at even indices: 0, 2, 4, ..., 62)
959 let released_bits: Vec<u8> = (0..32).map(|i| reservations[i * 2]).collect();
960 let still_reserved: HashSet<u8> = (0..32).map(|i| reservations[i * 2 + 1]).collect();
961
962 for &bit in &released_bits {
963 tree.release_task_in_leaf(0, 0, bit as usize);
964 }
965
966 // Should be able to reserve again - should get the released bits
967 let new_reservations: Vec<_> = (0..32)
968 .map(|_| tree.reserve_task_in_leaf(0, 0).expect("Should get reservation"))
969 .collect();
970
971 // All new reservations should be unique
972 let mut seen = HashSet::new();
973 for &new_bit in &new_reservations {
974 assert!(seen.insert(new_bit), "Got duplicate reservation: {}", new_bit);
975 }
976
977 // All new reservations should be from the released bits, not the still-reserved ones
978 for &new_bit in &new_reservations {
979 assert!(!still_reserved.contains(&new_bit), "Got reservation for still-reserved bit: {}", new_bit);
980 }
981
982 // Cleanup
983 for &bit in &reservations {
984 tree.release_task_in_leaf(0, 0, bit as usize);
985 }
986 for &bit in &new_reservations {
987 tree.release_task_in_leaf(0, 0, bit as usize);
988 }
989 }
990
991 /// Test signal distribution across a leaf
992 #[test]
993 fn round_robin_signal_distribution() {
994 let (tree, _wakers, _worker_count) = setup_tree(1, 4);
995 let mut observed_signals = Vec::new();
996
997 // Reserve tasks and track signal distribution
998 // With bit-ops selection, signals are chosen by trailing_zeros (lowest free bit)
999 for _ in 0..8 {
1000 let (leaf, signal, _) = tree.reserve_task().expect("reserve task");
1001 observed_signals.push(signal);
1002 tree.release_task_in_leaf(leaf, signal, 0); // Release immediately
1003 }
1004
1005 // Should have visited all signals (though not necessarily round-robin)
1006 observed_signals.sort_unstable();
1007 let unique_signals: HashSet<_> = observed_signals.iter().collect();
1008 assert!(unique_signals.len() >= 1, "Should use at least 1 signal");
1009 }
1010
1011 // ────────────────────────────────────────────────────────────────────────────
1012 // PARTITION MANAGEMENT TESTS
1013 // ────────────────────────────────────────────────────────────────────────────
1014
1015 /// Test partition owner computation with various worker counts
1016 #[test]
1017 fn compute_partition_owner_distribution() {
1018 let (tree, _wakers, _worker_count) = setup_tree(10, 1);
1019
1020 // Test with 3 workers
1021 for leaf_idx in 0..10 {
1022 let owner = tree.compute_partition_owner(leaf_idx, 3);
1023 assert!(owner < 3, "Owner {} should be < 3", owner);
1024 }
1025
1026 // Test with 1 worker (all leaves belong to worker 0)
1027 for leaf_idx in 0..10 {
1028 let owner = tree.compute_partition_owner(leaf_idx, 1);
1029 assert_eq!(owner, 0, "All leaves should belong to worker 0");
1030 }
1031
1032 // Test with equal workers and leaves (perfect distribution)
1033 // Create a tree with 5 leaves for this test
1034 let (tree5, _, _) = setup_tree(5, 1);
1035 for leaf_idx in 0..5 {
1036 let owner = tree5.compute_partition_owner(leaf_idx, 5);
1037 assert_eq!(owner, leaf_idx, "Each leaf should have unique owner");
1038 }
1039 }
1040
1041 /// Test partition boundaries are correct
1042 #[test]
1043 fn partition_boundaries_are_correct() {
1044 let (tree, _wakers, _worker_count) = setup_tree(7, 1); // 7 leaves, 3 workers
1045
1046 // Worker 0: leaves 0, 1, 2 (3 leaves) - gets extra leaf
1047 assert_eq!(tree.partition_start_for_worker(0, 3), 0);
1048 assert_eq!(tree.partition_end_for_worker(0, 3), 3);
1049
1050 // Worker 1: leaves 3, 4 (2 leaves)
1051 assert_eq!(tree.partition_start_for_worker(1, 3), 3);
1052 assert_eq!(tree.partition_end_for_worker(1, 3), 5);
1053
1054 // Worker 2: leaves 5, 6 (2 leaves)
1055 assert_eq!(tree.partition_start_for_worker(2, 3), 5);
1056 assert_eq!(tree.partition_end_for_worker(2, 3), 7);
1057 }
1058
1059 /// Test global to local leaf index conversion
1060 #[test]
1061 fn global_to_local_leaf_conversion() {
1062 let (tree, _wakers, _worker_count) = setup_tree(6, 2); // 6 leaves, 2 workers
1063
1064 // Worker 0: leaves 0, 1, 2
1065 assert_eq!(tree.global_to_local_leaf_idx(0, 0, 2), Some(0));
1066 assert_eq!(tree.global_to_local_leaf_idx(1, 0, 2), Some(1));
1067 assert_eq!(tree.global_to_local_leaf_idx(2, 0, 2), Some(2));
1068 assert_eq!(tree.global_to_local_leaf_idx(3, 0, 2), None); // Not in partition
1069
1070 // Worker 1: leaves 3, 4, 5
1071 assert_eq!(tree.global_to_local_leaf_idx(3, 1, 2), Some(0));
1072 assert_eq!(tree.global_to_local_leaf_idx(4, 1, 2), Some(1));
1073 assert_eq!(tree.global_to_local_leaf_idx(5, 1, 2), Some(2));
1074 assert_eq!(tree.global_to_local_leaf_idx(2, 1, 2), None); // Not in partition
1075 }
1076
    /// Degenerate partition configurations must not panic.
    ///
    /// Covers the single-leaf/single-worker case and a worker count of zero,
    /// where the owner computation falls back to 0 and the partition range
    /// collapses to empty ([0, 0)).
    #[test]
    fn partition_computation_edge_cases() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 1);

        // Single leaf, single worker: everything maps to worker 0 / offset 0.
        assert_eq!(tree.compute_partition_owner(0, 1), 0);
        assert_eq!(tree.partition_start_for_worker(0, 1), 0);
        assert_eq!(tree.partition_end_for_worker(0, 1), 1);
        assert_eq!(tree.global_to_local_leaf_idx(0, 0, 1), Some(0));

        // Zero workers: owner defaults to 0 and the partition is empty.
        assert_eq!(tree.compute_partition_owner(0, 0), 0);
        assert_eq!(tree.partition_start_for_worker(0, 0), 0);
        assert_eq!(tree.partition_end_for_worker(0, 0), 0);
    }
1093
1094 // ────────────────────────────────────────────────────────────────────────────
1095 // MEMORY ORDERING TESTS
1096 // ────────────────────────────────────────────────────────────────────────────
1097
    /// Test that Acquire ordering ensures visibility of prior writes.
    ///
    /// NOTE(review): `handle.join()` already establishes a full
    /// happens-before edge between the spawned thread and this one, so the
    /// Acquire load below is not what makes the flag visible here. The test
    /// documents intended ordering rather than probing a weak-memory
    /// interleaving — confirm whether a true cross-thread probe (reading
    /// while the writer runs) was intended.
    #[test]
    fn acquire_ordering_ensures_visibility() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
        let tree = Arc::new(tree);
        let tree_ = Arc::clone(&tree);
        let flag = Arc::new(AtomicU64::new(0));
        let flag_ = Arc::clone(&flag);

        let handle = thread::spawn(move || {
            // Set the flag with Release ordering
            flag_.store(1, Ordering::Release);

            // Mark signal active (should be visible after flag is set)
            tree_.mark_signal_active(0, 0);
        });

        handle.join().unwrap();

        // Read with Acquire ordering (visibility here is actually guaranteed
        // by the join above, regardless of this ordering)
        let observed_flag = flag.load(Ordering::Acquire);
        assert_eq!(observed_flag, 1);

        // Signal should be active
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);
    }
1124
    /// Test that Relaxed ordering doesn't provide synchronization.
    ///
    /// NOTE(review): because `handle.join()` synchronizes the two threads,
    /// both `data` and `flag` are guaranteed visible by the time they are
    /// read below; the relaxed-visibility commentary describes the general
    /// memory model rather than an outcome this test can observe. Also,
    /// `observed_data` is read but never asserted — confirm intent.
    #[test]
    fn relaxed_ordering_no_synchronization() {
        let (tree, _wakers, _worker_count) = setup_tree(1, 1);
        let tree = Arc::new(tree);
        let tree_ = Arc::clone(&tree);
        let data = Arc::new(AtomicU64::new(0));
        let data_ = Arc::clone(&data);
        let flag = Arc::new(AtomicU64::new(0));
        let flag_ = Arc::clone(&flag);

        let handle = thread::spawn(move || {
            // Writer thread: relaxed data write, then release flag write
            data_.store(42, Ordering::Relaxed);
            flag_.store(1, Ordering::Release);
            tree_.mark_signal_active(0, 0);
        });

        handle.join().unwrap();

        // After join(), both stores are visible regardless of ordering
        let observed_flag = flag.load(Ordering::Acquire);
        let observed_data = data.load(Ordering::Relaxed);

        // Flag should be visible
        assert_eq!(observed_flag, 1);

        // Data might not be visible (this test documents the behavior)
        // In practice, the signal activation provides some ordering
        assert_eq!(tree.leaf_words[0].load(Ordering::Relaxed), 1);
    }
1156
1157 // ────────────────────────────────────────────────────────────────────────────
1158 // EDGE CASES AND ERROR HANDLING
1159 // ────────────────────────────────────────────────────────────────────────────
1160
1161 /// Test boundary conditions for indices
1162 #[test]
1163 fn boundary_conditions_for_indices() {
1164 let (tree, _wakers, _worker_count) = setup_tree(2, 3);
1165
1166 // Valid indices should work
1167 assert!(tree.mark_signal_active(0, 0));
1168 assert!(tree.mark_signal_active(1, 2));
1169 assert!(tree.reserve_task_in_leaf(0, 0).is_some());
1170 assert!(tree.reserve_task_in_leaf(1, 2).is_some());
1171
1172 // Invalid leaf index should return None/false
1173 assert!(!tree.mark_signal_active(2, 0)); // leaf_idx >= leaf_count
1174 assert!(!tree.mark_signal_inactive(2, 0));
1175 assert!(tree.reserve_task_in_leaf(2, 0).is_none());
1176
1177 // Invalid signal index should return None/false
1178 assert!(!tree.mark_signal_active(0, 3)); // signal_idx >= signals_per_leaf
1179 assert!(!tree.mark_signal_inactive(0, 3));
1180 assert!(tree.reserve_task_in_leaf(0, 3).is_none());
1181 }
1182
1183 /// Test zero signals per leaf
1184 #[test]
1185 fn zero_signals_per_leaf_handling() {
1186 // This should panic during creation
1187 let result = std::panic::catch_unwind(|| {
1188 let wakers: Vec<Arc<WorkerWaker>> = (0..2).map(|_| Arc::new(WorkerWaker::new())).collect();
1189 let worker_count = Arc::new(AtomicUsize::new(2));
1190 Summary::new(2, 0, &wakers, &worker_count);
1191 });
1192
1193 assert!(result.is_err());
1194 }
1195
1196 /// Test empty wakers array
1197 #[test]
1198 fn empty_wakers_array_handling() {
1199 // This should panic during creation
1200 let result = std::panic::catch_unwind(|| {
1201 let wakers: Vec<Arc<WorkerWaker>> = Vec::new();
1202 let worker_count = Arc::new(AtomicUsize::new(0));
1203 Summary::new(2, 2, &wakers, &worker_count);
1204 });
1205
1206 assert!(result.is_err());
1207 }
1208
    /// Invalid constructor configurations (zero leaves, zero signals per
    /// leaf) must panic at creation time — before `reserve_task` could ever
    /// run on a malformed tree.
    ///
    /// NOTE(review): the zero-signals case duplicates
    /// `zero_signals_per_leaf_handling`; consider consolidating.
    #[test]
    fn reserve_task_various_configurations() {
        // Empty tree - should panic during creation, not during reserve_task
        let result = std::panic::catch_unwind(|| {
            let wakers: Vec<Arc<WorkerWaker>> = (0..2).map(|_| Arc::new(WorkerWaker::new())).collect();
            let worker_count = Arc::new(AtomicUsize::new(2));
            Summary::new(0, 1, &wakers, &worker_count)
        });
        assert!(result.is_err(), "Empty tree should panic during creation");

        // Tree with zero signals per leaf
        let result = std::panic::catch_unwind(|| {
            let wakers: Vec<Arc<WorkerWaker>> = (0..2).map(|_| Arc::new(WorkerWaker::new())).collect();
            let worker_count = Arc::new(AtomicUsize::new(2));
            Summary::new(2, 0, &wakers, &worker_count)
        });
        assert!(result.is_err());
    }
1228
1229 // ────────────────────────────────────────────────────────────────────────────
1230 // CONCURRENCY STRESS TESTS
1231 // ────────────────────────────────────────────────────────────────────────────
1232
    /// High contention stress test for signal activation/deactivation.
    ///
    /// Even-numbered threads only activate and odd-numbered threads only
    /// deactivate — the split is deterministic by thread-id parity, not
    /// random. Because `signal_idx = thread_id % 2`, even ids always hit
    /// signal 0 and odd ids always hit signal 1 of their leaf.
    #[test]
    fn stress_test_signal_activation_deactivation() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 2);
        let tree = Arc::new(tree);
        let threads = 12;
        let iterations = 100;
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|thread_id| {
                let tree = Arc::clone(&tree);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for _ in 0..iterations {
                        let leaf_idx = thread_id % 4;
                        let signal_idx = thread_id % 2;

                        // Deterministic by parity: even ids activate, odd
                        // ids deactivate
                        if thread_id % 2 == 0 {
                            tree.mark_signal_active(leaf_idx, signal_idx);
                        } else {
                            tree.mark_signal_inactive(leaf_idx, signal_idx);
                        }
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Tree should still be in valid state: with 2 signals per leaf only
        // the low two bits of a leaf word may ever be set
        for i in 0..4 {
            let word = tree.leaf_words[i].load(Ordering::Relaxed);
            assert!(word <= 0b11, "Leaf {} has invalid word: {}", i, word);
        }
    }
1273
1274 /// Stress test for partition owner computation with changing worker count
1275 #[test]
1276 fn stress_test_partition_owner_with_changing_workers() {
1277 let (tree, _wakers, worker_count) = setup_tree(20, 1);
1278 let tree = Arc::new(tree);
1279
1280 let handles: Vec<_> = (0..8)
1281 .map(|i| {
1282 let tree = Arc::clone(&tree);
1283 let worker_count = Arc::clone(&worker_count);
1284 thread::spawn(move || {
1285 // Each thread uses different worker counts
1286 let test_counts = [1, 2, 4, 5, 10];
1287 for &count in &test_counts {
1288 worker_count.store(count, Ordering::Relaxed);
1289
1290 // Compute partition owners for all leaves
1291 for leaf_idx in 0..20 {
1292 let owner = tree.compute_partition_owner(leaf_idx, count);
1293 assert!(owner < count, "Invalid owner {} for count {}", owner, count);
1294 }
1295 }
1296 })
1297 })
1298 .collect();
1299
1300 for handle in handles {
1301 handle.join().unwrap();
1302 }
1303 }
1304
    /// Stress test for the task reservation system: a freshly granted slot's
    /// bit must be observably set in its reservation word, and after all
    /// threads finish every reservation bitmap must be empty.
    #[test]
    fn stress_test_task_reservation_system() {
        let (tree, _wakers, _worker_count) = setup_tree(4, 4); // Larger tree to spread contention
        let tree = Arc::new(tree);
        let threads = 16;
        let iterations = 200;
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|thread_id| {
                let tree = Arc::clone(&tree);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for i in 0..iterations {
                        // Try to reserve - may fail if all slots are taken
                        if let Some((leaf, signal, bit)) = tree.reserve_task() {
                            // Verify the bit is actually set in the reservation bitmap
                            let reservation_word = tree.reservation_word(leaf, signal);
                            let current = reservation_word.load(Ordering::SeqCst);
                            let mask = 1u64 << bit;

                            // The granted bit must be visible as set
                            assert!(current & mask != 0,
                                "Thread {} iteration {}: Bit {} not set in reservation bitmap for ({}, {})",
                                thread_id, i, bit, leaf, signal);

                            // Hold the slot briefly to create contention
                            thread::sleep(Duration::from_micros(100));

                            // Release the reservation
                            tree.release_task_in_leaf(leaf, signal, bit as usize);

                            // Note: We don't verify the bit is cleared here because under high contention,
                            // another thread may immediately re-reserve the same bit. The final check at
                            // the end of the test verifies all bits are eventually released.
                        }
                    }
                })
            })
            .collect();

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }

        // Verify all reservation bitmaps are empty (every slot released)
        for leaf in 0..tree.leaf_count {
            for signal in 0..tree.signals_per_leaf {
                let reservation_word = tree.reservation_word(leaf, signal);
                let current = reservation_word.load(Ordering::SeqCst);
                assert_eq!(current, 0,
                    "Reservation bitmap not empty for leaf {}, signal {}: {:b}",
                    leaf, signal, current);
            }
        }

        println!("Successfully completed {} iterations with {} threads", iterations, threads);
    }
1366
    /// Test TOCTOU-style robustness of the notification system under a
    /// changing worker count.
    ///
    /// NOTE(review): the count changes and the notifications all run on the
    /// single spawned thread, which is joined before the final asserts — so
    /// this checks that notification handles out-of-range/stale counts
    /// without panicking rather than a true concurrent interleaving. Confirm
    /// whether a second mutator thread was intended.
    ///
    /// `wakers_` stays bound (not dropped) so the waker array outlives the
    /// tree, which holds a raw pointer into it.
    #[test]
    fn test_toctou_race_mitigation() {
        let (tree, wakers_, worker_count) = setup_tree(4, 2);
        let tree = Arc::new(tree);
        let tree_ = Arc::clone(&tree);

        // Start with 2 workers
        worker_count.store(2, Ordering::Relaxed);

        let handle = thread::spawn(move || {
            let tree = tree_;
            // Simulate rapid worker count changes (includes 0 and counts
            // above the configured maximum)
            for count in [1, 3, 0, 2, 4] {
                worker_count.store(count, Ordering::Relaxed);

                // Try to trigger notifications - should not panic
                for leaf_idx in 0..4 {
                    // These calls should handle the changing worker count safely
                    tree.notify_partition_owner_active(leaf_idx);
                    tree.notify_partition_owner_inactive(leaf_idx);
                }

                thread::yield_now();
            }
        });

        handle.join().unwrap();

        // Tree should still be functional
        assert!(tree.mark_signal_active(0, 0));
        assert!(tree.reserve_task().is_some());
    }
1400}