maniac_runtime/runtime/
waker.rs

1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2use std::sync::{Condvar, Mutex};
3use std::time::Duration;
4
5use crate::utils::CachePadded;
6
7use crate::utils::bits::is_set;
8
/// Number of low-order status bits used as queue-word summary bits.
pub const STATUS_SUMMARY_BITS: u32 = 62;
/// Mask selecting the summary bits (bits `0..STATUS_SUMMARY_BITS`) of the status word.
pub const STATUS_SUMMARY_MASK: u64 = u64::MAX >> (u64::BITS - STATUS_SUMMARY_BITS);
/// Number of signal words the summary can describe (one per summary bit).
pub const STATUS_SUMMARY_WORDS: usize = STATUS_SUMMARY_BITS as usize;
/// Control flag stored directly above the summary bits (bit 62).
pub const STATUS_BIT_PARTITION: u64 = 1u64 << STATUS_SUMMARY_BITS;
/// Control flag stored in the most significant bit of the status word (bit 63).
pub const STATUS_BIT_YIELD: u64 = 1u64 << (u64::BITS - 1);
14
15/// A cache-optimized waker that packs queue summaries and control flags into a single status word.
16///
17/// # Architecture
18///
19/// `WorkerWaker` implements a **two-level hierarchy** for efficient work discovery while
20/// keeping a single atomic source of truth for coordination data:
21///
22/// ```text
23/// Level 1: Status Word (64 bits)
24///          ┌─────────────────────────────────────┐
25///          │ Bits 0-61  │ Bit 62 │ Bit 63        │
26///          │ Queue map  │ Part.  │ Yield         │
27///          └─────────────────────────────────────┘
28///                 │           │
29///                 ▼           ▼
30/// Level 2: Signal Words (external AtomicU64s)
31///          Word 0  Word 1  ...  Word 61
32///          [64 q]  [64 q]  ...  [64 q]           Each bit = individual queue state
33///
34/// Total: 62 words × 64 bits = 3,968 queues
35/// ```
36///
37/// # Core Components
38///
39/// 1. **Status Bitmap** (`status`): Single u64 that stores queue-word summary bits
40///    (0‒61) plus control flags (partition/yield). Enables O(1) lookup without a
41///    second atomic.
42///
43/// 2. **Counting Semaphore** (`permits`): Tracks how many threads should be awake.
44///    Each queue transition from empty→non-empty adds exactly 1 permit, guaranteeing
45///    **no lost wakeups**.
46///
47/// 3. **Sleeper Tracking** (`sleepers`): Approximate count of parked threads.
48///    Used to throttle notifications (avoid waking more threads than necessary).
49///
50/// # Design Patterns
51///
52/// ## Cache Optimization
53/// - `CachePadded` on struct for cache-line alignment
54/// - `CachePadded` on hot atomics to prevent false sharing
55/// - Producer/consumer paths access different cache lines
56///
57/// ## Memory Ordering Strategy
58/// - **Status summary bits**: `Relaxed` - hint-based, false positives acceptable
59/// - **Permits**: `AcqRel/Release` - proper synchronization for wakeups
60/// - **Sleepers**: `Relaxed` - approximate count is sufficient
61///
62/// ## Lazy Cleanup
63/// Summary bits may remain set after queues empty (false positives).
64/// Consumers lazily clear bits via `try_unmark_if_empty()`. This trades
65/// occasional extra checks for lower overhead on the hot path.
66///
67/// # Guarantees
68///
69/// - **No lost wakeups**: Permits accumulate even if no threads are sleeping
70/// - **Bounded notifications**: Never wakes more than `sleepers` threads
71/// - **Lock-free fast path**: `try_acquire()` uses only atomics
72/// - **Summary consistency**: false positives OK, false negatives impossible
73///
74/// # Trade-offs
75///
76/// - **False positives**: Summary may indicate work when queues are empty (lazy cleanup)
77/// - **Approximate sleeper count**: May over-notify slightly (but safely)
78/// - **64-word limit**: Summary is single u64 (extensible if needed)
79///
80/// # Usage Example
81///
82/// ```ignore
83/// use std::sync::Arc;
84/// use maniac::WorkerWaker;
85///
86/// let waker = Arc::new(WorkerWaker::new());
87///
88/// // Producer: mark queue 5 in word 0 as active
89/// waker.mark_active(0);  // Adds 1 permit, wakes 1 sleeper
90///
91/// // Consumer: find work via summary
92/// let summary = waker.snapshot_summary();
93/// for word_idx in (0..64).filter(|i| summary & (1 << i) != 0) {
94///     // Process queues in word_idx
95/// }
96///
97/// // Consumer: block when no work
98/// waker.acquire();  // Waits for a permit
99/// ```
100///
101/// # Performance Characteristics
102///
103/// - **mark_active**: O(1) atomic, fast if already set
104/// - **mark_active_mask**: O(1) batch update for multiple queues
105/// - **try_acquire**: O(1) lock-free
106/// - **acquire**: O(1) amortized (blocks on contention)
107/// - **snapshot_summary**: O(1) single atomic load
108///
109/// # Thread Safety
110///
111/// All methods are thread-safe. Producers and consumers can operate concurrently
112/// without coordination beyond the internal atomics and mutex/condvar for blocking.
113#[repr(align(64))]
114pub struct WorkerWaker {
115    /// **Status bitmap**: Queue-word summary bits (0‒61) plus control flags.
116    ///
117    /// - Bits 0‒61: Queue-word hot bits (`mark_active`, `try_unmark_if_empty`, etc.)
118    /// - Bit 62 (`STATUS_BIT_PARTITION`): Partition cache says work is present
119    /// - Bit 63 (`STATUS_BIT_YIELD`): Worker should yield ASAP
120    ///
121    /// Keeping everything in one atomic avoids races between independent u64s.
122    status: CachePadded<AtomicU64>,
123
124    /// **Counting semaphore**: Number of threads that should be awake (available permits).
125    ///
126    /// Incremented by producers when queues become active (0→1 transitions).
127    /// Decremented by consumers via `try_acquire()` or `acquire()`.
128    ///
129    /// **Critical invariant**: Each queue empty→non-empty transition adds exactly
130    /// 1 permit, preventing lost wakeups. Permits accumulate if no threads are
131    /// sleeping, ensuring late arrivals find work.
132    ///
133    /// - Acquire: `AcqRel` (synchronizes with Release from producers)
134    /// - Release: `Release` (makes queue data visible to acquirers)
135    permits: CachePadded<AtomicU64>,
136
137    /// **Approximate sleeper count**: Best-effort tracking of parked threads.
138    ///
139    /// Used to throttle `cv.notify_one()` calls in `release()`. We only wake
140    /// up to `min(permits, sleepers)` threads to avoid unnecessary notifications.
141    ///
142    /// Uses `Relaxed` ordering since exactness isn't required:
143    /// - Over-estimate: Extra notify_one() calls (threads recheck permits)
144    /// - Under-estimate: Permits accumulate, future wakeups succeed
145    sleepers: CachePadded<AtomicUsize>,
146
147    /// **Active worker count**: Total number of FastTaskWorker threads currently running.
148    ///
149    /// Updated when workers start (register_worker) and stop (unregister_worker).
150    /// Workers periodically check this to reconfigure their signal partitions for
151    /// optimal load distribution with minimal contention.
152    ///
153    /// Uses `Relaxed` ordering since workers can tolerate slightly stale values
154    /// and will eventually reconfigure on the next periodic check.
155    worker_count: CachePadded<AtomicUsize>,
156
157    /// **Partition summary**: Bitmap tracking which leafs in this worker's SummaryTree partition
158    /// have active tasks (up to 64 leafs per partition).
159    ///
160    /// Bit `i` corresponds to leaf `partition_start + i` in the global SummaryTree.
161    /// This allows each worker to maintain a cache-local view of its assigned partition,
162    /// enabling O(1) work checking without scanning the full tree.
163    ///
164    /// Updated via `sync_partition_summary()` before parking. Uses `Relaxed` ordering
165    /// since it's a hint (false positives are safe, workers verify actual task availability).
166    ///
167    /// When partition_summary transitions 0→non-zero, `STATUS_BIT_PARTITION`
168    /// is set and a permit is added to wake the worker.
169    partition_summary: CachePadded<AtomicU64>,
170
171    /// Mutex for condvar (only used in blocking paths)
172    m: Mutex<()>,
173
174    /// Condvar for parking/waking threads
175    cv: Condvar,
176}
177
178impl WorkerWaker {
179    pub fn new() -> Self {
180        debug_assert_eq!(
181            (STATUS_BIT_PARTITION | STATUS_BIT_YIELD) & STATUS_SUMMARY_MASK,
182            0,
183            "status control bits must not overlap summary mask"
184        );
185        Self {
186            status: CachePadded::new(AtomicU64::new(0)),
187            permits: CachePadded::new(AtomicU64::new(0)),
188            sleepers: CachePadded::new(AtomicUsize::new(0)),
189            worker_count: CachePadded::new(AtomicUsize::new(0)),
190            partition_summary: CachePadded::new(AtomicU64::new(0)),
191            m: Mutex::new(()),
192            cv: Condvar::new(),
193        }
194    }
195
196    // ────────────────────────────────────────────────────────────────────────────
197    // PRODUCER-SIDE API
198    // ────────────────────────────────────────────────────────────────────────────
199
200    /// Returns the full 64-bit raw status word for this worker,
201    /// which contains all control and summary bits.
202    ///
203    /// # Details
204    /// The status word encodes:
205    /// - Status control bits (e.g., yield, partition-ready)
206    /// - Partition summary bits (track active leafs in this partition)
207    ///
208    /// This is a low-level snapshot, useful for diagnostics, debugging,
209    /// or fast checks on global/partition state.
210    ///
211    /// # Memory Ordering
212    /// Uses relaxed ordering for performance, as consumers
213    /// tolerate minor staleness and correctness is ensured elsewhere.
214    #[inline]
215    pub fn status(&self) -> u64 {
216        self.status.load(Ordering::Relaxed)
217    }
218
219    /// Returns the current state of the primary control bits ("yield" and "partition").
220    ///
221    /// # Returns
222    /// A tuple `(is_yield, is_partition_active)` representing:
223    /// - `is_yield`: Whether the yield control bit is set, instructing the worker to yield.
224    /// - `is_partition_active`: Whether the partition summary bit is set, indicating there is pending work detected in this worker's assigned partition.
225    ///
226    /// This allows higher-level logic to react based on whether the worker
227    /// should yield or has instant work available.
228    ///
229    /// # Memory Ordering
230    /// Uses relaxed ordering for performance, as spurious
231    /// staleness is benign and status is periodically refreshed.
232    #[inline]
233    pub fn status_bits(&self) -> (bool, bool) {
234        let status = self.status.load(Ordering::Relaxed);
235        // (yield, partition)
236        (
237            status & STATUS_BIT_YIELD != 0,
238            status & STATUS_BIT_PARTITION != 0,
239        )
240    }
241
242    /// Sets the `STATUS_BIT_YIELD` flag for this worker and releases a permit if it was not previously set.
243    ///
244    /// # Purpose
245    /// Requests the worker to yield (i.e., temporarily relinquish active scheduling)
246    /// so that other workers can take priority or perform balancing. This enables
247    /// cooperative multitasking among workers in high-contention or handoff scenarios.
248    ///
249    /// # Behavior
250    /// - If the yield bit was previously unset (i.e., this is the first request to yield),
251    ///   this method also releases one permit to ensure the sleeping worker receives a wakeup.
252    /// - If already set, does nothing except marking the yield flag again (idempotent).
253    ///
254    /// # Concurrency
255    /// Safe for concurrent use: races to set the yield bit and release permits are benign.
256    ///
257    /// # Memory Ordering
258    /// Uses Acquire/Release ordering to ensure that the yield bit is
259    /// visible to consumers before subsequent state changes or wakeups.
260    #[inline]
261    pub fn mark_yield(&self) {
262        let prev = self.status.fetch_or(STATUS_BIT_YIELD, Ordering::AcqRel);
263        if prev & STATUS_BIT_YIELD == 0 {
264            self.release(1);
265        }
266    }
267
268    /// Attempts to clear the yield bit (`STATUS_BIT_YIELD`) in the status word.
269    ///
270    /// # Purpose
271    ///
272    /// This function is used to indicate that the current worker should stop yielding,
273    /// i.e., it is no longer in a yielded state and is eligible to process new work.
274    /// The yield bit is typically set to signal a worker to yield and released to
275    /// allow the worker to resume normal operation. Clearing this bit is a
276    /// coordinated operation to avoid spurious lost work or premature reactivation.
277    ///
278    /// # Concurrency
279    ///
280    /// The method uses a loop with atomic compare-and-exchange to guarantee that the
281    /// yield bit is only cleared if it was previously set, handling concurrent attempts
282    /// to manipulate this bit. In case there is a race and the bit has already been
283    /// cleared by another thread, this function will exit quietly and make no changes.
284    ///
285    /// # Behavior
286    ///
287    /// - If the yield bit is already clear, the function returns immediately.
288    /// - Otherwise, it performs a compare-and-exchange to clear the bit. If this
289    ///   succeeds, it exits; if not, it reloads the word and repeats the process,
290    ///   only trying again if the yield bit is still set.
291    #[inline]
292    pub fn try_unmark_yield(&self) {
293        loop {
294            // Load the current status word with Acquire ordering to observe the latest status.
295            let snapshot = self.status.load(Ordering::Acquire);
296            // If the yield bit is not set, no action is needed.
297            if snapshot & STATUS_BIT_YIELD == 0 {
298                return;
299            }
300
301            // Attempt to clear the yield bit atomically while preserving other bits.
302            match self.status.compare_exchange(
303                snapshot,
304                snapshot & !STATUS_BIT_YIELD,
305                Ordering::AcqRel,
306                Ordering::Relaxed,
307            ) {
308                // If successful, the yield bit was cleared; return.
309                Ok(_) => return,
310                // If status changed in the meantime, reload and retry if the yield bit is still set.
311                Err(actual) => {
312                    if actual & STATUS_BIT_YIELD == 0 {
313                        return;
314                    }
315                }
316            }
317        }
318    }
319
320    /// Marks the partition bit as active, indicating there is work in the partition.
321    ///
322    /// This sets the `STATUS_BIT_PARTITION` bit in the status word. If this was a
323    /// transition from no active partition to active (i.e., the bit was previously
324    /// clear), it releases one permit to wake up a worker to process tasks in this
325    /// partition.
326    ///
327    /// # Example
328    ///
329    /// ```
330    /// // Called when a leaf in the partition becomes non-empty
331    /// waker.mark_tasks();
332    /// ```
333    #[inline]
334    pub fn mark_tasks(&self) {
335        let prev = self.status.fetch_or(STATUS_BIT_PARTITION, Ordering::AcqRel);
336        if prev & STATUS_BIT_PARTITION == 0 {
337            self.release(1);
338        }
339    }
340
341    /// Attempts to clear the partition active bit (`STATUS_BIT_PARTITION`) in the status word if no
342    /// leaves in the partition are active.
343    ///
344    /// This is typically called after a partition leaf transitions to empty. If the bit was set
345    /// (partition was active), this function clears it, indicating no more work is present in the partition.
346    /// If new work becomes available (i.e., the partition_summary is nonzero) after the bit is cleared,
347    /// it immediately re-arms the bit to avoid lost wakeups.
348    ///
349    /// This function is safe to call spuriously and will exit without making changes if the partition bit
350    /// is already clear.
351    ///
352    /// # Concurrency
353    ///
354    /// Uses a loop with atomic compare-and-exchange to ensure the bit is only cleared if no other
355    /// thread has concurrently set it again. If racing with a producer, the bit will be re-armed as needed to
356    /// prevent missing new work.
357    #[inline]
358    pub fn try_unmark_tasks(&self) {
359        loop {
360            let snapshot = self.status.load(Ordering::Relaxed);
361            if snapshot & STATUS_BIT_PARTITION == 0 {
362                return;
363            }
364
365            match self.status.compare_exchange(
366                snapshot,
367                snapshot & !STATUS_BIT_PARTITION,
368                Ordering::AcqRel,
369                Ordering::Relaxed,
370            ) {
371                Ok(_) => {
372                    // If new partition work arrived concurrently, re-arm the bit
373                    if self.partition_summary.load(Ordering::Acquire) != 0 {
374                        self.mark_tasks();
375                    }
376                    return;
377                }
378                Err(actual) => {
379                    if actual & STATUS_BIT_PARTITION == 0 {
380                        return;
381                    }
382                }
383            }
384        }
385    }
386
387    /// Marks a signal word at `index` (0..63) as active in the summary.
388    ///
389    /// Called by producers when a queue transitions from empty to non-empty.
390    /// If this is a **0→1 transition** (bit was previously clear), adds 1 permit
391    /// and wakes 1 sleeping thread.
392    ///
393    /// # Fast Path
394    ///
395    /// If the bit is already set, returns immediately without touching atomics.
396    /// This is the common case when multiple producers push to the same word group.
397    ///
398    /// # Arguments
399    ///
400    /// * `index` - Word index (0..63) to mark as active
401    ///
402    /// # Example
403    ///
404    /// ```ignore
405    /// // Producer pushes to queue 5 in word 0
406    /// let (was_empty, was_set) = signal.set(5);
407    /// if was_empty && was_set {
408    ///     waker.mark_active(0);  // Wake 1 consumer
409    /// }
410    /// ```
411    #[inline]
412    pub fn mark_active(&self, index: u64) {
413        debug_assert!(
414            index < STATUS_SUMMARY_BITS as u64,
415            "summary index {} exceeds {} bits",
416            index,
417            STATUS_SUMMARY_BITS
418        );
419        let mask = 1u64 << index;
420        if self.status.load(Ordering::Relaxed) & mask != 0 {
421            return;
422        }
423        let prev = self.status.fetch_or(mask, Ordering::Relaxed);
424        if prev & mask == 0 {
425            self.release(1);
426        }
427    }
428
429    /// Batch version of `mark_active()`: marks multiple words as active at once.
430    ///
431    /// Efficiently handles multiple queues becoming active simultaneously.
432    /// Releases exactly `k` permits, where `k` is the number of **0→1 transitions**
433    /// (newly-active words).
434    ///
435    /// # Optimization
436    ///
437    /// Uses a single `fetch_or` instead of calling `mark_active()` in a loop,
438    /// reducing atomic contention when many queues activate together.
439    ///
440    /// # Arguments
441    ///
442    /// * `mask` - Bitmap of words to mark active (bit `i` = word `i`)
443    ///
444    /// # Example
445    ///
446    /// ```ignore
447    /// // Multiple queues became active
448    /// let mut active_words = 0u64;
449    /// for word_idx in 0..64 {
450    ///     if word_became_active(word_idx) {
451    ///         active_words |= 1 << word_idx;
452    ///     }
453    /// }
454    /// waker.mark_active_mask(active_words);  // Single atomic op
455    /// ```
456    #[inline]
457    pub fn mark_active_mask(&self, mask: u64) {
458        let summary_mask = mask & STATUS_SUMMARY_MASK;
459        if summary_mask == 0 {
460            return;
461        }
462        let prev = self.status.fetch_or(summary_mask, Ordering::Relaxed);
463        let newly = (!prev) & summary_mask;
464        let k = newly.count_ones() as usize;
465        if k > 0 {
466            self.release(k);
467        }
468    }
469
470    /// Clears the summary bit for `bit_index` if the corresponding signal word is empty.
471    ///
472    /// This is **lazy cleanup** - consumers call this after draining a word to prevent
473    /// false positives in future `snapshot_summary()` calls. However, it's safe to skip
474    /// this; the system remains correct with stale summary bits.
475    ///
476    /// # Arguments
477    ///
478    /// * `bit_index` - Word index (0..63) to potentially clear
479    /// * `signal` - The actual signal word to check for emptiness
480    ///
481    /// # Example
482    ///
483    /// ```ignore
484    /// // After draining all queues in word 3
485    /// waker.try_unmark_if_empty(3, &signal_word_3);
486    /// ```
487    #[inline]
488    pub fn try_unmark_if_empty(&self, bit_index: u64, signal: &AtomicU64) {
489        debug_assert!(
490            bit_index < STATUS_SUMMARY_BITS as u64,
491            "summary index {} exceeds {} bits",
492            bit_index,
493            STATUS_SUMMARY_BITS
494        );
495        let mask = 1u64 << bit_index;
496
497        loop {
498            if signal.load(Ordering::Acquire) != 0 {
499                return;
500            }
501
502            let snapshot = self.status.load(Ordering::Relaxed);
503            if snapshot & mask == 0 {
504                return;
505            }
506
507            match self.status.compare_exchange(
508                snapshot,
509                snapshot & !mask,
510                Ordering::AcqRel,
511                Ordering::Relaxed,
512            ) {
513                Ok(_) => {
514                    if signal.load(Ordering::Acquire) != 0 {
515                        // Re-arm summary and release if work arrived concurrently.
516                        self.mark_active(bit_index);
517                    }
518                    return;
519                }
520                Err(actual) => {
521                    if actual & mask == 0 {
522                        return;
523                    }
524                }
525            }
526        }
527    }
528
529    /// Unconditionally clears the summary bit for `bit_index`.
530    ///
531    /// Faster than `try_unmark_if_empty()` when the caller already knows
532    /// the word is empty (avoids checking the signal word).
533    ///
534    /// # Arguments
535    ///
536    /// * `bit_index` - Word index (0..63) to clear
537    #[inline]
538    pub fn try_unmark(&self, bit_index: u64) {
539        debug_assert!(
540            bit_index < STATUS_SUMMARY_BITS as u64,
541            "summary index {} exceeds {} bits",
542            bit_index,
543            STATUS_SUMMARY_BITS
544        );
545        let mask = 1u64 << bit_index;
546        if self.status.load(Ordering::Relaxed) & mask != 0 {
547            self.status
548                .fetch_and(!(1u64 << bit_index), Ordering::Relaxed);
549        }
550    }
551
552    // ────────────────────────────────────────────────────────────────────────────
553    // CONSUMER-SIDE API
554    // ────────────────────────────────────────────────────────────────────────────
555
556    /// Returns a snapshot of the current summary bitmap.
557    ///
558    /// Consumers use this to quickly identify which word groups have potential work.
559    /// If bit `i` is set, word `i` *may* have active queues (false positives possible
560    /// due to lazy cleanup).
561    ///
562    /// # Memory Ordering
563    ///
564    /// Uses `Relaxed` because this is a hint, not a synchronization point. The actual
565    /// queue data is synchronized via acquire/release on the permits counter.
566    ///
567    /// # Returns
568    ///
569    /// A u64 bitmap where bit `i` indicates word `i` has potential work.
570    ///
571    /// # Example
572    ///
573    /// ```ignore
574    /// let summary = waker.snapshot_summary();
575    /// for word_idx in 0..64 {
576    ///     if summary & (1 << word_idx) != 0 {
577    ///         // Check queues in word_idx
578    ///     }
579    /// }
580    /// ```
581    #[inline]
582    pub fn snapshot_summary(&self) -> u64 {
583        self.status.load(Ordering::Relaxed) & STATUS_SUMMARY_MASK
584    }
585
586    /// Finds the nearest set bit to `nearest_to_index` in the summary.
587    ///
588    /// Useful for maintaining **locality**: continue working on queues near
589    /// the last processed index, improving cache behavior.
590    ///
591    /// # Arguments
592    ///
593    /// * `nearest_to_index` - Preferred starting point (0..63)
594    ///
595    /// # Returns
596    ///
597    /// The index of the nearest set bit, or undefined if summary is empty.
598    ///
599    /// # Example
600    ///
601    /// ```ignore
602    /// let mut last_word = 0;
603    /// loop {
604    ///     last_word = waker.summary_select(last_word);
605    ///     // Process queues in word last_word
606    /// }
607    /// ```
608    #[inline]
609    pub fn summary_select(&self, nearest_to_index: u64) -> u64 {
610        let summary = self.status.load(Ordering::Relaxed) & STATUS_SUMMARY_MASK;
611        crate::bits::find_nearest(summary, nearest_to_index)
612    }
613
614    // ────────────────────────────────────────────────────────────────────────────
615    // PERMIT SYSTEM (Counting Semaphore)
616    // ────────────────────────────────────────────────────────────────────────────
617
618    /// Non-blocking attempt to acquire a permit.
619    ///
620    /// Atomically decrements the permit counter if available. This is the **lock-free
621    /// fast path** used by consumers before resorting to blocking.
622    ///
623    /// # Returns
624    ///
625    /// - `true` if a permit was consumed (consumer should process work)
626    /// - `false` if no permits available (queue likely empty)
627    ///
628    /// # Memory Ordering
629    ///
630    /// Uses `AcqRel` to synchronize with producers' `Release` in `release()`.
631    /// This ensures queue data written by producers is visible to this consumer.
632    ///
633    /// # Example
634    ///
635    /// ```ignore
636    /// if waker.try_acquire() {
637    ///     // Process work (permit guarantees something is available)
638    /// } else {
639    ///     // No work, maybe park or spin
640    /// }
641    /// ```
642    #[inline]
643    pub fn try_acquire(&self) -> bool {
644        self.permits
645            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |p| p.checked_sub(1))
646            .is_ok()
647    }
648
649    /// Blocking acquire: parks the thread until a permit becomes available.
650    ///
651    /// Tries the fast path first (`try_acquire()`), then falls back to parking
652    /// on a condvar. Handles spurious wakeups by rechecking permits in a loop.
653    ///
654    /// # Blocking Behavior
655    ///
656    /// 1. Increment `sleepers` count
657    /// 2. Wait on condvar (releases mutex)
658    /// 3. Recheck permits after wakeup
659    /// 4. Decrement `sleepers` on exit
660    ///
661    /// # Panics
662    ///
663    /// Panics if the mutex or condvar is poisoned (indicates a panic in another thread
664    /// while holding the lock).
665    ///
666    /// # Example
667    ///
668    /// ```ignore
669    /// loop {
670    ///     waker.acquire();  // Blocks until work available
671    ///     process_work();
672    /// }
673    /// ```
674    pub fn acquire(&self) {
675        if self.try_acquire() {
676            return;
677        }
678        let mut g = self.m.lock().expect("waker mutex poisoned");
679        self.sleepers.fetch_add(1, Ordering::Relaxed);
680
681        loop {
682            if self.try_acquire() {
683                self.sleepers.fetch_sub(1, Ordering::Relaxed);
684                return;
685            }
686            g = self.cv.wait(g).expect("waker condvar wait poisoned");
687        }
688    }
689
690    /// Blocking acquire with timeout.
691    ///
692    /// Like `acquire()`, but returns after `timeout` if no permit becomes available.
693    /// Useful for implementing shutdown or periodic maintenance.
694    ///
695    /// # Arguments
696    ///
697    /// * `timeout` - Maximum duration to wait
698    ///
699    /// # Returns
700    ///
701    /// - `true` if a permit was acquired
702    /// - `false` if timed out without acquiring
703    ///
704    /// # Example
705    ///
706    /// ```ignore
707    /// use std::time::Duration;
708    ///
709    /// loop {
710    ///     if waker.acquire_timeout(Duration::from_secs(1)) {
711    ///         process_work();
712    ///     } else {
713    ///         // Timeout - check for shutdown signal
714    ///         if should_shutdown() { break; }
715    ///     }
716    /// }
717    /// ```
718    pub fn acquire_timeout(&self, timeout: Duration) -> bool {
719        if self.try_acquire() {
720            return true;
721        }
722        let start = std::time::Instant::now();
723        let mut g = self.m.lock().expect("waker mutex poisoned");
724        self.sleepers.fetch_add(1, Ordering::Relaxed);
725
726        while start.elapsed() < timeout {
727            if self.try_acquire() {
728                self.sleepers.fetch_sub(1, Ordering::Relaxed);
729                return true;
730            }
731            let left = timeout.saturating_sub(start.elapsed());
732            let (gg, res) = self
733                .cv
734                .wait_timeout(g, left)
735                .expect("waker condvar wait poisoned");
736            g = gg;
737            if res.timed_out() {
738                break;
739            }
740        }
741        self.sleepers.fetch_sub(1, Ordering::Relaxed);
742        false
743    }
744
745    /// Releases `n` permits and wakes up to `n` sleeping threads.
746    ///
747    /// Called by producers (indirectly via `mark_active`) when queues become active.
748    /// Uses **targeted wakeups**: only notifies up to `min(n, sleepers)` threads,
749    /// avoiding unnecessary `notify_one()` calls.
750    ///
751    /// # Permit Accumulation
752    ///
753    /// If no threads are sleeping, permits accumulate for future consumers.
754    /// This guarantees **no lost wakeups**: late-arriving consumers find work immediately.
755    ///
756    /// # Arguments
757    ///
758    /// * `n` - Number of permits to release (typically 1 or count of newly-active queues)
759    ///
760    /// # Memory Ordering
761    ///
762    /// Uses `Release` to ensure queue data is visible to consumers who `Acquire`
763    /// via `try_acquire()`.
764    ///
765    /// # Example
766    ///
767    /// ```ignore
768    /// // Producer activates 3 queues
769    /// waker.release(3);  // Wakes up to 3 sleeping consumers
770    /// ```
771    #[inline]
772    pub fn release(&self, n: usize) {
773        if n == 0 {
774            return;
775        }
776        self.permits.fetch_add(n as u64, Ordering::Release);
777        let to_wake = n.min(self.sleepers.load(Ordering::Relaxed));
778
779        for _ in 0..to_wake {
780            self.cv.notify_one();
781        }
782    }
783
784    // ────────────────────────────────────────────────────────────────────────────
785    // INSPECTION / DEBUGGING
786    // ────────────────────────────────────────────────────────────────────────────
787
788    /// Returns the current summary bitmap.
789    ///
790    /// Useful for debugging or metrics. Equivalent to `snapshot_summary()` but
791    /// uses `Acquire` ordering for stronger visibility guarantees.
792    #[inline]
793    pub fn summary_bits(&self) -> u64 {
794        self.status.load(Ordering::Acquire) & STATUS_SUMMARY_MASK
795    }
796
797    /// Returns the current number of available permits.
798    ///
799    /// Useful for monitoring queue health or load. A high permit count may
800    /// indicate consumers are falling behind.
801    #[inline]
802    pub fn permits(&self) -> u64 {
803        self.permits.load(Ordering::Acquire)
804    }
805
806    /// Returns the approximate number of sleeping threads.
807    ///
808    /// Best-effort count (uses Relaxed ordering). Useful for debugging or
809    /// understanding system utilization.
810    #[inline]
811    pub fn sleepers(&self) -> usize {
812        self.sleepers.load(Ordering::Relaxed)
813    }
814
815    /// Increments the sleeper count, indicating a thread is about to park.
816    ///
817    /// Should be called BEFORE checking for work the final time, to prevent
818    /// lost wakeups. The calling thread must unregister via `unregister_sleeper()`
819    /// after waking up.
820    ///
821    /// # Example
822    ///
823    /// ```ignore
824    /// waker.register_sleeper();
825    /// // Final check for work
826    /// if has_work() {
827    ///     waker.unregister_sleeper();
828    ///     return; // Found work, don't park
829    /// }
830    /// // Actually park...
831    /// waker.acquire();
832    /// waker.unregister_sleeper();
833    /// ```
834    #[inline]
835    pub fn register_sleeper(&self) {
836        self.sleepers.fetch_add(1, Ordering::Relaxed);
837    }
838
839    /// Decrements the sleeper count, indicating a thread has woken up.
840    ///
841    /// Should be called after waking up from `acquire()` or if aborting
842    /// a park attempt after `register_sleeper()`.
843    ///
844    /// # Example
845    ///
846    /// ```ignore
847    /// waker.register_sleeper();
848    /// // ... park ...
849    /// waker.unregister_sleeper(); // Woke up
850    /// ```
851    #[inline]
852    pub fn unregister_sleeper(&self) {
853        self.sleepers.fetch_sub(1, Ordering::Relaxed);
854    }
855
856    // ────────────────────────────────────────────────────────────────────────────
857    // WORKER MANAGEMENT API
858    // ────────────────────────────────────────────────────────────────────────────
859
860    /// Registers a new worker thread and returns the new total worker count.
861    ///
862    /// Should be called when a FastTaskWorker thread starts. Workers use this
863    /// count to partition the signal space for optimal load distribution.
864    ///
865    /// # Returns
866    ///
867    /// The new total worker count after registration.
868    ///
869    /// # Example
870    ///
871    /// ```ignore
872    /// let waker = arena.waker();
873    /// let total_workers = unsafe { (*waker).register_worker() };
874    /// println!("Now have {} workers", total_workers);
875    /// ```
876    #[inline]
877    pub fn register_worker(&self) -> usize {
878        self.worker_count.fetch_add(1, Ordering::Relaxed) + 1
879    }
880
881    /// Unregisters a worker thread and returns the new total worker count.
882    ///
883    /// Should be called when a FastTaskWorker thread stops. This allows
884    /// remaining workers to reconfigure their partitions.
885    ///
886    /// # Returns
887    ///
888    /// The new total worker count after unregistration.
889    ///
890    /// # Example
891    ///
892    /// ```ignore
893    /// // Worker stopping
894    /// let waker = arena.waker();
895    /// let remaining_workers = unsafe { (*waker).unregister_worker() };
896    /// println!("{} workers remaining", remaining_workers);
897    /// ```
898    #[inline]
899    pub fn unregister_worker(&self) -> usize {
900        self.worker_count
901            .fetch_sub(1, Ordering::Relaxed)
902            .saturating_sub(1)
903    }
904
905    /// Returns the current number of active worker threads.
906    ///
907    /// Workers periodically check this value to detect when the worker count
908    /// has changed and reconfigure their signal partitions accordingly.
909    ///
910    /// Uses Relaxed ordering since workers can tolerate slightly stale values
911    /// and will eventually see the update on their next check.
912    ///
913    /// # Example
914    ///
915    /// ```ignore
916    /// let waker = arena.waker();
917    /// let count = unsafe { (*waker).get_worker_count() };
918    /// if count != cached_count {
919    ///     // Reconfigure partition
920    /// }
921    /// ```
922    #[inline]
923    pub fn get_worker_count(&self) -> usize {
924        self.worker_count.load(Ordering::Relaxed)
925    }
926
927    // ────────────────────────────────────────────────────────────────────────────
928    // PARTITION SUMMARY MANAGEMENT
929    // ────────────────────────────────────────────────────────────────────────────
930
931    /// Synchronize partition summary from SummaryTree leaf range.
932    ///
933    /// Samples the worker's assigned partition of the SummaryTree and updates
934    /// the local `partition_summary` bitmap. When the partition transitions from
935    /// empty to non-empty, sets `STATUS_BIT_PARTITION` and adds a permit to wake
936    /// the worker.
937    ///
938    /// This should be called before parking to ensure the worker doesn't sleep
939    /// when tasks are available in its partition.
940    ///
941    /// # Arguments
942    ///
943    /// * `partition_start` - First leaf index in this worker's partition
944    /// * `partition_end` - One past the last leaf index (exclusive)
945    /// * `leaf_words` - Slice of AtomicU64 leaf words from SummaryTree
946    ///
947    /// # Returns
948    ///
949    /// `true` if the partition currently has work, `false` otherwise
950    ///
951    /// # Panics
952    ///
953    /// Panics in debug mode if partition is larger than 64 leafs
954    ///
955    /// # Example
956    ///
957    /// ```ignore
958    /// // Before parking, sync partition status
959    /// let waker = &service.wakers[worker_id];
960    /// let has_work = waker.sync_partition_summary(
961    ///     self.partition_start,
962    ///     self.partition_end,
963    ///     &self.arena.active_tree().leaf_words,
964    /// );
965    /// ```
966    pub fn sync_partition_summary(
967        &self,
968        partition_start: usize,
969        partition_end: usize,
970        leaf_words: &[AtomicU64],
971    ) -> bool {
972        debug_assert!(
973            partition_end >= partition_start,
974            "partition_end ({}) must be >= partition_start ({})",
975            partition_end,
976            partition_start
977        );
978
979        let partition_len = partition_end.saturating_sub(partition_start);
980        if partition_len == 0 {
981            let prev = self.partition_summary.swap(0, Ordering::AcqRel);
982            if prev != 0 {
983                self.try_unmark_tasks();
984            }
985            return false;
986        }
987
988        debug_assert!(
989            partition_len <= 64,
990            "partition size {} exceeds 64-bit bitmap capacity",
991            partition_len
992        );
993
994        let partition_mask = if partition_len >= 64 {
995            u64::MAX
996        } else {
997            (1u64 << partition_len) - 1
998        };
999
1000        loop {
1001            let mut new_summary = 0u64;
1002
1003            for (offset, leaf_idx) in (partition_start..partition_end).enumerate() {
1004                if let Some(leaf_word) = leaf_words.get(leaf_idx) {
1005                    if leaf_word.load(Ordering::Acquire) != 0 {
1006                        new_summary |= 1u64 << offset;
1007                    }
1008                }
1009            }
1010
1011            let prev = self.partition_summary.load(Ordering::Acquire);
1012            let prev_masked = prev & partition_mask;
1013
1014            if prev_masked != 0 {
1015                let mut to_clear = prev_masked & !new_summary;
1016                while to_clear != 0 {
1017                    let bit = to_clear.trailing_zeros() as usize;
1018                    let leaf_idx = partition_start + bit;
1019                    if let Some(leaf_word) = leaf_words.get(leaf_idx) {
1020                        if leaf_word.load(Ordering::Acquire) != 0 {
1021                            new_summary |= 1u64 << bit;
1022                        }
1023                    }
1024                    to_clear &= to_clear - 1;
1025                }
1026            }
1027
1028            let desired = (prev & !partition_mask) | new_summary;
1029
1030            match self.partition_summary.compare_exchange(
1031                prev,
1032                desired,
1033                Ordering::AcqRel,
1034                Ordering::Acquire,
1035            ) {
1036                Ok(_) => {
1037                    let had_work = prev_masked != 0;
1038                    let has_work = (desired & partition_mask) != 0;
1039
1040                    if has_work && !had_work {
1041                        self.mark_tasks();
1042                    } else if !has_work && had_work {
1043                        self.try_unmark_tasks();
1044                    }
1045
1046                    return has_work;
1047                }
1048                Err(_) => continue,
1049            }
1050        }
1051    }
1052
1053    /// Get current partition summary bitmap.
1054    ///
1055    /// Returns a bitmap where bit `i` indicates whether leaf `partition_start + i`
1056    /// has active tasks. This is a snapshot and may become stale immediately.
1057    ///
1058    /// Uses `Relaxed` ordering since this is a hint for optimization purposes.
1059    ///
1060    /// # Returns
1061    ///
1062    /// Bitmap of active leafs in this worker's partition
1063    #[inline]
1064    pub fn partition_summary(&self) -> u64 {
1065        self.partition_summary.load(Ordering::Relaxed)
1066    }
1067
1068    /// Check if a specific leaf in the partition has work.
1069    ///
1070    /// # Arguments
1071    ///
1072    /// * `local_leaf_idx` - Leaf index relative to partition start (0-63)
1073    ///
1074    /// # Returns
1075    ///
1076    /// `true` if the leaf appears to have work based on the cached summary
1077    ///
1078    /// # Example
1079    ///
1080    /// ```ignore
1081    /// // Check if first leaf in partition has work
1082    /// if waker.partition_leaf_has_work(0) {
1083    ///     // Try to acquire from that leaf
1084    /// }
1085    /// ```
1086    #[inline]
1087    pub fn partition_leaf_has_work(&self, local_leaf_idx: usize) -> bool {
1088        debug_assert!(
1089            local_leaf_idx < 64,
1090            "local_leaf_idx {} out of range",
1091            local_leaf_idx
1092        );
1093        let summary = self.partition_summary.load(Ordering::Relaxed);
1094        summary & (1u64 << local_leaf_idx) != 0
1095    }
1096
1097    /// Directly update partition summary for a specific leaf.
1098    ///
1099    /// This is called when a task is scheduled into a leaf to immediately update
1100    /// the partition owner's summary without waiting for the next sync.
1101    ///
1102    /// # Arguments
1103    ///
1104    /// * `local_leaf_idx` - Leaf index relative to partition start (0-63)
1105    ///
1106    /// # Returns
1107    ///
1108    /// `true` if this was the first active leaf (partition was empty before)
1109    ///
1110    /// # Example
1111    ///
1112    /// ```ignore
1113    /// // When scheduling a task, immediately update owner's partition summary
1114    /// let owner_waker = &service.wakers[owner_id];
1115    /// if owner_waker.mark_partition_leaf_active(local_leaf_idx) {
1116    ///     // This was the first task - worker will be woken by the partition flag
1117    /// }
1118    /// ```
1119    pub fn mark_partition_leaf_active(&self, local_leaf_idx: usize) -> bool {
1120        debug_assert!(
1121            local_leaf_idx < 64,
1122            "local_leaf_idx {} out of range",
1123            local_leaf_idx
1124        );
1125
1126        let mask = 1u64 << local_leaf_idx;
1127        let old_summary = self.partition_summary.fetch_or(mask, Ordering::AcqRel);
1128
1129        // If partition was empty, mark partition flag to wake the worker
1130        if old_summary == 0 {
1131            self.mark_tasks();
1132            true
1133        } else {
1134            false
1135        }
1136    }
1137
1138    /// Clear partition summary for a specific leaf.
1139    ///
1140    /// Called when a leaf becomes empty. If this was the last active leaf,
1141    /// attempts to clear the partition status bit.
1142    ///
1143    /// # Arguments
1144    ///
1145    /// * `local_leaf_idx` - Leaf index relative to partition start (0-63)
1146    pub fn clear_partition_leaf(&self, local_leaf_idx: usize) {
1147        debug_assert!(
1148            local_leaf_idx < 64,
1149            "local_leaf_idx {} out of range",
1150            local_leaf_idx
1151        );
1152
1153        let bit = 1u64 << local_leaf_idx;
1154        let old_summary = self.partition_summary.fetch_and(!bit, Ordering::AcqRel);
1155
1156        if (old_summary & bit) != 0 && (old_summary & !bit) == 0 {
1157            self.try_unmark_tasks();
1158        }
1159    }
1160}
1161
1162impl Default for WorkerWaker {
1163    fn default() -> Self {
1164        Self::new()
1165    }
1166}