// maniac_runtime/sync/signal.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
3
4use crate::utils::CachePadded;
5
6use std::sync::atomic::AtomicUsize;
7use std::sync::{Condvar, Mutex};
8use std::time::Duration;
9
10use crate::utils::bits::is_set;
11
/// Width of the waker's status summary word, in bits.
///
/// One summary bit per signal word; matches the `AtomicU64` backing the
/// `AsyncSignalWaker::status` field, so indices are validated as `0..64`.
pub const STATUS_SUMMARY_BITS: u32 = 64;

/// Number of bits per signal word (64-bit atomic).
///
/// Each signal word can track up to 64 queues. This matches the width of
/// `AtomicU64` and provides efficient bit manipulation via hardware instructions
/// (POPCNT, BSF, etc.).
pub const SIGNAL_CAPACITY: u64 = 64;

/// Bitmask for extracting bit index within a signal word.
///
/// Equivalent to `index % 64`, used for fast modulo via bitwise AND:
/// ```ignore
/// bit_index = queue_id & SIGNAL_MASK;
/// ```
pub const SIGNAL_MASK: u64 = SIGNAL_CAPACITY - 1;
28
/// A cache-line padded 64-bit atomic bitmap for tracking queue readiness.
///
/// Each `Signal` represents a group of up to 64 queues, where each bit indicates
/// whether the corresponding queue has work available. Multiple `Signal` instances
/// are coordinated via an `AsyncSignalWaker` to form a complete two-level bitmap.
///
/// # Design
///
/// ```text
/// Signal (64-bit AtomicU64)
/// ┌───┬───┬───┬───┬─────┬───┐
/// │ 0 │ 1 │ 0 │ 1 │ ... │ 0 │  Each bit = one queue's scheduled state
/// └───┴───┴───┴───┴─────┴───┘
///  Q0  Q1  Q2  Q3  ...  Q63
/// ```
///
/// # Cache Optimization
///
/// The inner state is wrapped in `Arc<CachePadded<...>>` to:
/// - Allow cheap cloning (single pointer copy)
/// - Prevent false sharing between different signals
/// - Optimize for hot paths (producers setting bits, executor clearing bits)
///
/// # Thread Safety
///
/// All operations use atomic instructions. Multiple producers can concurrently set
/// bits (via `set()`), and the executor can concurrently acquire/clear bits (via
/// `acquire()` or `try_acquire()`).
///
/// # Cloning
///
/// `Signal` is cheaply clonable via `Arc`. All clones share the same underlying
/// atomic bitmap, making it suitable for distribution across multiple producer threads.
#[derive(Clone)]
pub struct Signal {
    /// Shared, cache-line padded inner state (index + atomic bitmap).
    inner: Arc<CachePadded<SignalInner>>,
}
67
68impl Signal {
69 /// Returns the signal's index within the SignalWaker's signal array.
70 ///
71 /// This index is used to:
72 /// - Map this signal to the corresponding bit in the summary bitmap
73 /// - Identify which group of 64 queues this signal represents
74 ///
75 /// # Example
76 ///
77 /// ```ignore
78 /// let signal = Signal::with_index(5);
79 /// assert_eq!(signal.index(), 5);
80 /// // This signal controls queues 320-383 (5 * 64 through (5+1) * 64 - 1)
81 /// ```
82 #[inline(always)]
83 pub fn index(&self) -> u64 {
84 return self.inner.index;
85 }
86
87 /// Returns a reference to the underlying atomic value.
88 ///
89 /// Provides direct access to the 64-bit bitmap for advanced use cases
90 /// that need custom atomic operations beyond the provided methods.
91 ///
92 /// # Use Cases
93 ///
94 /// - Custom bit manipulation patterns
95 /// - Debugging (observing raw bitmap state)
96 /// - Integration with external synchronization primitives
97 ///
98 /// # Example
99 ///
100 /// ```ignore
101 /// let signal = Signal::new();
102 /// signal.set(10);
103 /// let raw_value = signal.value().load(Ordering::Relaxed);
104 /// assert_eq!(raw_value & (1 << 10), 1 << 10); // Bit 10 is set
105 /// ```
106 #[inline(always)]
107 pub fn value(&self) -> &AtomicU64 {
108 &self.inner.value
109 }
110}
111
/// Internal state for a Signal, cache-line padded to prevent false sharing.
///
/// # Fields
///
/// - `index`: The signal's position in the waker's signal array (0-63; the
///   constructors `debug_assert!` that `index < SIGNAL_CAPACITY`)
/// - `value`: 64-bit atomic bitmap where each bit represents a queue's readiness
struct SignalInner {
    /// Signal index in the waker's signal array.
    pub index: u64,
    /// Atomic bitmap tracking up to 64 queues (bit N = queue ready state).
    value: AtomicU64,
}
124
125impl Signal {
126 /// Creates a new Signal with index 0 and all bits cleared.
127 ///
128 /// # Example
129 ///
130 /// ```ignore
131 /// let signal = Signal::new();
132 /// assert_eq!(signal.index(), 0);
133 /// assert!(signal.is_empty());
134 /// ```
135 pub fn new() -> Self {
136 Self::with_value(0, 0)
137 }
138
139 /// Creates a new Signal with the specified index and all bits cleared.
140 ///
141 /// # Parameters
142 ///
143 /// - `index`: Position in the SignalWaker's signal array (0-61)
144 ///
145 /// # Example
146 ///
147 /// ```ignore
148 /// let signal = Signal::with_index(10);
149 /// assert_eq!(signal.index(), 10);
150 /// assert!(signal.is_empty());
151 /// ```
152 pub fn with_index(index: u64) -> Self {
153 debug_assert!(
154 index < SIGNAL_CAPACITY as u64,
155 "signal index {} exceeds status summary capacity {}",
156 index,
157 SIGNAL_CAPACITY
158 );
159 Self::with_value(index, 0)
160 }
161
162 /// Creates a new Signal with the specified index and initial bitmap value.
163 ///
164 /// This is primarily used for testing or restoring state. In normal operation,
165 /// signals start with all bits cleared.
166 ///
167 /// # Parameters
168 ///
169 /// - `index`: Position in the SignalWaker's signal array (0-61)
170 /// - `value`: Initial 64-bit bitmap value
171 ///
172 /// # Example
173 ///
174 /// ```ignore
175 /// // Create signal with bits 0, 5, and 10 already set
176 /// let signal = Signal::with_value(3, (1 << 0) | (1 << 5) | (1 << 10));
177 /// assert_eq!(signal.size(), 3);
178 /// assert!(signal.is_set(0));
179 /// assert!(signal.is_set(5));
180 /// assert!(signal.is_set(10));
181 /// ```
182 pub fn with_value(index: u64, value: u64) -> Self {
183 debug_assert!(
184 index < SIGNAL_CAPACITY as u64,
185 "signal index {} exceeds status summary capacity {}",
186 index,
187 SIGNAL_CAPACITY
188 );
189 Self {
190 inner: Arc::new(CachePadded::new(SignalInner {
191 index,
192 value: AtomicU64::new(value),
193 })),
194 }
195 }
196
197 /// Loads the current bitmap value with the specified memory ordering.
198 ///
199 /// # Parameters
200 ///
201 /// - `ordering`: Memory ordering for the load operation
202 ///
203 /// # Returns
204 ///
205 /// The 64-bit bitmap value where each set bit represents a ready queue.
206 ///
207 /// # Example
208 ///
209 /// ```ignore
210 /// signal.set(5);
211 /// signal.set(10);
212 /// let value = signal.load(Ordering::Acquire);
213 /// assert_eq!(value, (1 << 5) | (1 << 10));
214 /// ```
215 #[inline(always)]
216 pub fn load(&self, ordering: Ordering) -> u64 {
217 self.inner.value.load(ordering)
218 }
219
220 /// Returns the number of set bits (ready queues) in this signal.
221 ///
222 /// Equivalent to `popcount(bitmap)`, this counts how many queues in this
223 /// signal group currently have work available.
224 ///
225 /// # Performance
226 ///
227 /// Uses the `POPCNT` instruction on x86_64 (~3 cycles), making it very efficient.
228 ///
229 /// # Example
230 ///
231 /// ```ignore
232 /// let signal = Signal::new();
233 /// signal.set(0);
234 /// signal.set(5);
235 /// signal.set(63);
236 /// assert_eq!(signal.size(), 3);
237 /// ```
238 #[inline(always)]
239 pub fn size(&self) -> u64 {
240 self.load(Ordering::Relaxed).count_ones() as u64
241 }
242
243 /// Returns `true` if no bits are set (no ready queues).
244 ///
245 /// This is more efficient than `size() == 0` for checking emptiness.
246 ///
247 /// # Example
248 ///
249 /// ```ignore
250 /// let signal = Signal::new();
251 /// assert!(signal.is_empty());
252 /// signal.set(10);
253 /// assert!(!signal.is_empty());
254 /// ```
255 #[inline(always)]
256 pub fn is_empty(&self) -> bool {
257 self.load(Ordering::Relaxed).count_ones() == 0
258 }
259
260 /// Atomically sets a bit in the bitmap using fetch_or.
261 ///
262 /// This is the primary method for producers to signal that a queue has work available.
263 ///
264 /// # Parameters
265 ///
266 /// - `index`: Bit position to set (0-63)
267 ///
268 /// # Returns
269 ///
270 /// A tuple `(was_empty, was_set)`:
271 /// - `was_empty`: `true` if this was the first bit set (signal transitioned from empty to non-empty)
272 /// - `was_set`: `true` if the bit was successfully set (wasn't already set)
273 ///
274 /// # Use Cases
275 ///
276 /// The return values are used for summary bitmap updates:
277 /// ```ignore
278 /// let (was_empty, was_set) = signal.set(queue_bit);
279 /// if was_empty && was_set {
280 /// // This signal was empty, now has work - update summary
281 /// waker.mark_active(signal.index());
282 /// }
283 /// ```
284 ///
285 /// # Performance
286 ///
287 /// ~5-10 ns (one atomic fetch_or operation)
288 ///
289 /// # Example
290 ///
291 /// ```ignore
292 /// let signal = Signal::new();
293 /// let (was_empty, was_set) = signal.set(5);
294 /// assert!(was_empty); // Signal was empty
295 /// assert!(was_set); // Bit 5 was not previously set
296 ///
297 /// let (was_empty, was_set) = signal.set(5);
298 /// assert!(!was_empty); // Signal already had bits set
299 /// assert!(!was_set); // Bit 5 was already set
300 /// ```
301 #[inline(always)]
302 pub fn set(&self, index: u64) -> (bool, bool) {
303 crate::bits::set(&self.inner.value, index)
304 }
305
306 /// Atomically sets a bit using a precomputed bitmask.
307 ///
308 /// Similar to `set()`, but takes a precomputed `1 << index` value for cases
309 /// where the bit position is computed once and reused.
310 ///
311 /// # Parameters
312 ///
313 /// - `bit`: Precomputed bitmask with exactly one bit set (e.g., `1 << 5`)
314 ///
315 /// # Returns
316 ///
317 /// The previous bitmap value before setting the bit.
318 ///
319 /// # Example
320 ///
321 /// ```ignore
322 /// let signal = Signal::new();
323 /// let bit_mask = 1u64 << 10;
324 /// let prev = signal.set_with_bit(bit_mask);
325 /// assert_eq!(prev, 0); // Was empty
326 /// assert!(signal.is_set(10));
327 /// ```
328 #[inline(always)]
329 pub fn set_with_bit(&self, bit: u64) -> u64 {
330 crate::bits::set_with_bit(&self.inner.value, bit)
331 }
332
333 /// Atomically clears a bit if it is currently set (CAS-based).
334 ///
335 /// This is the primary method for the executor to claim ownership of a ready queue.
336 /// Uses a CAS loop to ensure the bit is cleared atomically.
337 ///
338 /// # Parameters
339 ///
340 /// - `index`: Bit position to clear (0-63)
341 ///
342 /// # Returns
343 ///
344 /// - `true`: Bit was set and has been successfully cleared (queue acquired)
345 /// - `false`: Bit was not set (queue not ready or already acquired)
346 ///
347 /// # Use Cases
348 ///
349 /// ```ignore
350 /// // Executor loop
351 /// if signal.acquire(queue_bit) {
352 /// // Successfully acquired queue, process it
353 /// process_queue(queue_id);
354 /// }
355 /// ```
356 ///
357 /// # Performance
358 ///
359 /// ~10-20 ns (CAS loop, typically succeeds on first iteration)
360 ///
361 /// # Example
362 ///
363 /// ```ignore
364 /// let signal = Signal::new();
365 /// signal.set(5);
366 /// assert!(signal.acquire(5)); // Successfully cleared bit 5
367 /// assert!(!signal.acquire(5)); // Bit already clear, returns false
368 /// ```
369 #[inline(always)]
370 pub fn acquire(&self, index: u64) -> bool {
371 crate::bits::acquire(&self.inner.value, index)
372 }
373
374 /// Attempts to atomically clear a bit, returning detailed state information.
375 ///
376 /// Similar to `acquire()`, but provides additional information about the
377 /// before/after state of the bitmap, useful for debugging or advanced scheduling.
378 ///
379 /// # Parameters
380 ///
381 /// - `index`: Bit position to clear (0-63)
382 ///
383 /// # Returns
384 ///
385 /// A tuple `(before, after, success)`:
386 /// - `before`: Bitmap value before the operation
387 /// - `after`: Bitmap value after the operation (if successful)
388 /// - `success`: `true` if the bit was cleared, `false` if it wasn't set
389 ///
390 /// # Example
391 ///
392 /// ```ignore
393 /// let signal = Signal::with_value(0, 0b101010); // Bits 1, 3, 5 set
394 /// let (before, after, success) = signal.try_acquire(3);
395 /// assert_eq!(before, 0b101010);
396 /// assert_eq!(after, 0b100010); // Bit 3 cleared
397 /// assert!(success);
398 /// ```
399 #[inline(always)]
400 pub fn try_acquire(&self, index: u64) -> (u64, u64, bool) {
401 crate::bits::try_acquire(&self.inner.value, index)
402 }
403
404 /// Checks if a specific bit is set without modifying the bitmap.
405 ///
406 /// Non-atomic read followed by bit test. Suitable for non-critical checks
407 /// where races are acceptable.
408 ///
409 /// # Parameters
410 ///
411 /// - `index`: Bit position to check (0-63)
412 ///
413 /// # Returns
414 ///
415 /// `true` if the bit is set, `false` otherwise.
416 ///
417 /// # Example
418 ///
419 /// ```ignore
420 /// let signal = Signal::new();
421 /// assert!(!signal.is_set(5));
422 /// signal.set(5);
423 /// assert!(signal.is_set(5));
424 /// ```
425 #[inline(always)]
426 pub fn is_set(&self, index: u64) -> bool {
427 crate::bits::is_set(&self.inner.value, index)
428 }
429}
430
/// A cache-optimized waker that packs per-word summary bits into a single status word.
#[repr(align(64))]
pub struct AsyncSignalWaker {
    /// **Status bitmap**: one summary bit per signal word.
    ///
    /// - Bits 0‒63: Queue-word hot bits (`mark_active`, `try_unmark_if_empty`, etc.)
    ///
    /// Keeping everything in one atomic avoids races between independent u64s.
    /// NOTE(review): an earlier comment reserved bits 62-63 for control flags,
    /// but all 64 bits are validated as summary indices here — confirm intent.
    status: CachePadded<AtomicU64>,

