maniac_runtime/runtime/waker.rs
1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2use std::sync::{Condvar, Mutex};
3use std::time::Duration;
4
5use crate::utils::CachePadded;
6
7use crate::utils::bits::is_set;
8
/// Number of low-order status-word bits used as queue-word summary bits.
pub const STATUS_SUMMARY_BITS: u32 = 62;
/// Mask selecting the summary bits (bits `0..STATUS_SUMMARY_BITS`) of the status word.
pub const STATUS_SUMMARY_MASK: u64 = u64::MAX >> (64 - STATUS_SUMMARY_BITS);
/// Number of signal words addressable through the summary (one per summary bit).
pub const STATUS_SUMMARY_WORDS: usize = STATUS_SUMMARY_BITS as usize;
/// Control flag stored directly above the summary bits (bit 62): partition has work.
pub const STATUS_BIT_PARTITION: u64 = 1u64 << STATUS_SUMMARY_BITS;
/// Control flag stored in the top bit (bit 63): the worker is asked to yield.
pub const STATUS_BIT_YIELD: u64 = STATUS_BIT_PARTITION << 1;
14
/// A waker that packs queue-word summary bits and control flags into a single status word
/// and pairs it with a counting semaphore for parking and waking worker threads.
///
/// # Architecture
///
/// `WorkerWaker` implements a **two-level hierarchy** for work discovery while keeping a
/// single atomic source of truth for coordination data:
///
/// ```text
/// Level 1: Status Word (64 bits)
/// ┌───────────┬────────┬────────┐
/// │ Bits 0-61 │ Bit 62 │ Bit 63 │
/// │ Queue map │ Part.  │ Yield  │
/// └───────────┴────────┴────────┘
///       │
///       ▼
/// Level 2: Signal Words (external AtomicU64s)
/// Word 0   Word 1   ...   Word 61
/// [64 q]   [64 q]   ...   [64 q]     Each bit = individual queue state
///
/// Total: 62 words × 64 bits = 3,968 queues
/// ```
///
/// # Core Components
///
/// 1. **Status word** (`status`): a single `u64` holding the queue-word summary bits
///    (0‒61) plus the partition (62) and yield (63) control flags, so one atomic load
///    answers "is there anything to look at?".
///
/// 2. **Counting semaphore** (`permits`): every 0→1 transition of a summary bit or control
///    flag performed through `mark_active`, `mark_active_mask`, `mark_tasks` or
///    `mark_yield` releases one permit. Permits accumulate when nobody is sleeping.
///
/// 3. **Sleeper tracking** (`sleepers`): approximate count of parked threads, used by
///    `release()` to limit how many `notify_one()` calls it issues.
///
/// # Design Patterns
///
/// ## Cache Optimization
/// - `#[repr(align(64))]` on the struct for 64-byte alignment
/// - `CachePadded` around each atomic field
///
/// ## Memory Ordering Strategy
/// - **Summary bits**: `Relaxed` in `mark_active*`/`snapshot_summary` (hint-based)
/// - **Control flags**: `AcqRel` in `mark_yield`/`mark_tasks`
/// - **Permits**: `Release` on release, `AcqRel` on a successful acquire
/// - **Sleepers**: `Relaxed` (approximate count)
///
/// ## Lazy Cleanup
/// Summary bits may remain set after the corresponding signal word is drained (false
/// positives). Consumers clear them lazily via `try_unmark_if_empty()` / `try_unmark()`.
///
/// # Properties
///
/// - **Permits are never dropped**: they accumulate even if no thread is sleeping, so a
///   later `try_acquire()` still observes them
/// - **Bounded notifications**: one `release(n)` call issues at most `min(n, sleepers)`
///   `notify_one()` calls
/// - **Lock-free fast path**: `try_acquire()` uses only atomics
///
/// # Trade-offs
///
/// - **False positives**: the summary may indicate work when the word is empty (lazy cleanup)
/// - **Approximate sleeper count**: may notify slightly more or fewer threads than ideal
/// - **62-word limit**: the summary shares one `u64` with the two control flags
///
/// # Usage Example
///
/// ```ignore
/// use std::sync::Arc;
/// use maniac::WorkerWaker;
///
/// let waker = Arc::new(WorkerWaker::new());
///
/// // Producer: mark signal word 0 as active
/// waker.mark_active(0); // Adds 1 permit on a 0→1 transition
///
/// // Consumer: find work via summary
/// let summary = waker.snapshot_summary();
/// for word_idx in (0..STATUS_SUMMARY_BITS).filter(|i| summary & (1 << i) != 0) {
///     // Process queues in word_idx
/// }
///
/// // Consumer: block when no work
/// waker.acquire(); // Waits for a permit
/// ```
///
/// # Performance Characteristics
///
/// - **mark_active**: one relaxed load when the bit is already set, otherwise one `fetch_or`
/// - **mark_active_mask**: one `fetch_or` for any number of words
/// - **try_acquire**: lock-free compare-and-swap
/// - **acquire**: falls back to the mutex/condvar when no permit is available
/// - **snapshot_summary**: single atomic load
///
/// # Thread Safety
///
/// All methods take `&self` and operate on atomics or on the internal mutex/condvar pair.
#[repr(align(64))]
pub struct WorkerWaker {
    /// **Status bitmap**: queue-word summary bits (0‒61) plus control flags.
    ///
    /// - Bits 0‒61: queue-word hot bits (`mark_active`, `try_unmark_if_empty`, etc.)
    /// - Bit 62 (`STATUS_BIT_PARTITION`): set by `mark_tasks`, cleared by `try_unmark_tasks`
    /// - Bit 63 (`STATUS_BIT_YIELD`): set by `mark_yield`, cleared by `try_unmark_yield`
    ///
    /// Keeping everything in one atomic avoids races between independent u64s.
    status: CachePadded<AtomicU64>,