    /// **Counting semaphore**: Number of threads that should be awake (available permits).
    ///
    /// Incremented by producers when queues become active (0→1 transitions).
    /// Decremented by consumers via `try_acquire()` or `acquire()`.
    ///
    /// **Critical invariant**: Each queue empty→non-empty transition adds exactly
    /// 1 permit, preventing lost wakeups. Permits accumulate if no threads are
    /// sleeping, ensuring late arrivals find work.
    ///
    /// - Acquire: `AcqRel` (synchronizes with Release from producers)
    /// - Release: `Release` (makes queue data visible to acquirers)
    permits: CachePadded<AtomicU64>,
}
454
455impl AsyncSignalWaker {
456 pub fn new() -> Self {
457 Self {
458 status: CachePadded::new(AtomicU64::new(0)),
459 permits: CachePadded::new(AtomicU64::new(0)),
460 }
461 }
462
463 // ────────────────────────────────────────────────────────────────────────────
464 // PRODUCER-SIDE API
465 // ────────────────────────────────────────────────────────────────────────────
466
467 /// Marks a signal word at `index` (0..63) as active in the summary.
468 ///
469 /// Called by producers when a queue transitions from empty to non-empty.
470 /// If this is a **0→1 transition** (bit was previously clear), adds 1 permit
471 /// and wakes 1 sleeping thread.
472 ///
473 /// # Fast Path
474 ///
475 /// If the bit is already set, returns immediately without touching atomics.
476 /// This is the common case when multiple producers push to the same word group.
477 ///
478 /// # Arguments
479 ///
480 /// * `index` - Word index (0..63) to mark as active
481 ///
482 /// # Example
483 ///
484 /// ```ignore
485 /// // Producer pushes to queue 5 in word 0
486 /// let (was_empty, was_set) = signal.set(5);
487 /// if was_empty && was_set {
488 /// waker.mark_active(0); // Wake 1 consumer
489 /// }
490 /// ```
491 #[inline]
492 pub fn mark_active(&self, index: u64) {
493 debug_assert!(
494 index < STATUS_SUMMARY_BITS as u64,
495 "summary index {} exceeds {} bits",
496 index,
497 STATUS_SUMMARY_BITS
498 );
499 let mask = 1u64 << index;
500 if self.status.load(Ordering::Relaxed) & mask != 0 {
501 return;
502 }
503 let prev = self.status.fetch_or(mask, Ordering::Relaxed);
504 if prev & mask == 0 {
505 self.release(1);
506 }
507 }
508
509 /// Batch version of `mark_active()`: marks multiple words as active at once.
510 ///
511 /// Efficiently handles multiple queues becoming active simultaneously.
512 /// Releases exactly `k` permits, where `k` is the number of **0→1 transitions**
513 /// (newly-active words).
514 ///
515 /// # Optimization
516 ///
517 /// Uses a single `fetch_or` instead of calling `mark_active()` in a loop,
518 /// reducing atomic contention when many queues activate together.
519 ///
520 /// # Arguments
521 ///
522 /// * `mask` - Bitmap of words to mark active (bit `i` = word `i`)
523 ///
524 /// # Example
525 ///
526 /// ```ignore
527 /// // Multiple queues became active
528 /// let mut active_words = 0u64;
529 /// for word_idx in 0..64 {
530 /// if word_became_active(word_idx) {
531 /// active_words |= 1 << word_idx;
532 /// }
533 /// }
534 /// waker.mark_active_mask(active_words); // Single atomic op
535 /// ```
536 #[inline]
537 pub fn mark_active_mask(&self, mask: u64) {
538 let summary_mask = mask;
539 if summary_mask == 0 {
540 return;
541 }
542 let prev = self.status.fetch_or(summary_mask, Ordering::Relaxed);
543 let newly = (!prev) & summary_mask;
544 let k = newly.count_ones() as usize;
545 if k > 0 {
546 self.release(k);
547 }
548 }
549
550 /// Clears the summary bit for `bit_index` if the corresponding signal word is empty.
551 ///
552 /// This is **lazy cleanup** - consumers call this after draining a word to prevent
553 /// false positives in future `snapshot_summary()` calls. However, it's safe to skip
554 /// this; the system remains correct with stale summary bits.
555 ///
556 /// # Arguments
557 ///
558 /// * `bit_index` - Word index (0..63) to potentially clear
559 /// * `signal` - The actual signal word to check for emptiness
560 ///
561 /// # Example
562 ///
563 /// ```ignore
564 /// // After draining all queues in word 3
565 /// waker.try_unmark_if_empty(3, &signal_word_3);
566 /// ```
567 #[inline]
568 pub fn try_unmark_if_empty(&self, bit_index: u64, signal: &AtomicU64) {
569 debug_assert!(
570 bit_index < STATUS_SUMMARY_BITS as u64,
571 "summary index {} exceeds {} bits",
572 bit_index,
573 STATUS_SUMMARY_BITS
574 );
575 let mask = 1u64 << bit_index;
576
577 loop {
578 if signal.load(Ordering::Acquire) != 0 {
579 return;
580 }
581
582 let snapshot = self.status.load(Ordering::Relaxed);
583 if snapshot & mask == 0 {
584 return;
585 }
586
587 match self.status.compare_exchange(
588 snapshot,
589 snapshot & !mask,
590 Ordering::AcqRel,
591 Ordering::Relaxed,
592 ) {
593 Ok(_) => {
594 if signal.load(Ordering::Acquire) != 0 {
595 // Re-arm summary and release if work arrived concurrently.
596 self.mark_active(bit_index);
597 }
598 return;
599 }
600 Err(actual) => {
601 if actual & mask == 0 {
602 return;
603 }
604 }
605 }
606 }
607 }
608
609 /// Unconditionally clears the summary bit for `bit_index`.
610 ///
611 /// Faster than `try_unmark_if_empty()` when the caller already knows
612 /// the word is empty (avoids checking the signal word).
613 ///
614 /// # Arguments
615 ///
616 /// * `bit_index` - Word index (0..63) to clear
617 #[inline]
618 pub fn try_unmark(&self, bit_index: u64) {
619 debug_assert!(
620 bit_index < STATUS_SUMMARY_BITS as u64,
621 "summary index {} exceeds {} bits",
622 bit_index,
623 STATUS_SUMMARY_BITS
624 );
625 let mask = 1u64 << bit_index;
626 if self.status.load(Ordering::Relaxed) & mask != 0 {
627 self.status
628 .fetch_and(!(1u64 << bit_index), Ordering::Relaxed);
629 }
630 }
631
632 // ────────────────────────────────────────────────────────────────────────────
633 // CONSUMER-SIDE API
634 // ────────────────────────────────────────────────────────────────────────────
635
636 /// Returns a snapshot of the current summary bitmap.
637 ///
638 /// Consumers use this to quickly identify which word groups have potential work.
639 /// If bit `i` is set, word `i` *may* have active queues (false positives possible
640 /// due to lazy cleanup).
641 ///
642 /// # Memory Ordering
643 ///
644 /// Uses `Relaxed` because this is a hint, not a synchronization point. The actual
645 /// queue data is synchronized via acquire/release on the permits counter.
646 ///
647 /// # Returns
648 ///
649 /// A u64 bitmap where bit `i` indicates word `i` has potential work.
650 ///
651 /// # Example
652 ///
653 /// ```ignore
654 /// let summary = waker.snapshot_summary();
655 /// for word_idx in 0..64 {
656 /// if summary & (1 << word_idx) != 0 {
657 /// // Check queues in word_idx
658 /// }
659 /// }
660 /// ```
661 #[inline]
662 pub fn snapshot_summary(&self) -> u64 {
663 self.status.load(Ordering::Relaxed)
664 }
665
666 /// Finds the nearest set bit to `nearest_to_index` in the summary.
667 ///
668 /// Useful for maintaining **locality**: continue working on queues near
669 /// the last processed index, improving cache behavior.
670 ///
671 /// # Arguments
672 ///
673 /// * `nearest_to_index` - Preferred starting point (0..63)
674 ///
675 /// # Returns
676 ///
677 /// The index of the nearest set bit, or undefined if summary is empty.
678 ///
679 /// # Example
680 ///
681 /// ```ignore
682 /// let mut last_word = 0;
683 /// loop {
684 /// last_word = waker.summary_select(last_word);
685 /// // Process queues in word last_word
686 /// }
687 /// ```
688 #[inline]
689 pub fn summary_select(&self, nearest_to_index: u64) -> u64 {
690 let summary = self.status.load(Ordering::Relaxed);
691 crate::bits::find_nearest(summary, nearest_to_index)
692 }
693
694 // ────────────────────────────────────────────────────────────────────────────
695 // PERMIT SYSTEM (Counting Semaphore)
696 // ────────────────────────────────────────────────────────────────────────────
697
698 /// Non-blocking attempt to acquire a permit.
699 ///
700 /// Atomically decrements the permit counter if available. This is the **lock-free
701 /// fast path** used by consumers before resorting to blocking.
702 ///
703 /// # Returns
704 ///
705 /// - `true` if a permit was consumed (consumer should process work)
706 /// - `false` if no permits available (queue likely empty)
707 ///
708 /// # Memory Ordering
709 ///
710 /// Uses `AcqRel` to synchronize with producers' `Release` in `release()`.
711 /// This ensures queue data written by producers is visible to this consumer.
712 ///
713 /// # Example
714 ///
715 /// ```ignore
716 /// if waker.try_acquire() {
717 /// // Process work (permit guarantees something is available)
718 /// } else {
719 /// // No work, maybe park or spin
720 /// }
721 /// ```
722 #[inline]
723 pub fn try_acquire(&self) -> bool {
724 self.permits
725 .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |p| p.checked_sub(1))
726 .is_ok()
727 }
728
729 /// Blocking acquire: parks the thread until a permit becomes available.
730 ///
731 /// Tries the fast path first (`try_acquire()`), then falls back to parking
732 /// on a condvar. Handles spurious wakeups by rechecking permits in a loop.
733 ///
734 /// # Blocking Behavior
735 ///
736 /// 1. Increment `sleepers` count
737 /// 2. Wait on condvar (releases mutex)
738 /// 3. Recheck permits after wakeup
739 /// 4. Decrement `sleepers` on exit
740 ///
741 /// # Panics
742 ///
743 /// Panics if the mutex or condvar is poisoned (indicates a panic in another thread
744 /// while holding the lock).
745 ///
746 /// # Example
747 ///
748 /// ```ignore
749 /// loop {
750 /// waker.acquire(); // Blocks until work available
751 /// process_work();
752 /// }
753 /// ```
754 pub fn acquire(&self) {
755 if self.try_acquire() {
756 return;
757 }
758 // TODO: Implement
759 }
760
761 /// Blocking acquire with timeout.
762 ///
763 /// Like `acquire()`, but returns after `timeout` if no permit becomes available.
764 /// Useful for implementing shutdown or periodic maintenance.
765 ///
766 /// # Arguments
767 ///
768 /// * `timeout` - Maximum duration to wait
769 ///
770 /// # Returns
771 ///
772 /// - `true` if a permit was acquired
773 /// - `false` if timed out without acquiring
774 ///
775 /// # Example
776 ///
777 /// ```ignore
778 /// use std::time::Duration;
779 ///
780 /// loop {
781 /// if waker.acquire_timeout(Duration::from_secs(1)) {
782 /// process_work();
783 /// } else {
784 /// // Timeout - check for shutdown signal
785 /// if should_shutdown() { break; }
786 /// }
787 /// }
788 /// ```
789 pub fn acquire_timeout(&self, timeout: Duration) -> bool {
790 if self.try_acquire() {
791 return true;
792 }
793 // TODO: Implement
794 false
795 }
796
797 /// Releases `n` permits and wakes up to `n` sleeping threads.
798 ///
799 /// Called by producers (indirectly via `mark_active`) when queues become active.
800 /// Uses **targeted wakeups**: only notifies up to `min(n, sleepers)` threads,
801 /// avoiding unnecessary `notify_one()` calls.
802 ///
803 /// # Permit Accumulation
804 ///
805 /// If no threads are sleeping, permits accumulate for future consumers.
806 /// This guarantees **no lost wakeups**: late-arriving consumers find work immediately.
807 ///
808 /// # Arguments
809 ///
810 /// * `n` - Number of permits to release (typically 1 or count of newly-active queues)
811 ///
812 /// # Memory Ordering
813 ///
814 /// Uses `Release` to ensure queue data is visible to consumers who `Acquire`
815 /// via `try_acquire()`.
816 ///
817 /// # Example
818 ///
819 /// ```ignore
820 /// // Producer activates 3 queues
821 /// waker.release(3); // Wakes up to 3 sleeping consumers
822 /// ```
823 #[inline]
824 pub fn release(&self, n: usize) {
825 if n == 0 {
826 return;
827 }
828 // TODO: Implement
829 }
830
831 // ────────────────────────────────────────────────────────────────────────────
832 // INSPECTION / DEBUGGING
833 // ────────────────────────────────────────────────────────────────────────────
834
835 /// Returns the current summary bitmap.
836 ///
837 /// Useful for debugging or metrics. Equivalent to `snapshot_summary()` but
838 /// uses `Acquire` ordering for stronger visibility guarantees.
839 #[inline]
840 pub fn summary_bits(&self) -> u64 {
841 self.status.load(Ordering::Acquire)
842 }
843
844 /// Returns the current number of available permits.
845 ///
846 /// Useful for monitoring queue health or load. A high permit count may
847 /// indicate consumers are falling behind.
848 #[inline]
849 pub fn permits(&self) -> u64 {
850 self.permits.load(Ordering::Acquire)
851 }
852}
853
impl Default for AsyncSignalWaker {
    /// Equivalent to [`AsyncSignalWaker::new`]: empty summary, zero permits.
    fn default() -> Self {
        Self::new()
    }
}
859
860// ──────────────────────────────────────────────────────────────────────────────
861// SignalGate State Machine Constants
862// ──────────────────────────────────────────────────────────────────────────────
863
/// Queue has no work scheduled and is not being processed.
///
/// This is the initial state. Transitions to SCHEDULED when work is enqueued.
pub const IDLE: u8 = 0;

/// Queue has work available and is waiting for the executor to process it.
///
/// Transitions from IDLE when `schedule()` is called. Transitions to EXECUTING
/// when the executor calls `begin()`.
pub const SCHEDULED: u8 = 1;

/// Queue is currently being processed by the executor.
///
/// Transitions from SCHEDULED when executor calls `begin()`. Transitions back to
/// IDLE (via `finish()`) or SCHEDULED (via `finish_and_schedule()`) when processing completes.
///
/// These values combine bitwise: `EXECUTING | SCHEDULED` (= 3) marks a queue
/// that was re-scheduled while it was being executed.
pub const EXECUTING: u8 = 2;
880
/// Per-queue gate coordinating scheduling between producers and executor.
///
/// `AsyncSignalGate` implements a lock-free state machine that prevents redundant
/// scheduling and ensures proper handoff of work from producers to the executor.
/// Each queue has exactly one `AsyncSignalGate` instance.
///
/// # State Machine
///
/// ```text
/// ┌──────────────────────────────────────────────────────────────┐
/// │                                                              │
/// │  IDLE (0) ──schedule()──▶ SCHEDULED (1) ──begin()──▶ EXECUTING (2)
/// │    ▲                          │                          │
/// │    │                          │                          │
/// │    └────────finish()────────────┴───────────────────────────┘
/// │                               │                          │
/// │                               └──finish_and_schedule()────┘
/// │                               │                          │
/// │                               ▼                          │
/// │                          SCHEDULED (1)                   │
/// └──────────────────────────────────────────────────────────────┘
/// ```
///
/// # State Transitions
///
/// - **IDLE → SCHEDULED**: Producer calls `schedule()` after enqueuing items
/// - **SCHEDULED → EXECUTING**: Executor calls `begin()` before processing
/// - **EXECUTING → IDLE**: Executor calls `finish()` when done (queue empty)
/// - **EXECUTING → SCHEDULED**: Executor calls `finish_and_schedule()` when more work remains
/// - **Any → SCHEDULED**: Concurrent `schedule()` during EXECUTING sets flag, processed in `finish()`
///
/// # Concurrency Guarantees
///
/// - **Multiple producers**: Safe (atomic flags ensure only one schedule succeeds)
/// - **Producer + executor**: Safe (state transitions are atomic and properly ordered)
/// - **Multiple executors**: NOT SAFE (single-threaded consumption assumption)
///
/// # Integration with Signal and AsyncSignalWaker
///
/// When a queue transitions IDLE → SCHEDULED:
/// 1. Sets bit in the associated `Signal` (64-bit bitmap)
/// 2. If signal was empty, sets bit in `AsyncSignalWaker` summary (64-bit bitmap)
/// 3. May grant a permit to a waiting executor thread
///
/// # Memory Layout
///
/// ```text
/// AsyncSignalGate field order (per-field sizes on x86_64; total size is
/// compiler-padded — verify with `std::mem::size_of` if it matters)
/// ┌─────────────┬───────────┬─────────┬─────────┐
/// │ flags (1B)  │ bit_index │ signal  │ waker   │
/// │ AtomicU8    │ u8 (1B)   │ Arc (8B)│ Arc (8B)│
/// └─────────────┴───────────┴─────────┴─────────┘
/// ```
///
/// # Example Usage
///
/// ```ignore
/// // Setup
/// let waker = Arc::new(AsyncSignalWaker::new());
/// let signal = Signal::with_index(5);
/// let gate = AsyncSignalGate::new(10, signal, waker);
///
/// // Producer thread
/// queue.try_push(item)?;
/// gate.schedule(); // Signal work available
///
/// // Executor thread
/// gate.begin(); // Mark as executing
/// while let Some(item) = queue.try_pop() {
///     process(item);
/// }
/// if queue.is_empty() {
///     gate.finish(); // Done, back to IDLE
/// } else {
///     gate.finish_and_schedule(); // More work, stay SCHEDULED
/// }
/// ```
pub struct AsyncSignalGate {
    /// Atomic state flags (IDLE, SCHEDULED, EXECUTING).
    ///
    /// Uses bitwise OR to combine flags, allowing detection of concurrent schedules
    /// during execution (EXECUTING | SCHEDULED = 3).
    flags: AtomicU8,