    /// **Counting semaphore**: number of available permits.
    ///
    /// Incremented by `release()` (called on 0→1 bit transitions) and decremented by
    /// `try_acquire()`, which `acquire()` and `acquire_timeout()` build on.
    ///
    /// - Acquire side: `AcqRel` on a successful decrement
    /// - Release side: `Release` on the increment
    permits: CachePadded<AtomicU64>,

    /// **Approximate sleeper count**: best-effort tracking of parked threads.
    ///
    /// `release()` issues at most `min(n, sleepers)` `notify_one()` calls. Updated with
    /// `Relaxed` ordering by the blocking acquire paths and by
    /// `register_sleeper()` / `unregister_sleeper()`.
    sleepers: CachePadded<AtomicUsize>,

    /// **Active worker count**: number of registered worker threads.
    ///
    /// Updated by `register_worker()` / `unregister_worker()` and read by
    /// `get_worker_count()`, all with `Relaxed` ordering.
    worker_count: CachePadded<AtomicUsize>,

    /// **Partition summary**: bitmap of leafs with active tasks in this worker's partition
    /// (at most 64 leafs, one bit each).
    ///
    /// Maintained by `sync_partition_summary()`; `try_unmark_tasks()` consults it to re-arm
    /// `STATUS_BIT_PARTITION` when it is non-zero.
    partition_summary: CachePadded<AtomicU64>,

    /// Mutex paired with `cv`; only taken on the blocking paths.
    m: Mutex<()>,