    /// Bit position within the Signal's 64-bit bitmap (0-63).
    ///
    /// This queue's ready state is represented by bit `1 << bit_index` in the signal.
    bit_index: u8,

    /// Reference to the Signal word containing this queue's bit.
    ///
    /// Shared among up to 64 queues (all queues in the same signal group).
    signal: Signal,

    /// Reference to the top-level AsyncSignalWaker for summary updates.
    ///
    /// Shared among all queues in the executor (up to 64 * 64 = 4096 queues).
    waker: Arc<AsyncSignalWaker>,
}
980
981impl AsyncSignalGate {
    /// Creates a new gate in the IDLE state.
    ///
    /// # Parameters
    ///
    /// - `bit_index`: Position of this queue's bit within the signal (0-63)
    /// - `signal`: The Signal word containing this queue's bit
    /// - `waker`: Shared AsyncSignalWaker used for summary updates
    ///
    /// # Example
    ///
    /// ```ignore
    /// let waker = Arc::new(AsyncSignalWaker::new());
    /// let signal = Signal::with_index(0);
    /// let gate = AsyncSignalGate::new(5, signal, waker);
    /// // This gate controls bit 5 in signal[0]
    /// ```
    pub fn new(bit_index: u8, signal: Signal, waker: Arc<AsyncSignalWaker>) -> Self {
        Self {
            flags: AtomicU8::new(IDLE),
            bit_index,
            signal,
            waker,
        }
    }
1006
1007 /// Attempts to schedule this queue for execution (IDLE → SCHEDULED transition).
1008 ///
1009 /// Called by producers after enqueuing items to notify the executor. Uses atomic
1010 /// operations to ensure only one successful schedule per work batch.
1011 ///
1012 /// # Algorithm
1013 ///
1014 /// 1. **Fast check**: If already SCHEDULED, return false immediately (idempotent)
1015 /// 2. **Atomic set**: `fetch_or(SCHEDULED)` to set the SCHEDULED flag
1016 /// 3. **State check**: If previous state was IDLE (neither SCHEDULED nor EXECUTING):
1017 /// - Set bit in signal word via `signal.set(bit_index)`
1018 /// - If signal transitioned from empty, update summary via `waker.mark_active()`
1019 /// - Return true (successful schedule)
1020 /// 4. **Otherwise**: Return false (already scheduled or executing)
1021 ///
1022 /// # Returns
1023 ///
1024 /// - `true`: Successfully transitioned from IDLE to SCHEDULED (work will be processed)
1025 /// - `false`: Already scheduled/executing, or concurrent schedule won (idempotent)
1026 ///
1027 /// # Concurrent Behavior
1028 ///
1029 /// - **Multiple producers**: Only the first `schedule()` succeeds (returns true)
1030 /// - **During EXECUTING**: Sets SCHEDULED flag, which `finish()` will detect and reschedule
1031 ///
1032 /// # Memory Ordering
1033 ///
1034 /// - Initial load: `Acquire` (see latest state)
1035 /// - `fetch_or`: `Release` (publish enqueued items to executor)
1036 ///
1037 /// # Performance
1038 ///
1039 /// - **Already scheduled**: ~2-3 ns (fast path, single atomic load)
1040 /// - **Successful schedule**: ~10-20 ns (fetch_or + signal update + potential summary update)
1041 ///
1042 /// # Example
1043 ///
1044 /// ```ignore
1045 /// // Producer 1
1046 /// queue.try_push(item)?;
1047 /// if gate.schedule() {
1048 /// println!("Successfully scheduled"); // First producer
1049 /// }
1050 ///
1051 /// // Producer 2 (concurrent)
1052 /// queue.try_push(another_item)?;
1053 /// if !gate.schedule() {
1054 /// println!("Already scheduled"); // Idempotent, no action needed
1055 /// }
1056 /// ```
1057 #[inline(always)]
1058 pub fn schedule(&self) -> bool {
1059 if (self.flags.load(Ordering::Acquire) & SCHEDULED) != IDLE {
1060 return false;
1061 }
1062
1063 let previous_flags = self.flags.fetch_or(SCHEDULED, Ordering::Release);
1064 let scheduled_nor_executing = (previous_flags & (SCHEDULED | EXECUTING)) == IDLE;
1065
1066 if scheduled_nor_executing {
1067 let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
1068 if was_empty && was_set {
1069 self.waker.mark_active(self.signal.index());
1070 }
1071 true
1072 } else {
1073 false
1074 }
1075 }
1076
1077 /// Marks the queue as EXECUTING (SCHEDULED → EXECUTING transition).
1078 ///
1079 /// Called by the executor when it begins processing this queue. This transition
1080 /// prevents redundant scheduling while work is being processed.
1081 ///
1082 /// # State Transition
1083 ///
1084 /// Unconditionally stores EXECUTING, which clears any SCHEDULED flags and sets EXECUTING.
1085 /// ```text
1086 /// Before: SCHEDULED (1)
1087 /// After: EXECUTING (2)
1088 /// ```
1089 ///
1090 /// If a producer calls `schedule()` after `begin()` but before `finish()`, the
1091 /// SCHEDULED flag will be set again (creating state 3 = EXECUTING | SCHEDULED),
1092 /// which `finish()` detects and handles.
1093 ///
1094 /// # Memory Ordering
1095 ///
1096 /// Uses `Ordering::Release` to ensure the state change is visible to concurrent
1097 /// producers calling `schedule()`.
1098 ///
1099 /// # Performance
1100 ///
1101 /// ~1-2 ns (single atomic store)
1102 ///
1103 /// # Example
1104 ///
1105 /// ```ignore
1106 /// // Executor discovers ready queue
1107 /// if signal.acquire(queue_bit) {
1108 /// gate.begin(); // Mark as executing
1109 /// process_queue();
1110 /// gate.finish();
1111 /// }
1112 /// ```
1113 #[inline(always)]
1114 pub fn mark(&self) {
1115 self.flags.store(EXECUTING, Ordering::Release);
1116 }
1117
1118 /// Marks the queue as IDLE and handles concurrent schedules (EXECUTING → IDLE/SCHEDULED).
1119 ///
1120 /// Called by the executor after processing a batch of items. Automatically detects
1121 /// if new work arrived during processing (SCHEDULED flag set concurrently) and
1122 /// reschedules if needed.
1123 ///
1124 /// # Algorithm
1125 ///
1126 /// 1. **Clear EXECUTING**: `fetch_sub(EXECUTING)` atomically transitions to IDLE
1127 /// 2. **Check SCHEDULED**: If the SCHEDULED flag is set in the result:
1128 /// - Means a producer called `schedule()` during execution
1129 /// - Re-set the signal bit to ensure executor sees the work
1130 /// - Queue remains/becomes SCHEDULED
1131 ///
1132 /// # Automatic Rescheduling
1133 ///
1134 /// This method implements a key correctness property: if a producer enqueues work
1135 /// while the executor is processing, that work will not be lost. The SCHEDULED flag
1136 /// acts as a handoff mechanism.
1137 ///
1138 /// ```text
1139 /// Timeline:
1140 /// T0: Executor calls begin() → EXECUTING (2)
1141 /// T1: Producer calls schedule() → EXECUTING | SCHEDULED (3)
1142 /// T2: Executor calls finish() → SCHEDULED (1) [bit re-set in signal]
1143 /// T3: Executor sees bit, processes → ...
1144 /// ```
1145 ///
1146 /// # Memory Ordering
1147 ///
1148 /// Uses `Ordering::AcqRel`:
1149 /// - **Acquire**: See all producer writes (enqueued items)
1150 /// - **Release**: Publish state transition to future readers
1151 ///
1152 /// # Performance
1153 ///
1154 /// - **No concurrent schedule**: ~2-3 ns (fetch_sub only)
1155 /// - **With concurrent schedule**: ~10-15 ns (fetch_sub + signal.set)
1156 ///
1157 /// # Example
1158 ///
1159 /// ```ignore
1160 /// gate.begin();
1161 /// while let Some(item) = queue.try_pop() {
1162 /// process(item);
1163 /// }
1164 /// gate.finish(); // Automatically reschedules if more work arrived
1165 /// ```
1166 #[inline(always)]
1167 pub fn unmark(&self) {
1168 let after_flags = self.flags.fetch_sub(EXECUTING, Ordering::AcqRel);
1169 if after_flags & SCHEDULED != IDLE {
1170 self.signal.set(self.bit_index as u64);
1171 }
1172 }
1173
1174 /// Atomically marks the queue as SCHEDULED, ensuring re-execution.
1175 ///
1176 /// Called by the executor when it knows more work exists but wants to yield the
1177 /// timeslice for fairness. This is an optimization over `finish()` followed by
1178 /// external `schedule()`.
1179 ///
1180 /// # Use Cases
1181 ///
1182 /// 1. **Batch size limiting**: Process N items, then yield to other queues
1183 /// 2. **Fairness**: Prevent queue starvation by rotating execution
1184 /// 3. **Latency control**: Ensure all queues get regular timeslices
1185 ///
1186 /// # Algorithm
1187 ///
1188 /// 1. **Set state**: Store SCHEDULED unconditionally
1189 /// 2. **Update signal**: Set bit in signal word
1190 /// 3. **Update summary**: If signal was empty, mark active in waker
1191 ///
1192 /// # Comparison with finish() + schedule()
1193 ///
1194 /// ```ignore
1195 /// // Separate calls (2 atomic ops)
1196 /// gate.finish(); // EXECUTING → IDLE
1197 /// gate.schedule(); // IDLE → SCHEDULED
1198 ///
1199 /// // Combined call (1 atomic op + signal update)
1200 /// gate.finish_and_schedule(); // EXECUTING → SCHEDULED
1201 /// ```
1202 ///
1203 /// # Memory Ordering
1204 ///
1205 /// Uses `Ordering::Release` to publish both the state change and enqueued items.
1206 ///
1207 /// # Performance
1208 ///
1209 /// ~10-15 ns (store + signal.set + potential summary update)
1210 ///
1211 /// # Example
1212 ///
1213 /// ```ignore
1214 /// gate.begin();
1215 /// let mut processed = 0;
1216 /// while processed < BATCH_SIZE {
1217 /// if let Some(item) = queue.try_pop() {
1218 /// process(item);
1219 /// processed += 1;
1220 /// } else {
1221 /// break;
1222 /// }
1223 /// }
1224 ///
1225 /// if queue.len() > 0 {
1226 /// gate.finish_and_schedule(); // More work, stay scheduled
1227 /// } else {
1228 /// gate.finish(); // Done, go idle
1229 /// }
1230 /// ```
1231 #[inline(always)]
1232 pub fn unmark_and_schedule(&self) {
1233 self.flags.store(SCHEDULED, Ordering::Release);
1234 let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
1235 if was_empty && was_set {
1236 self.waker.mark_active(self.signal.index());
1237 }
1238 }
1239}
1240
1241impl crate::spsc::SignalSchedule for AsyncSignalGate {
1242 fn schedule(&self) -> bool {
1243 self.schedule()
1244 }
1245
1246 fn mark(&self) {
1247 self.mark();
1248 }
1249
1250 fn unmark(&self) {
1251 self.unmark();
1252 }
1253
1254 fn unmark_and_schedule(&self) {
1255 self.unmark_and_schedule();
1256 }
1257}
1258
#[cfg(test)]
mod tests {
    // NOTE(review): no unit tests yet. `use super::*;` in an otherwise empty
    // module triggers an `unused_imports` warning under `cargo test`; either add
    // coverage for the schedule/mark/unmark state machine (including the
    // EXECUTING | SCHEDULED handoff) or drop the import until tests exist.
    use super::*;
}