    /// Condvar on which threads park while waiting for a permit.
    cv: Condvar,
}
177
178impl WorkerWaker {
179 pub fn new() -> Self {
180 debug_assert_eq!(
181 (STATUS_BIT_PARTITION | STATUS_BIT_YIELD) & STATUS_SUMMARY_MASK,
182 0,
183 "status control bits must not overlap summary mask"
184 );
185 Self {
186 status: CachePadded::new(AtomicU64::new(0)),
187 permits: CachePadded::new(AtomicU64::new(0)),
188 sleepers: CachePadded::new(AtomicUsize::new(0)),
189 worker_count: CachePadded::new(AtomicUsize::new(0)),
190 partition_summary: CachePadded::new(AtomicU64::new(0)),
191 m: Mutex::new(()),
192 cv: Condvar::new(),
193 }
194 }
195
196 // ────────────────────────────────────────────────────────────────────────────
197 // PRODUCER-SIDE API
198 // ────────────────────────────────────────────────────────────────────────────
199
200 /// Returns the full 64-bit raw status word for this worker,
201 /// which contains all control and summary bits.
202 ///
203 /// # Details
204 /// The status word encodes:
205 /// - Status control bits (e.g., yield, partition-ready)
206 /// - Partition summary bits (track active leafs in this partition)
207 ///
208 /// This is a low-level snapshot, useful for diagnostics, debugging,
209 /// or fast checks on global/partition state.
210 ///
211 /// # Memory Ordering
212 /// Uses relaxed ordering for performance, as consumers
213 /// tolerate minor staleness and correctness is ensured elsewhere.
214 #[inline]
215 pub fn status(&self) -> u64 {
216 self.status.load(Ordering::Relaxed)
217 }
218
219 /// Returns the current state of the primary control bits ("yield" and "partition").
220 ///
221 /// # Returns
222 /// A tuple `(is_yield, is_partition_active)` representing:
223 /// - `is_yield`: Whether the yield control bit is set, instructing the worker to yield.
224 /// - `is_partition_active`: Whether the partition summary bit is set, indicating there is pending work detected in this worker's assigned partition.
225 ///
226 /// This allows higher-level logic to react based on whether the worker
227 /// should yield or has instant work available.
228 ///
229 /// # Memory Ordering
230 /// Uses relaxed ordering for performance, as spurious
231 /// staleness is benign and status is periodically refreshed.
232 #[inline]
233 pub fn status_bits(&self) -> (bool, bool) {
234 let status = self.status.load(Ordering::Relaxed);
235 // (yield, partition)
236 (
237 status & STATUS_BIT_YIELD != 0,
238 status & STATUS_BIT_PARTITION != 0,
239 )
240 }
241
242 /// Sets the `STATUS_BIT_YIELD` flag for this worker and releases a permit if it was not previously set.
243 ///
244 /// # Purpose
245 /// Requests the worker to yield (i.e., temporarily relinquish active scheduling)
246 /// so that other workers can take priority or perform balancing. This enables
247 /// cooperative multitasking among workers in high-contention or handoff scenarios.
248 ///
249 /// # Behavior
250 /// - If the yield bit was previously unset (i.e., this is the first request to yield),
251 /// this method also releases one permit to ensure the sleeping worker receives a wakeup.
252 /// - If already set, does nothing except marking the yield flag again (idempotent).
253 ///
254 /// # Concurrency
255 /// Safe for concurrent use: races to set the yield bit and release permits are benign.
256 ///
257 /// # Memory Ordering
258 /// Uses Acquire/Release ordering to ensure that the yield bit is
259 /// visible to consumers before subsequent state changes or wakeups.
260 #[inline]
261 pub fn mark_yield(&self) {
262 let prev = self.status.fetch_or(STATUS_BIT_YIELD, Ordering::AcqRel);
263 if prev & STATUS_BIT_YIELD == 0 {
264 self.release(1);
265 }
266 }
267
268 /// Attempts to clear the yield bit (`STATUS_BIT_YIELD`) in the status word.
269 ///
270 /// # Purpose
271 ///
272 /// This function is used to indicate that the current worker should stop yielding,
273 /// i.e., it is no longer in a yielded state and is eligible to process new work.
274 /// The yield bit is typically set to signal a worker to yield and released to
275 /// allow the worker to resume normal operation. Clearing this bit is a
276 /// coordinated operation to avoid spurious lost work or premature reactivation.
277 ///
278 /// # Concurrency
279 ///
280 /// The method uses a loop with atomic compare-and-exchange to guarantee that the
281 /// yield bit is only cleared if it was previously set, handling concurrent attempts
282 /// to manipulate this bit. In case there is a race and the bit has already been
283 /// cleared by another thread, this function will exit quietly and make no changes.
284 ///
285 /// # Behavior
286 ///
287 /// - If the yield bit is already clear, the function returns immediately.
288 /// - Otherwise, it performs a compare-and-exchange to clear the bit. If this
289 /// succeeds, it exits; if not, it reloads the word and repeats the process,
290 /// only trying again if the yield bit is still set.
291 #[inline]
292 pub fn try_unmark_yield(&self) {
293 loop {
294 // Load the current status word with Acquire ordering to observe the latest status.
295 let snapshot = self.status.load(Ordering::Acquire);
296 // If the yield bit is not set, no action is needed.
297 if snapshot & STATUS_BIT_YIELD == 0 {
298 return;
299 }
300
301 // Attempt to clear the yield bit atomically while preserving other bits.
302 match self.status.compare_exchange(
303 snapshot,
304 snapshot & !STATUS_BIT_YIELD,
305 Ordering::AcqRel,
306 Ordering::Relaxed,
307 ) {
308 // If successful, the yield bit was cleared; return.
309 Ok(_) => return,
310 // If status changed in the meantime, reload and retry if the yield bit is still set.
311 Err(actual) => {
312 if actual & STATUS_BIT_YIELD == 0 {
313 return;
314 }
315 }
316 }
317 }
318 }
319
320 /// Marks the partition bit as active, indicating there is work in the partition.
321 ///
322 /// This sets the `STATUS_BIT_PARTITION` bit in the status word. If this was a
323 /// transition from no active partition to active (i.e., the bit was previously
324 /// clear), it releases one permit to wake up a worker to process tasks in this
325 /// partition.
326 ///
327 /// # Example
328 ///
329 /// ```
330 /// // Called when a leaf in the partition becomes non-empty
331 /// waker.mark_tasks();
332 /// ```
333 #[inline]
334 pub fn mark_tasks(&self) {
335 let prev = self.status.fetch_or(STATUS_BIT_PARTITION, Ordering::AcqRel);
336 if prev & STATUS_BIT_PARTITION == 0 {
337 self.release(1);
338 }
339 }
340
341 /// Attempts to clear the partition active bit (`STATUS_BIT_PARTITION`) in the status word if no
342 /// leaves in the partition are active.
343 ///
344 /// This is typically called after a partition leaf transitions to empty. If the bit was set
345 /// (partition was active), this function clears it, indicating no more work is present in the partition.
346 /// If new work becomes available (i.e., the partition_summary is nonzero) after the bit is cleared,
347 /// it immediately re-arms the bit to avoid lost wakeups.
348 ///
349 /// This function is safe to call spuriously and will exit without making changes if the partition bit
350 /// is already clear.
351 ///
352 /// # Concurrency
353 ///
354 /// Uses a loop with atomic compare-and-exchange to ensure the bit is only cleared if no other
355 /// thread has concurrently set it again. If racing with a producer, the bit will be re-armed as needed to
356 /// prevent missing new work.
357 #[inline]
358 pub fn try_unmark_tasks(&self) {
359 loop {
360 let snapshot = self.status.load(Ordering::Relaxed);
361 if snapshot & STATUS_BIT_PARTITION == 0 {
362 return;
363 }
364
365 match self.status.compare_exchange(
366 snapshot,
367 snapshot & !STATUS_BIT_PARTITION,
368 Ordering::AcqRel,
369 Ordering::Relaxed,
370 ) {
371 Ok(_) => {
372 // If new partition work arrived concurrently, re-arm the bit
373 if self.partition_summary.load(Ordering::Acquire) != 0 {
374 self.mark_tasks();
375 }
376 return;
377 }
378 Err(actual) => {
379 if actual & STATUS_BIT_PARTITION == 0 {
380 return;
381 }
382 }
383 }
384 }
385 }
386
387 /// Marks a signal word at `index` (0..63) as active in the summary.
388 ///
389 /// Called by producers when a queue transitions from empty to non-empty.
390 /// If this is a **0→1 transition** (bit was previously clear), adds 1 permit
391 /// and wakes 1 sleeping thread.
392 ///
393 /// # Fast Path
394 ///
395 /// If the bit is already set, returns immediately without touching atomics.
396 /// This is the common case when multiple producers push to the same word group.
397 ///
398 /// # Arguments
399 ///
400 /// * `index` - Word index (0..63) to mark as active
401 ///
402 /// # Example
403 ///
404 /// ```ignore
405 /// // Producer pushes to queue 5 in word 0
406 /// let (was_empty, was_set) = signal.set(5);
407 /// if was_empty && was_set {
408 /// waker.mark_active(0); // Wake 1 consumer
409 /// }
410 /// ```
411 #[inline]
412 pub fn mark_active(&self, index: u64) {
413 debug_assert!(
414 index < STATUS_SUMMARY_BITS as u64,
415 "summary index {} exceeds {} bits",
416 index,
417 STATUS_SUMMARY_BITS
418 );
419 let mask = 1u64 << index;
420 if self.status.load(Ordering::Relaxed) & mask != 0 {
421 return;
422 }
423 let prev = self.status.fetch_or(mask, Ordering::Relaxed);
424 if prev & mask == 0 {
425 self.release(1);
426 }
427 }
428
429 /// Batch version of `mark_active()`: marks multiple words as active at once.
430 ///
431 /// Efficiently handles multiple queues becoming active simultaneously.
432 /// Releases exactly `k` permits, where `k` is the number of **0→1 transitions**
433 /// (newly-active words).
434 ///
435 /// # Optimization
436 ///
437 /// Uses a single `fetch_or` instead of calling `mark_active()` in a loop,
438 /// reducing atomic contention when many queues activate together.
439 ///
440 /// # Arguments
441 ///
442 /// * `mask` - Bitmap of words to mark active (bit `i` = word `i`)
443 ///
444 /// # Example
445 ///
446 /// ```ignore
447 /// // Multiple queues became active
448 /// let mut active_words = 0u64;
449 /// for word_idx in 0..64 {
450 /// if word_became_active(word_idx) {
451 /// active_words |= 1 << word_idx;
452 /// }
453 /// }
454 /// waker.mark_active_mask(active_words); // Single atomic op
455 /// ```
456 #[inline]
457 pub fn mark_active_mask(&self, mask: u64) {
458 let summary_mask = mask & STATUS_SUMMARY_MASK;
459 if summary_mask == 0 {
460 return;
461 }
462 let prev = self.status.fetch_or(summary_mask, Ordering::Relaxed);
463 let newly = (!prev) & summary_mask;
464 let k = newly.count_ones() as usize;
465 if k > 0 {
466 self.release(k);
467 }
468 }
469
470 /// Clears the summary bit for `bit_index` if the corresponding signal word is empty.
471 ///
472 /// This is **lazy cleanup** - consumers call this after draining a word to prevent
473 /// false positives in future `snapshot_summary()` calls. However, it's safe to skip
474 /// this; the system remains correct with stale summary bits.
475 ///
476 /// # Arguments
477 ///
478 /// * `bit_index` - Word index (0..63) to potentially clear
479 /// * `signal` - The actual signal word to check for emptiness
480 ///
481 /// # Example
482 ///
483 /// ```ignore
484 /// // After draining all queues in word 3
485 /// waker.try_unmark_if_empty(3, &signal_word_3);
486 /// ```
487 #[inline]
488 pub fn try_unmark_if_empty(&self, bit_index: u64, signal: &AtomicU64) {
489 debug_assert!(
490 bit_index < STATUS_SUMMARY_BITS as u64,
491 "summary index {} exceeds {} bits",
492 bit_index,
493 STATUS_SUMMARY_BITS
494 );
495 let mask = 1u64 << bit_index;
496
497 loop {
498 if signal.load(Ordering::Acquire) != 0 {
499 return;
500 }
501
502 let snapshot = self.status.load(Ordering::Relaxed);
503 if snapshot & mask == 0 {
504 return;
505 }
506
507 match self.status.compare_exchange(
508 snapshot,
509 snapshot & !mask,
510 Ordering::AcqRel,
511 Ordering::Relaxed,
512 ) {
513 Ok(_) => {
514 if signal.load(Ordering::Acquire) != 0 {
515 // Re-arm summary and release if work arrived concurrently.
516 self.mark_active(bit_index);
517 }
518 return;
519 }
520 Err(actual) => {
521 if actual & mask == 0 {
522 return;
523 }
524 }
525 }
526 }
527 }
528
529 /// Unconditionally clears the summary bit for `bit_index`.
530 ///
531 /// Faster than `try_unmark_if_empty()` when the caller already knows
532 /// the word is empty (avoids checking the signal word).
533 ///
534 /// # Arguments
535 ///
536 /// * `bit_index` - Word index (0..63) to clear
537 #[inline]
538 pub fn try_unmark(&self, bit_index: u64) {
539 debug_assert!(
540 bit_index < STATUS_SUMMARY_BITS as u64,
541 "summary index {} exceeds {} bits",
542 bit_index,
543 STATUS_SUMMARY_BITS
544 );
545 let mask = 1u64 << bit_index;
546 if self.status.load(Ordering::Relaxed) & mask != 0 {
547 self.status
548 .fetch_and(!(1u64 << bit_index), Ordering::Relaxed);
549 }
550 }
551
552 // ────────────────────────────────────────────────────────────────────────────
553 // CONSUMER-SIDE API
554 // ────────────────────────────────────────────────────────────────────────────
555
556 /// Returns a snapshot of the current summary bitmap.
557 ///
558 /// Consumers use this to quickly identify which word groups have potential work.
559 /// If bit `i` is set, word `i` *may* have active queues (false positives possible
560 /// due to lazy cleanup).
561 ///
562 /// # Memory Ordering
563 ///
564 /// Uses `Relaxed` because this is a hint, not a synchronization point. The actual
565 /// queue data is synchronized via acquire/release on the permits counter.
566 ///
567 /// # Returns
568 ///
569 /// A u64 bitmap where bit `i` indicates word `i` has potential work.
570 ///
571 /// # Example
572 ///
573 /// ```ignore
574 /// let summary = waker.snapshot_summary();
575 /// for word_idx in 0..64 {
576 /// if summary & (1 << word_idx) != 0 {
577 /// // Check queues in word_idx
578 /// }
579 /// }
580 /// ```
581 #[inline]
582 pub fn snapshot_summary(&self) -> u64 {
583 self.status.load(Ordering::Relaxed) & STATUS_SUMMARY_MASK
584 }
585
586 /// Finds the nearest set bit to `nearest_to_index` in the summary.
587 ///
588 /// Useful for maintaining **locality**: continue working on queues near
589 /// the last processed index, improving cache behavior.
590 ///
591 /// # Arguments
592 ///
593 /// * `nearest_to_index` - Preferred starting point (0..63)
594 ///
595 /// # Returns
596 ///
597 /// The index of the nearest set bit, or undefined if summary is empty.
598 ///
599 /// # Example
600 ///
601 /// ```ignore
602 /// let mut last_word = 0;
603 /// loop {
604 /// last_word = waker.summary_select(last_word);
605 /// // Process queues in word last_word
606 /// }
607 /// ```
608 #[inline]
609 pub fn summary_select(&self, nearest_to_index: u64) -> u64 {
610 let summary = self.status.load(Ordering::Relaxed) & STATUS_SUMMARY_MASK;
611 crate::bits::find_nearest(summary, nearest_to_index)
612 }
613
614 // ────────────────────────────────────────────────────────────────────────────
615 // PERMIT SYSTEM (Counting Semaphore)
616 // ────────────────────────────────────────────────────────────────────────────
617
618 /// Non-blocking attempt to acquire a permit.
619 ///
620 /// Atomically decrements the permit counter if available. This is the **lock-free
621 /// fast path** used by consumers before resorting to blocking.
622 ///
623 /// # Returns
624 ///
625 /// - `true` if a permit was consumed (consumer should process work)
626 /// - `false` if no permits available (queue likely empty)
627 ///
628 /// # Memory Ordering
629 ///
630 /// Uses `AcqRel` to synchronize with producers' `Release` in `release()`.
631 /// This ensures queue data written by producers is visible to this consumer.
632 ///
633 /// # Example
634 ///
635 /// ```ignore
636 /// if waker.try_acquire() {
637 /// // Process work (permit guarantees something is available)
638 /// } else {
639 /// // No work, maybe park or spin
640 /// }
641 /// ```
642 #[inline]
643 pub fn try_acquire(&self) -> bool {
644 self.permits
645 .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |p| p.checked_sub(1))
646 .is_ok()
647 }
648
649 /// Blocking acquire: parks the thread until a permit becomes available.
650 ///
651 /// Tries the fast path first (`try_acquire()`), then falls back to parking
652 /// on a condvar. Handles spurious wakeups by rechecking permits in a loop.
653 ///
654 /// # Blocking Behavior
655 ///
656 /// 1. Increment `sleepers` count
657 /// 2. Wait on condvar (releases mutex)
658 /// 3. Recheck permits after wakeup
659 /// 4. Decrement `sleepers` on exit
660 ///
661 /// # Panics
662 ///
663 /// Panics if the mutex or condvar is poisoned (indicates a panic in another thread
664 /// while holding the lock).
665 ///
666 /// # Example
667 ///
668 /// ```ignore
669 /// loop {
670 /// waker.acquire(); // Blocks until work available
671 /// process_work();
672 /// }
673 /// ```
674 pub fn acquire(&self) {
675 if self.try_acquire() {
676 return;
677 }
678 let mut g = self.m.lock().expect("waker mutex poisoned");
679 self.sleepers.fetch_add(1, Ordering::Relaxed);
680
681 loop {
682 if self.try_acquire() {
683 self.sleepers.fetch_sub(1, Ordering::Relaxed);
684 return;
685 }
686 g = self.cv.wait(g).expect("waker condvar wait poisoned");
687 }
688 }
689
690 /// Blocking acquire with timeout.
691 ///
692 /// Like `acquire()`, but returns after `timeout` if no permit becomes available.
693 /// Useful for implementing shutdown or periodic maintenance.
694 ///
695 /// # Arguments
696 ///
697 /// * `timeout` - Maximum duration to wait
698 ///
699 /// # Returns
700 ///
701 /// - `true` if a permit was acquired
702 /// - `false` if timed out without acquiring
703 ///
704 /// # Example
705 ///
706 /// ```ignore
707 /// use std::time::Duration;
708 ///
709 /// loop {
710 /// if waker.acquire_timeout(Duration::from_secs(1)) {
711 /// process_work();
712 /// } else {
713 /// // Timeout - check for shutdown signal
714 /// if should_shutdown() { break; }
715 /// }
716 /// }
717 /// ```
718 pub fn acquire_timeout(&self, timeout: Duration) -> bool {
719 if self.try_acquire() {
720 return true;
721 }
722 let start = std::time::Instant::now();
723 let mut g = self.m.lock().expect("waker mutex poisoned");
724 self.sleepers.fetch_add(1, Ordering::Relaxed);
725
726 while start.elapsed() < timeout {
727 if self.try_acquire() {
728 self.sleepers.fetch_sub(1, Ordering::Relaxed);
729 return true;
730 }
731 let left = timeout.saturating_sub(start.elapsed());
732 let (gg, res) = self
733 .cv
734 .wait_timeout(g, left)
735 .expect("waker condvar wait poisoned");
736 g = gg;
737 if res.timed_out() {
738 break;
739 }
740 }
741 self.sleepers.fetch_sub(1, Ordering::Relaxed);
742 false
743 }
744
745 /// Releases `n` permits and wakes up to `n` sleeping threads.
746 ///
747 /// Called by producers (indirectly via `mark_active`) when queues become active.
748 /// Uses **targeted wakeups**: only notifies up to `min(n, sleepers)` threads,
749 /// avoiding unnecessary `notify_one()` calls.
750 ///
751 /// # Permit Accumulation
752 ///
753 /// If no threads are sleeping, permits accumulate for future consumers.
754 /// This guarantees **no lost wakeups**: late-arriving consumers find work immediately.
755 ///
756 /// # Arguments
757 ///
758 /// * `n` - Number of permits to release (typically 1 or count of newly-active queues)
759 ///
760 /// # Memory Ordering
761 ///
762 /// Uses `Release` to ensure queue data is visible to consumers who `Acquire`
763 /// via `try_acquire()`.
764 ///
765 /// # Example
766 ///
767 /// ```ignore
768 /// // Producer activates 3 queues
769 /// waker.release(3); // Wakes up to 3 sleeping consumers
770 /// ```
771 #[inline]
772 pub fn release(&self, n: usize) {
773 if n == 0 {
774 return;
775 }
776 self.permits.fetch_add(n as u64, Ordering::Release);
777 let to_wake = n.min(self.sleepers.load(Ordering::Relaxed));
778
779 for _ in 0..to_wake {
780 self.cv.notify_one();
781 }
782 }
783
784 // ────────────────────────────────────────────────────────────────────────────
785 // INSPECTION / DEBUGGING
786 // ────────────────────────────────────────────────────────────────────────────
787
788 /// Returns the current summary bitmap.
789 ///
790 /// Useful for debugging or metrics. Equivalent to `snapshot_summary()` but
791 /// uses `Acquire` ordering for stronger visibility guarantees.
792 #[inline]
793 pub fn summary_bits(&self) -> u64 {
794 self.status.load(Ordering::Acquire) & STATUS_SUMMARY_MASK
795 }
796
797 /// Returns the current number of available permits.
798 ///
799 /// Useful for monitoring queue health or load. A high permit count may
800 /// indicate consumers are falling behind.
801 #[inline]
802 pub fn permits(&self) -> u64 {
803 self.permits.load(Ordering::Acquire)
804 }
805
806 /// Returns the approximate number of sleeping threads.
807 ///
808 /// Best-effort count (uses Relaxed ordering). Useful for debugging or
809 /// understanding system utilization.
810 #[inline]
811 pub fn sleepers(&self) -> usize {
812 self.sleepers.load(Ordering::Relaxed)
813 }
814
815 /// Increments the sleeper count, indicating a thread is about to park.
816 ///
817 /// Should be called BEFORE checking for work the final time, to prevent
818 /// lost wakeups. The calling thread must unregister via `unregister_sleeper()`
819 /// after waking up.
820 ///
821 /// # Example
822 ///
823 /// ```ignore
824 /// waker.register_sleeper();
825 /// // Final check for work
826 /// if has_work() {
827 /// waker.unregister_sleeper();
828 /// return; // Found work, don't park
829 /// }
830 /// // Actually park...
831 /// waker.acquire();
832 /// waker.unregister_sleeper();
833 /// ```
834 #[inline]
835 pub fn register_sleeper(&self) {
836 self.sleepers.fetch_add(1, Ordering::Relaxed);
837 }
838
839 /// Decrements the sleeper count, indicating a thread has woken up.
840 ///
841 /// Should be called after waking up from `acquire()` or if aborting
842 /// a park attempt after `register_sleeper()`.
843 ///
844 /// # Example
845 ///
846 /// ```ignore
847 /// waker.register_sleeper();
848 /// // ... park ...
849 /// waker.unregister_sleeper(); // Woke up
850 /// ```
851 #[inline]
852 pub fn unregister_sleeper(&self) {
853 self.sleepers.fetch_sub(1, Ordering::Relaxed);
854 }
855
856 // ────────────────────────────────────────────────────────────────────────────
857 // WORKER MANAGEMENT API
858 // ────────────────────────────────────────────────────────────────────────────
859
860 /// Registers a new worker thread and returns the new total worker count.
861 ///
862 /// Should be called when a FastTaskWorker thread starts. Workers use this
863 /// count to partition the signal space for optimal load distribution.
864 ///
865 /// # Returns
866 ///
867 /// The new total worker count after registration.
868 ///
869 /// # Example
870 ///
871 /// ```ignore
872 /// let waker = arena.waker();
873 /// let total_workers = unsafe { (*waker).register_worker() };
874 /// println!("Now have {} workers", total_workers);
875 /// ```
876 #[inline]
877 pub fn register_worker(&self) -> usize {
878 self.worker_count.fetch_add(1, Ordering::Relaxed) + 1
879 }
880
881 /// Unregisters a worker thread and returns the new total worker count.
882 ///
883 /// Should be called when a FastTaskWorker thread stops. This allows
884 /// remaining workers to reconfigure their partitions.
885 ///
886 /// # Returns
887 ///
888 /// The new total worker count after unregistration.
889 ///
890 /// # Example
891 ///
892 /// ```ignore
893 /// // Worker stopping
894 /// let waker = arena.waker();
895 /// let remaining_workers = unsafe { (*waker).unregister_worker() };
896 /// println!("{} workers remaining", remaining_workers);
897 /// ```
898 #[inline]
899 pub fn unregister_worker(&self) -> usize {
900 self.worker_count
901 .fetch_sub(1, Ordering::Relaxed)
902 .saturating_sub(1)
903 }
904
905 /// Returns the current number of active worker threads.
906 ///
907 /// Workers periodically check this value to detect when the worker count
908 /// has changed and reconfigure their signal partitions accordingly.
909 ///
910 /// Uses Relaxed ordering since workers can tolerate slightly stale values
911 /// and will eventually see the update on their next check.
912 ///
913 /// # Example
914 ///
915 /// ```ignore
916 /// let waker = arena.waker();
917 /// let count = unsafe { (*waker).get_worker_count() };
918 /// if count != cached_count {
919 /// // Reconfigure partition
920 /// }
921 /// ```
922 #[inline]
923 pub fn get_worker_count(&self) -> usize {
924 self.worker_count.load(Ordering::Relaxed)
925 }
926
927 // ────────────────────────────────────────────────────────────────────────────
928 // PARTITION SUMMARY MANAGEMENT
929 // ────────────────────────────────────────────────────────────────────────────
930
931 /// Synchronize partition summary from SummaryTree leaf range.
932 ///
933 /// Samples the worker's assigned partition of the SummaryTree and updates
934 /// the local `partition_summary` bitmap. When the partition transitions from
935 /// empty to non-empty, sets `STATUS_BIT_PARTITION` and adds a permit to wake
936 /// the worker.
937 ///
938 /// This should be called before parking to ensure the worker doesn't sleep
939 /// when tasks are available in its partition.
940 ///
941 /// # Arguments
942 ///
943 /// * `partition_start` - First leaf index in this worker's partition
944 /// * `partition_end` - One past the last leaf index (exclusive)
945 /// * `leaf_words` - Slice of AtomicU64 leaf words from SummaryTree
946 ///
947 /// # Returns
948 ///
949 /// `true` if the partition currently has work, `false` otherwise
950 ///
951 /// # Panics
952 ///
953 /// Panics in debug mode if partition is larger than 64 leafs
954 ///
955 /// # Example
956 ///
957 /// ```ignore
958 /// // Before parking, sync partition status
959 /// let waker = &service.wakers[worker_id];
960 /// let has_work = waker.sync_partition_summary(
961 /// self.partition_start,
962 /// self.partition_end,
963 /// &self.arena.active_tree().leaf_words,
964 /// );
965 /// ```
966 pub fn sync_partition_summary(
967 &self,
968 partition_start: usize,
969 partition_end: usize,
970 leaf_words: &[AtomicU64],
971 ) -> bool {
972 debug_assert!(
973 partition_end >= partition_start,
974 "partition_end ({}) must be >= partition_start ({})",
975 partition_end,
976 partition_start
977 );
978
979 let partition_len = partition_end.saturating_sub(partition_start);
980 if partition_len == 0 {
981 let prev = self.partition_summary.swap(0, Ordering::AcqRel);
982 if prev != 0 {
983 self.try_unmark_tasks();
984 }
985 return false;
986 }
987
988 debug_assert!(
989 partition_len <= 64,
990 "partition size {} exceeds 64-bit bitmap capacity",
991 partition_len
992 );
993
994 let partition_mask = if partition_len >= 64 {
995 u64::MAX
996 } else {
997 (1u64 << partition_len) - 1
998 };
999
1000 loop {
1001 let mut new_summary = 0u64;
1002
1003 for (offset, leaf_idx) in (partition_start..partition_end).enumerate() {
1004 if let Some(leaf_word) = leaf_words.get(leaf_idx) {
1005 if leaf_word.load(Ordering::Acquire) != 0 {
1006 new_summary |= 1u64 << offset;
1007 }
1008 }
1009 }
1010
1011 let prev = self.partition_summary.load(Ordering::Acquire);
1012 let prev_masked = prev & partition_mask;
1013
1014 if prev_masked != 0 {
1015 let mut to_clear = prev_masked & !new_summary;
1016 while to_clear != 0 {
1017 let bit = to_clear.trailing_zeros() as usize;
1018 let leaf_idx = partition_start + bit;
1019 if let Some(leaf_word) = leaf_words.get(leaf_idx) {
1020 if leaf_word.load(Ordering::Acquire) != 0 {
1021 new_summary |= 1u64 << bit;
1022 }
1023 }
1024 to_clear &= to_clear - 1;
1025 }
1026 }
1027
1028 let desired = (prev & !partition_mask) | new_summary;
1029
1030 match self.partition_summary.compare_exchange(
1031 prev,
1032 desired,
1033 Ordering::AcqRel,
1034 Ordering::Acquire,
1035 ) {
1036 Ok(_) => {
1037 let had_work = prev_masked != 0;
1038 let has_work = (desired & partition_mask) != 0;
1039
1040 if has_work && !had_work {
1041 self.mark_tasks();
1042 } else if !has_work && had_work {
1043 self.try_unmark_tasks();
1044 }
1045
1046 return has_work;
1047 }
1048 Err(_) => continue,
1049 }
1050 }
1051 }
1052
1053 /// Get current partition summary bitmap.
1054 ///
1055 /// Returns a bitmap where bit `i` indicates whether leaf `partition_start + i`
1056 /// has active tasks. This is a snapshot and may become stale immediately.
1057 ///
1058 /// Uses `Relaxed` ordering since this is a hint for optimization purposes.
1059 ///
1060 /// # Returns
1061 ///
1062 /// Bitmap of active leafs in this worker's partition
1063 #[inline]
1064 pub fn partition_summary(&self) -> u64 {
1065 self.partition_summary.load(Ordering::Relaxed)
1066 }
1067
1068 /// Check if a specific leaf in the partition has work.
1069 ///
1070 /// # Arguments
1071 ///
1072 /// * `local_leaf_idx` - Leaf index relative to partition start (0-63)
1073 ///
1074 /// # Returns
1075 ///
1076 /// `true` if the leaf appears to have work based on the cached summary
1077 ///
1078 /// # Example
1079 ///
1080 /// ```ignore
1081 /// // Check if first leaf in partition has work
1082 /// if waker.partition_leaf_has_work(0) {
1083 /// // Try to acquire from that leaf
1084 /// }
1085 /// ```
1086 #[inline]
1087 pub fn partition_leaf_has_work(&self, local_leaf_idx: usize) -> bool {
1088 debug_assert!(
1089 local_leaf_idx < 64,
1090 "local_leaf_idx {} out of range",
1091 local_leaf_idx
1092 );
1093 let summary = self.partition_summary.load(Ordering::Relaxed);
1094 summary & (1u64 << local_leaf_idx) != 0
1095 }
1096
1097 /// Directly update partition summary for a specific leaf.
1098 ///
1099 /// This is called when a task is scheduled into a leaf to immediately update
1100 /// the partition owner's summary without waiting for the next sync.
1101 ///
1102 /// # Arguments
1103 ///
1104 /// * `local_leaf_idx` - Leaf index relative to partition start (0-63)
1105 ///
1106 /// # Returns
1107 ///
1108 /// `true` if this was the first active leaf (partition was empty before)
1109 ///
1110 /// # Example
1111 ///
1112 /// ```ignore
1113 /// // When scheduling a task, immediately update owner's partition summary
1114 /// let owner_waker = &service.wakers[owner_id];
1115 /// if owner_waker.mark_partition_leaf_active(local_leaf_idx) {
1116 /// // This was the first task - worker will be woken by the partition flag
1117 /// }
1118 /// ```
1119 pub fn mark_partition_leaf_active(&self, local_leaf_idx: usize) -> bool {
1120 debug_assert!(
1121 local_leaf_idx < 64,
1122 "local_leaf_idx {} out of range",
1123 local_leaf_idx
1124 );
1125
1126 let mask = 1u64 << local_leaf_idx;
1127 let old_summary = self.partition_summary.fetch_or(mask, Ordering::AcqRel);
1128
1129 // If partition was empty, mark partition flag to wake the worker
1130 if old_summary == 0 {
1131 self.mark_tasks();
1132 true
1133 } else {
1134 false
1135 }
1136 }
1137
1138 /// Clear partition summary for a specific leaf.
1139 ///
1140 /// Called when a leaf becomes empty. If this was the last active leaf,
1141 /// attempts to clear the partition status bit.
1142 ///
1143 /// # Arguments
1144 ///
1145 /// * `local_leaf_idx` - Leaf index relative to partition start (0-63)
1146 pub fn clear_partition_leaf(&self, local_leaf_idx: usize) {
1147 debug_assert!(
1148 local_leaf_idx < 64,
1149 "local_leaf_idx {} out of range",
1150 local_leaf_idx
1151 );
1152
1153 let bit = 1u64 << local_leaf_idx;
1154 let old_summary = self.partition_summary.fetch_and(!bit, Ordering::AcqRel);
1155
1156 if (old_summary & bit) != 0 && (old_summary & !bit) == 0 {
1157 self.try_unmark_tasks();
1158 }
1159 }
1160}
1161
1162impl Default for WorkerWaker {
1163 fn default() -> Self {
1164 Self::new()
1165 }
1166}