maniac_runtime/sync/
signal.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
3
4use crate::utils::CachePadded;
5
6use std::sync::atomic::AtomicUsize;
7use std::sync::{Condvar, Mutex};
8use std::time::Duration;
9
10use crate::utils::bits::is_set;
11
/// Width, in bits, of the waker's status summary word.
///
/// Summary indices passed to the waker are checked (in debug builds) to be
/// strictly below this value.
pub const STATUS_SUMMARY_BITS: u32 = 64;

/// Number of queue bits held by a single signal word.
///
/// A signal word is an `AtomicU64`, so it offers one bit for each of 64 queues.
pub const SIGNAL_CAPACITY: u64 = 64;

/// Mask that reduces a queue id to its bit position inside a signal word.
///
/// `SIGNAL_CAPACITY` is a power of two, so `id & SIGNAL_MASK` is the same as
/// `id % SIGNAL_CAPACITY`:
/// ```ignore
/// bit_index = queue_id & SIGNAL_MASK;
/// ```
pub const SIGNAL_MASK: u64 = SIGNAL_CAPACITY - 1;
28
29/// A cache-line padded 64-bit atomic bitmap for tracking queue readiness.
30///
31/// Each `Signal` represents a group of up to 64 queues, where each bit indicates
32/// whether the corresponding queue has work available. Multiple `Signal` instances
33/// are coordinated via a `SignalWaker` to form a complete two-level bitmap.
34///
35/// # Design
36///
37/// ```text
38/// Signal (64-bit AtomicU64)
39/// ┌───┬───┬───┬───┬─────┬───┐
40/// │ 0 │ 1 │ 0 │ 1 │ ... │ 0 │  Each bit = one queue's scheduled state
41/// └───┴───┴───┴───┴─────┴───┘
42///   Q0  Q1  Q2  Q3  ...  Q63
43/// ```
44///
45/// # Cache Optimization
46///
47/// The inner state is wrapped in `Arc<CachePadded<...>>` to:
48/// - Allow cheap cloning (single pointer copy)
49/// - Prevent false sharing between different signals
50/// - Optimize for hot paths (producers setting bits, executor clearing bits)
51///
52/// # Thread Safety
53///
54/// All operations use atomic instructions. Multiple producers can concurrently set
55/// bits (via `set()`), and the executor can concurrently acquire/clear bits (via
56/// `acquire()` or `try_acquire()`).
57///
58/// # Cloning
59///
60/// `Signal` is cheaply clonable via `Arc`. All clones share the same underlying
61/// atomic bitmap, making it suitable for distribution across multiple producer threads.
62#[derive(Clone)]
63pub struct Signal {
64    /// Shared, cache-line padded inner state.
65    inner: Arc<CachePadded<SignalInner>>,
66}
67
68impl Signal {
69    /// Returns the signal's index within the SignalWaker's signal array.
70    ///
71    /// This index is used to:
72    /// - Map this signal to the corresponding bit in the summary bitmap
73    /// - Identify which group of 64 queues this signal represents
74    ///
75    /// # Example
76    ///
77    /// ```ignore
78    /// let signal = Signal::with_index(5);
79    /// assert_eq!(signal.index(), 5);
80    /// // This signal controls queues 320-383 (5 * 64 through (5+1) * 64 - 1)
81    /// ```
82    #[inline(always)]
83    pub fn index(&self) -> u64 {
84        return self.inner.index;
85    }
86
87    /// Returns a reference to the underlying atomic value.
88    ///
89    /// Provides direct access to the 64-bit bitmap for advanced use cases
90    /// that need custom atomic operations beyond the provided methods.
91    ///
92    /// # Use Cases
93    ///
94    /// - Custom bit manipulation patterns
95    /// - Debugging (observing raw bitmap state)
96    /// - Integration with external synchronization primitives
97    ///
98    /// # Example
99    ///
100    /// ```ignore
101    /// let signal = Signal::new();
102    /// signal.set(10);
103    /// let raw_value = signal.value().load(Ordering::Relaxed);
104    /// assert_eq!(raw_value & (1 << 10), 1 << 10);  // Bit 10 is set
105    /// ```
106    #[inline(always)]
107    pub fn value(&self) -> &AtomicU64 {
108        &self.inner.value
109    }
110}
111
/// State shared by every clone of a `Signal`.
///
/// `Signal` stores this behind `Arc<CachePadded<_>>`; the padding is applied
/// by that wrapper, not by this struct itself.
///
/// # Fields
///
/// - `index`: slot of the signal in the waker's signal array. The `Signal`
///   constructors debug-assert that it is below 64.
/// - `value`: the 64-bit atomic bitmap, one bit per queue.
struct SignalInner {
    /// Position of this signal in the waker's signal array; fixed at construction.
    pub index: u64,
    /// Bitmap covering up to 64 queues (bit N set = queue N flagged as ready).
    value: AtomicU64,
}
124
125impl Signal {
126    /// Creates a new Signal with index 0 and all bits cleared.
127    ///
128    /// # Example
129    ///
130    /// ```ignore
131    /// let signal = Signal::new();
132    /// assert_eq!(signal.index(), 0);
133    /// assert!(signal.is_empty());
134    /// ```
135    pub fn new() -> Self {
136        Self::with_value(0, 0)
137    }
138
139    /// Creates a new Signal with the specified index and all bits cleared.
140    ///
141    /// # Parameters
142    ///
143    /// - `index`: Position in the SignalWaker's signal array (0-61)
144    ///
145    /// # Example
146    ///
147    /// ```ignore
148    /// let signal = Signal::with_index(10);
149    /// assert_eq!(signal.index(), 10);
150    /// assert!(signal.is_empty());
151    /// ```
152    pub fn with_index(index: u64) -> Self {
153        debug_assert!(
154            index < SIGNAL_CAPACITY as u64,
155            "signal index {} exceeds status summary capacity {}",
156            index,
157            SIGNAL_CAPACITY
158        );
159        Self::with_value(index, 0)
160    }
161
162    /// Creates a new Signal with the specified index and initial bitmap value.
163    ///
164    /// This is primarily used for testing or restoring state. In normal operation,
165    /// signals start with all bits cleared.
166    ///
167    /// # Parameters
168    ///
169    /// - `index`: Position in the SignalWaker's signal array (0-61)
170    /// - `value`: Initial 64-bit bitmap value
171    ///
172    /// # Example
173    ///
174    /// ```ignore
175    /// // Create signal with bits 0, 5, and 10 already set
176    /// let signal = Signal::with_value(3, (1 << 0) | (1 << 5) | (1 << 10));
177    /// assert_eq!(signal.size(), 3);
178    /// assert!(signal.is_set(0));
179    /// assert!(signal.is_set(5));
180    /// assert!(signal.is_set(10));
181    /// ```
182    pub fn with_value(index: u64, value: u64) -> Self {
183        debug_assert!(
184            index < SIGNAL_CAPACITY as u64,
185            "signal index {} exceeds status summary capacity {}",
186            index,
187            SIGNAL_CAPACITY
188        );
189        Self {
190            inner: Arc::new(CachePadded::new(SignalInner {
191                index,
192                value: AtomicU64::new(value),
193            })),
194        }
195    }
196
197    /// Loads the current bitmap value with the specified memory ordering.
198    ///
199    /// # Parameters
200    ///
201    /// - `ordering`: Memory ordering for the load operation
202    ///
203    /// # Returns
204    ///
205    /// The 64-bit bitmap value where each set bit represents a ready queue.
206    ///
207    /// # Example
208    ///
209    /// ```ignore
210    /// signal.set(5);
211    /// signal.set(10);
212    /// let value = signal.load(Ordering::Acquire);
213    /// assert_eq!(value, (1 << 5) | (1 << 10));
214    /// ```
215    #[inline(always)]
216    pub fn load(&self, ordering: Ordering) -> u64 {
217        self.inner.value.load(ordering)
218    }
219
220    /// Returns the number of set bits (ready queues) in this signal.
221    ///
222    /// Equivalent to `popcount(bitmap)`, this counts how many queues in this
223    /// signal group currently have work available.
224    ///
225    /// # Performance
226    ///
227    /// Uses the `POPCNT` instruction on x86_64 (~3 cycles), making it very efficient.
228    ///
229    /// # Example
230    ///
231    /// ```ignore
232    /// let signal = Signal::new();
233    /// signal.set(0);
234    /// signal.set(5);
235    /// signal.set(63);
236    /// assert_eq!(signal.size(), 3);
237    /// ```
238    #[inline(always)]
239    pub fn size(&self) -> u64 {
240        self.load(Ordering::Relaxed).count_ones() as u64
241    }
242
243    /// Returns `true` if no bits are set (no ready queues).
244    ///
245    /// This is more efficient than `size() == 0` for checking emptiness.
246    ///
247    /// # Example
248    ///
249    /// ```ignore
250    /// let signal = Signal::new();
251    /// assert!(signal.is_empty());
252    /// signal.set(10);
253    /// assert!(!signal.is_empty());
254    /// ```
255    #[inline(always)]
256    pub fn is_empty(&self) -> bool {
257        self.load(Ordering::Relaxed).count_ones() == 0
258    }
259
260    /// Atomically sets a bit in the bitmap using fetch_or.
261    ///
262    /// This is the primary method for producers to signal that a queue has work available.
263    ///
264    /// # Parameters
265    ///
266    /// - `index`: Bit position to set (0-63)
267    ///
268    /// # Returns
269    ///
270    /// A tuple `(was_empty, was_set)`:
271    /// - `was_empty`: `true` if this was the first bit set (signal transitioned from empty to non-empty)
272    /// - `was_set`: `true` if the bit was successfully set (wasn't already set)
273    ///
274    /// # Use Cases
275    ///
276    /// The return values are used for summary bitmap updates:
277    /// ```ignore
278    /// let (was_empty, was_set) = signal.set(queue_bit);
279    /// if was_empty && was_set {
280    ///     // This signal was empty, now has work - update summary
281    ///     waker.mark_active(signal.index());
282    /// }
283    /// ```
284    ///
285    /// # Performance
286    ///
287    /// ~5-10 ns (one atomic fetch_or operation)
288    ///
289    /// # Example
290    ///
291    /// ```ignore
292    /// let signal = Signal::new();
293    /// let (was_empty, was_set) = signal.set(5);
294    /// assert!(was_empty);  // Signal was empty
295    /// assert!(was_set);    // Bit 5 was not previously set
296    ///
297    /// let (was_empty, was_set) = signal.set(5);
298    /// assert!(!was_empty); // Signal already had bits set
299    /// assert!(!was_set);   // Bit 5 was already set
300    /// ```
301    #[inline(always)]
302    pub fn set(&self, index: u64) -> (bool, bool) {
303        crate::bits::set(&self.inner.value, index)
304    }
305
306    /// Atomically sets a bit using a precomputed bitmask.
307    ///
308    /// Similar to `set()`, but takes a precomputed `1 << index` value for cases
309    /// where the bit position is computed once and reused.
310    ///
311    /// # Parameters
312    ///
313    /// - `bit`: Precomputed bitmask with exactly one bit set (e.g., `1 << 5`)
314    ///
315    /// # Returns
316    ///
317    /// The previous bitmap value before setting the bit.
318    ///
319    /// # Example
320    ///
321    /// ```ignore
322    /// let signal = Signal::new();
323    /// let bit_mask = 1u64 << 10;
324    /// let prev = signal.set_with_bit(bit_mask);
325    /// assert_eq!(prev, 0);  // Was empty
326    /// assert!(signal.is_set(10));
327    /// ```
328    #[inline(always)]
329    pub fn set_with_bit(&self, bit: u64) -> u64 {
330        crate::bits::set_with_bit(&self.inner.value, bit)
331    }
332
333    /// Atomically clears a bit if it is currently set (CAS-based).
334    ///
335    /// This is the primary method for the executor to claim ownership of a ready queue.
336    /// Uses a CAS loop to ensure the bit is cleared atomically.
337    ///
338    /// # Parameters
339    ///
340    /// - `index`: Bit position to clear (0-63)
341    ///
342    /// # Returns
343    ///
344    /// - `true`: Bit was set and has been successfully cleared (queue acquired)
345    /// - `false`: Bit was not set (queue not ready or already acquired)
346    ///
347    /// # Use Cases
348    ///
349    /// ```ignore
350    /// // Executor loop
351    /// if signal.acquire(queue_bit) {
352    ///     // Successfully acquired queue, process it
353    ///     process_queue(queue_id);
354    /// }
355    /// ```
356    ///
357    /// # Performance
358    ///
359    /// ~10-20 ns (CAS loop, typically succeeds on first iteration)
360    ///
361    /// # Example
362    ///
363    /// ```ignore
364    /// let signal = Signal::new();
365    /// signal.set(5);
366    /// assert!(signal.acquire(5));  // Successfully cleared bit 5
367    /// assert!(!signal.acquire(5)); // Bit already clear, returns false
368    /// ```
369    #[inline(always)]
370    pub fn acquire(&self, index: u64) -> bool {
371        crate::bits::acquire(&self.inner.value, index)
372    }
373
374    /// Attempts to atomically clear a bit, returning detailed state information.
375    ///
376    /// Similar to `acquire()`, but provides additional information about the
377    /// before/after state of the bitmap, useful for debugging or advanced scheduling.
378    ///
379    /// # Parameters
380    ///
381    /// - `index`: Bit position to clear (0-63)
382    ///
383    /// # Returns
384    ///
385    /// A tuple `(before, after, success)`:
386    /// - `before`: Bitmap value before the operation
387    /// - `after`: Bitmap value after the operation (if successful)
388    /// - `success`: `true` if the bit was cleared, `false` if it wasn't set
389    ///
390    /// # Example
391    ///
392    /// ```ignore
393    /// let signal = Signal::with_value(0, 0b101010);  // Bits 1, 3, 5 set
394    /// let (before, after, success) = signal.try_acquire(3);
395    /// assert_eq!(before, 0b101010);
396    /// assert_eq!(after, 0b100010);   // Bit 3 cleared
397    /// assert!(success);
398    /// ```
399    #[inline(always)]
400    pub fn try_acquire(&self, index: u64) -> (u64, u64, bool) {
401        crate::bits::try_acquire(&self.inner.value, index)
402    }
403
404    /// Checks if a specific bit is set without modifying the bitmap.
405    ///
406    /// Non-atomic read followed by bit test. Suitable for non-critical checks
407    /// where races are acceptable.
408    ///
409    /// # Parameters
410    ///
411    /// - `index`: Bit position to check (0-63)
412    ///
413    /// # Returns
414    ///
415    /// `true` if the bit is set, `false` otherwise.
416    ///
417    /// # Example
418    ///
419    /// ```ignore
420    /// let signal = Signal::new();
421    /// assert!(!signal.is_set(5));
422    /// signal.set(5);
423    /// assert!(signal.is_set(5));
424    /// ```
425    #[inline(always)]
426    pub fn is_set(&self, index: u64) -> bool {
427        crate::bits::is_set(&self.inner.value, index)
428    }
429}
430
431/// A cache-optimized waker that packs queue summaries and control flags into a single status word.
432#[repr(align(64))]
433pub struct AsyncSignalWaker {
434    /// **Status bitmap**: Queue-word summary bits (0‒61) plus control flags.
435    ///
436    /// - Bits 0‒63: Queue-word hot bits (`mark_active`, `try_unmark_if_empty`, etc.)
437    ///
438    /// Keeping everything in one atomic avoids races between independent u64s.
439    status: CachePadded<AtomicU64>,
440
441    /// **Counting semaphore**: Number of threads that should be awake (available permits).
442    ///
443    /// Incremented by producers when queues become active (0→1 transitions).
444    /// Decremented by consumers via `try_acquire()` or `acquire()`.
445    ///
446    /// **Critical invariant**: Each queue empty→non-empty transition adds exactly
447    /// 1 permit, preventing lost wakeups. Permits accumulate if no threads are
448    /// sleeping, ensuring late arrivals find work.
449    ///
450    /// - Acquire: `AcqRel` (synchronizes with Release from producers)
451    /// - Release: `Release` (makes queue data visible to acquirers)
452    permits: CachePadded<AtomicU64>,
453}
454
455impl AsyncSignalWaker {
456    pub fn new() -> Self {
457        Self {
458            status: CachePadded::new(AtomicU64::new(0)),
459            permits: CachePadded::new(AtomicU64::new(0)),
460        }
461    }
462
463    // ────────────────────────────────────────────────────────────────────────────
464    // PRODUCER-SIDE API
465    // ────────────────────────────────────────────────────────────────────────────
466
467    /// Marks a signal word at `index` (0..63) as active in the summary.
468    ///
469    /// Called by producers when a queue transitions from empty to non-empty.
470    /// If this is a **0→1 transition** (bit was previously clear), adds 1 permit
471    /// and wakes 1 sleeping thread.
472    ///
473    /// # Fast Path
474    ///
475    /// If the bit is already set, returns immediately without touching atomics.
476    /// This is the common case when multiple producers push to the same word group.
477    ///
478    /// # Arguments
479    ///
480    /// * `index` - Word index (0..63) to mark as active
481    ///
482    /// # Example
483    ///
484    /// ```ignore
485    /// // Producer pushes to queue 5 in word 0
486    /// let (was_empty, was_set) = signal.set(5);
487    /// if was_empty && was_set {
488    ///     waker.mark_active(0);  // Wake 1 consumer
489    /// }
490    /// ```
491    #[inline]
492    pub fn mark_active(&self, index: u64) {
493        debug_assert!(
494            index < STATUS_SUMMARY_BITS as u64,
495            "summary index {} exceeds {} bits",
496            index,
497            STATUS_SUMMARY_BITS
498        );
499        let mask = 1u64 << index;
500        if self.status.load(Ordering::Relaxed) & mask != 0 {
501            return;
502        }
503        let prev = self.status.fetch_or(mask, Ordering::Relaxed);
504        if prev & mask == 0 {
505            self.release(1);
506        }
507    }
508
509    /// Batch version of `mark_active()`: marks multiple words as active at once.
510    ///
511    /// Efficiently handles multiple queues becoming active simultaneously.
512    /// Releases exactly `k` permits, where `k` is the number of **0→1 transitions**
513    /// (newly-active words).
514    ///
515    /// # Optimization
516    ///
517    /// Uses a single `fetch_or` instead of calling `mark_active()` in a loop,
518    /// reducing atomic contention when many queues activate together.
519    ///
520    /// # Arguments
521    ///
522    /// * `mask` - Bitmap of words to mark active (bit `i` = word `i`)
523    ///
524    /// # Example
525    ///
526    /// ```ignore
527    /// // Multiple queues became active
528    /// let mut active_words = 0u64;
529    /// for word_idx in 0..64 {
530    ///     if word_became_active(word_idx) {
531    ///         active_words |= 1 << word_idx;
532    ///     }
533    /// }
534    /// waker.mark_active_mask(active_words);  // Single atomic op
535    /// ```
536    #[inline]
537    pub fn mark_active_mask(&self, mask: u64) {
538        let summary_mask = mask;
539        if summary_mask == 0 {
540            return;
541        }
542        let prev = self.status.fetch_or(summary_mask, Ordering::Relaxed);
543        let newly = (!prev) & summary_mask;
544        let k = newly.count_ones() as usize;
545        if k > 0 {
546            self.release(k);
547        }
548    }
549
550    /// Clears the summary bit for `bit_index` if the corresponding signal word is empty.
551    ///
552    /// This is **lazy cleanup** - consumers call this after draining a word to prevent
553    /// false positives in future `snapshot_summary()` calls. However, it's safe to skip
554    /// this; the system remains correct with stale summary bits.
555    ///
556    /// # Arguments
557    ///
558    /// * `bit_index` - Word index (0..63) to potentially clear
559    /// * `signal` - The actual signal word to check for emptiness
560    ///
561    /// # Example
562    ///
563    /// ```ignore
564    /// // After draining all queues in word 3
565    /// waker.try_unmark_if_empty(3, &signal_word_3);
566    /// ```
567    #[inline]
568    pub fn try_unmark_if_empty(&self, bit_index: u64, signal: &AtomicU64) {
569        debug_assert!(
570            bit_index < STATUS_SUMMARY_BITS as u64,
571            "summary index {} exceeds {} bits",
572            bit_index,
573            STATUS_SUMMARY_BITS
574        );
575        let mask = 1u64 << bit_index;
576
577        loop {
578            if signal.load(Ordering::Acquire) != 0 {
579                return;
580            }
581
582            let snapshot = self.status.load(Ordering::Relaxed);
583            if snapshot & mask == 0 {
584                return;
585            }
586
587            match self.status.compare_exchange(
588                snapshot,
589                snapshot & !mask,
590                Ordering::AcqRel,
591                Ordering::Relaxed,
592            ) {
593                Ok(_) => {
594                    if signal.load(Ordering::Acquire) != 0 {
595                        // Re-arm summary and release if work arrived concurrently.
596                        self.mark_active(bit_index);
597                    }
598                    return;
599                }
600                Err(actual) => {
601                    if actual & mask == 0 {
602                        return;
603                    }
604                }
605            }
606        }
607    }
608
609    /// Unconditionally clears the summary bit for `bit_index`.
610    ///
611    /// Faster than `try_unmark_if_empty()` when the caller already knows
612    /// the word is empty (avoids checking the signal word).
613    ///
614    /// # Arguments
615    ///
616    /// * `bit_index` - Word index (0..63) to clear
617    #[inline]
618    pub fn try_unmark(&self, bit_index: u64) {
619        debug_assert!(
620            bit_index < STATUS_SUMMARY_BITS as u64,
621            "summary index {} exceeds {} bits",
622            bit_index,
623            STATUS_SUMMARY_BITS
624        );
625        let mask = 1u64 << bit_index;
626        if self.status.load(Ordering::Relaxed) & mask != 0 {
627            self.status
628                .fetch_and(!(1u64 << bit_index), Ordering::Relaxed);
629        }
630    }
631
632    // ────────────────────────────────────────────────────────────────────────────
633    // CONSUMER-SIDE API
634    // ────────────────────────────────────────────────────────────────────────────
635
636    /// Returns a snapshot of the current summary bitmap.
637    ///
638    /// Consumers use this to quickly identify which word groups have potential work.
639    /// If bit `i` is set, word `i` *may* have active queues (false positives possible
640    /// due to lazy cleanup).
641    ///
642    /// # Memory Ordering
643    ///
644    /// Uses `Relaxed` because this is a hint, not a synchronization point. The actual
645    /// queue data is synchronized via acquire/release on the permits counter.
646    ///
647    /// # Returns
648    ///
649    /// A u64 bitmap where bit `i` indicates word `i` has potential work.
650    ///
651    /// # Example
652    ///
653    /// ```ignore
654    /// let summary = waker.snapshot_summary();
655    /// for word_idx in 0..64 {
656    ///     if summary & (1 << word_idx) != 0 {
657    ///         // Check queues in word_idx
658    ///     }
659    /// }
660    /// ```
661    #[inline]
662    pub fn snapshot_summary(&self) -> u64 {
663        self.status.load(Ordering::Relaxed)
664    }
665
666    /// Finds the nearest set bit to `nearest_to_index` in the summary.
667    ///
668    /// Useful for maintaining **locality**: continue working on queues near
669    /// the last processed index, improving cache behavior.
670    ///
671    /// # Arguments
672    ///
673    /// * `nearest_to_index` - Preferred starting point (0..63)
674    ///
675    /// # Returns
676    ///
677    /// The index of the nearest set bit, or undefined if summary is empty.
678    ///
679    /// # Example
680    ///
681    /// ```ignore
682    /// let mut last_word = 0;
683    /// loop {
684    ///     last_word = waker.summary_select(last_word);
685    ///     // Process queues in word last_word
686    /// }
687    /// ```
688    #[inline]
689    pub fn summary_select(&self, nearest_to_index: u64) -> u64 {
690        let summary = self.status.load(Ordering::Relaxed);
691        crate::bits::find_nearest(summary, nearest_to_index)
692    }
693
694    // ────────────────────────────────────────────────────────────────────────────
695    // PERMIT SYSTEM (Counting Semaphore)
696    // ────────────────────────────────────────────────────────────────────────────
697
698    /// Non-blocking attempt to acquire a permit.
699    ///
700    /// Atomically decrements the permit counter if available. This is the **lock-free
701    /// fast path** used by consumers before resorting to blocking.
702    ///
703    /// # Returns
704    ///
705    /// - `true` if a permit was consumed (consumer should process work)
706    /// - `false` if no permits available (queue likely empty)
707    ///
708    /// # Memory Ordering
709    ///
710    /// Uses `AcqRel` to synchronize with producers' `Release` in `release()`.
711    /// This ensures queue data written by producers is visible to this consumer.
712    ///
713    /// # Example
714    ///
715    /// ```ignore
716    /// if waker.try_acquire() {
717    ///     // Process work (permit guarantees something is available)
718    /// } else {
719    ///     // No work, maybe park or spin
720    /// }
721    /// ```
722    #[inline]
723    pub fn try_acquire(&self) -> bool {
724        self.permits
725            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |p| p.checked_sub(1))
726            .is_ok()
727    }
728
729    /// Blocking acquire: parks the thread until a permit becomes available.
730    ///
731    /// Tries the fast path first (`try_acquire()`), then falls back to parking
732    /// on a condvar. Handles spurious wakeups by rechecking permits in a loop.
733    ///
734    /// # Blocking Behavior
735    ///
736    /// 1. Increment `sleepers` count
737    /// 2. Wait on condvar (releases mutex)
738    /// 3. Recheck permits after wakeup
739    /// 4. Decrement `sleepers` on exit
740    ///
741    /// # Panics
742    ///
743    /// Panics if the mutex or condvar is poisoned (indicates a panic in another thread
744    /// while holding the lock).
745    ///
746    /// # Example
747    ///
748    /// ```ignore
749    /// loop {
750    ///     waker.acquire();  // Blocks until work available
751    ///     process_work();
752    /// }
753    /// ```
754    pub fn acquire(&self) {
755        if self.try_acquire() {
756            return;
757        }
758        // TODO: Implement
759    }
760
761    /// Blocking acquire with timeout.
762    ///
763    /// Like `acquire()`, but returns after `timeout` if no permit becomes available.
764    /// Useful for implementing shutdown or periodic maintenance.
765    ///
766    /// # Arguments
767    ///
768    /// * `timeout` - Maximum duration to wait
769    ///
770    /// # Returns
771    ///
772    /// - `true` if a permit was acquired
773    /// - `false` if timed out without acquiring
774    ///
775    /// # Example
776    ///
777    /// ```ignore
778    /// use std::time::Duration;
779    ///
780    /// loop {
781    ///     if waker.acquire_timeout(Duration::from_secs(1)) {
782    ///         process_work();
783    ///     } else {
784    ///         // Timeout - check for shutdown signal
785    ///         if should_shutdown() { break; }
786    ///     }
787    /// }
788    /// ```
789    pub fn acquire_timeout(&self, timeout: Duration) -> bool {
790        if self.try_acquire() {
791            return true;
792        }
793        // TODO: Implement
794        false
795    }
796
797    /// Releases `n` permits and wakes up to `n` sleeping threads.
798    ///
799    /// Called by producers (indirectly via `mark_active`) when queues become active.
800    /// Uses **targeted wakeups**: only notifies up to `min(n, sleepers)` threads,
801    /// avoiding unnecessary `notify_one()` calls.
802    ///
803    /// # Permit Accumulation
804    ///
805    /// If no threads are sleeping, permits accumulate for future consumers.
806    /// This guarantees **no lost wakeups**: late-arriving consumers find work immediately.
807    ///
808    /// # Arguments
809    ///
810    /// * `n` - Number of permits to release (typically 1 or count of newly-active queues)
811    ///
812    /// # Memory Ordering
813    ///
814    /// Uses `Release` to ensure queue data is visible to consumers who `Acquire`
815    /// via `try_acquire()`.
816    ///
817    /// # Example
818    ///
819    /// ```ignore
820    /// // Producer activates 3 queues
821    /// waker.release(3);  // Wakes up to 3 sleeping consumers
822    /// ```
823    #[inline]
824    pub fn release(&self, n: usize) {
825        if n == 0 {
826            return;
827        }
828        // TODO: Implement
829    }
830
831    // ────────────────────────────────────────────────────────────────────────────
832    // INSPECTION / DEBUGGING
833    // ────────────────────────────────────────────────────────────────────────────
834
835    /// Returns the current summary bitmap.
836    ///
837    /// Useful for debugging or metrics. Equivalent to `snapshot_summary()` but
838    /// uses `Acquire` ordering for stronger visibility guarantees.
839    #[inline]
840    pub fn summary_bits(&self) -> u64 {
841        self.status.load(Ordering::Acquire)
842    }
843
844    /// Returns the current number of available permits.
845    ///
846    /// Useful for monitoring queue health or load. A high permit count may
847    /// indicate consumers are falling behind.
848    #[inline]
849    pub fn permits(&self) -> u64 {
850        self.permits.load(Ordering::Acquire)
851    }
852}
853
854impl Default for AsyncSignalWaker {
855    fn default() -> Self {
856        Self::new()
857    }
858}
859
860// ──────────────────────────────────────────────────────────────────────────────
861// SignalGate State Machine Constants
862// ──────────────────────────────────────────────────────────────────────────────
863
/// Gate state: nothing is scheduled and the queue is not being processed.
///
/// This is the all-bits-clear value. Per the gate's state-machine
/// documentation it is the initial state, left via `schedule()` when work is
/// enqueued.
pub const IDLE: u8 = 0b00;

/// Gate flag: work is available and waiting for the executor.
///
/// Per the gate's state-machine documentation, entered from IDLE through
/// `schedule()` and left for EXECUTING when the executor calls `begin()`.
pub const SCHEDULED: u8 = 0b01;

/// Gate flag: the executor is currently processing the queue.
///
/// Per the gate's state-machine documentation, entered from SCHEDULED through
/// `begin()` and left for IDLE (`finish()`) or SCHEDULED
/// (`finish_and_schedule()`) once processing completes. This flag occupies a
/// different bit than `SCHEDULED`, so the two can be OR-ed together
/// (`EXECUTING | SCHEDULED == 3`).
pub const EXECUTING: u8 = 0b10;
880
/// Per-queue gate coordinating scheduling between producers and the executor.
///
/// `AsyncSignalGate` is a small atomic state machine that suppresses redundant
/// scheduling and hands work from producers over to the executor. It is meant to
/// be used with one gate per queue.
///
/// # State Machine
///
/// ```text
///   IDLE (0) ──schedule()──▶ SCHEDULED (1) ──mark()──▶ EXECUTING (2)
///      ▲                          ▲                         │
///      │                          ├──unmark_and_schedule()──┤
///      │                          └──unmark() [*]───────────┤
///      └──────────────────unmark()──────────────────────────┘
///
///   [*] taken when schedule() set the SCHEDULED flag while the queue was
///       executing (flags were EXECUTING | SCHEDULED = 3)
/// ```
///
/// # State Transitions
///
/// - **IDLE → SCHEDULED**: Producer calls `schedule()` after enqueuing items
/// - **SCHEDULED → EXECUTING**: Executor calls `mark()` before processing
/// - **EXECUTING → IDLE**: Executor calls `unmark()` and no schedule arrived meanwhile
/// - **EXECUTING → SCHEDULED**: Executor calls `unmark_and_schedule()` when more work remains
/// - **EXECUTING | SCHEDULED → SCHEDULED**: A `schedule()` issued during execution only
///   sets the flag; `unmark()` observes it and re-sets the signal bit
///
/// # Concurrency Guarantees
///
/// - **Multiple producers**: Safe (`schedule()` uses an atomic `fetch_or`, so only the
///   caller that observes the IDLE state reports success)
/// - **Producer + executor**: Safe (all state changes go through the atomic `flags`)
/// - **Multiple executors**: NOT SAFE (`mark()` and `unmark_and_schedule()` store
///   unconditionally and assume a single consumer)
///
/// # Integration with Signal and AsyncSignalWaker
///
/// When a queue transitions IDLE → SCHEDULED, `schedule()`:
/// 1. Sets this queue's bit in the associated `Signal` (64-bit bitmap)
/// 2. If `Signal::set` reports the word was empty and the bit was newly set, calls
///    `AsyncSignalWaker::mark_active` with the signal's index
///
/// What `mark_active` does beyond that (e.g. waking a sleeping executor) is defined
/// by the waker and is not visible from this type.
///
/// # Memory Layout
///
/// ```text
/// AsyncSignalGate
/// ┌─────────────┬───────────┬─────────┬──────────────────────┐
/// │ flags       │ bit_index │ signal  │ waker                │
/// │ AtomicU8    │ u8        │ Signal  │ Arc<AsyncSignalWaker>│
/// └─────────────┴───────────┴─────────┴──────────────────────┘
/// ```
///
/// NOTE(review): the total size depends on the representation of `Signal` and on
/// the compiler's field ordering; measure with `size_of` rather than relying on a
/// fixed byte count.
///
/// # Example Usage
///
/// ```ignore
/// // Setup (constructor names for the waker and signal are illustrative)
/// let waker = Arc::new(AsyncSignalWaker::new());
/// let signal = Signal::with_index(5);
/// let gate = AsyncSignalGate::new(10, signal, waker);
///
/// // Producer thread
/// queue.try_push(item)?;
/// gate.schedule();  // Signal work available
///
/// // Executor thread
/// gate.mark();      // Mark as executing
/// while let Some(item) = queue.try_pop() {
///     process(item);
/// }
/// if queue.is_empty() {
///     gate.unmark();  // Done; back to IDLE unless a schedule arrived meanwhile
/// } else {
///     gate.unmark_and_schedule();  // More work, stay SCHEDULED
/// }
/// ```
pub struct AsyncSignalGate {
    /// Atomic state flags (IDLE, SCHEDULED, EXECUTING).
    ///
    /// SCHEDULED and EXECUTING are independent bits, so a schedule request that
    /// arrives during execution is visible as EXECUTING | SCHEDULED = 3.
    flags: AtomicU8,

    /// Bit position of this queue within the Signal's 64-bit bitmap (0-63).
    ///
    /// Passed (widened to `u64`) to `Signal::set` whenever the queue becomes ready.
    bit_index: u8,

    /// Handle to the Signal word containing this queue's bit.
    ///
    /// Shared among up to 64 queues (all queues in the same signal group).
    signal: Signal,

    /// Handle to the top-level waker whose summary is updated via `mark_active`.
    ///
    /// Shared among all queues in the executor (up to 4096 queues).
    waker: Arc<AsyncSignalWaker>,
}
980
981impl AsyncSignalGate {
982    /// Creates a new SignalGate in the IDLE state.
983    ///
984    /// # Parameters
985    ///
986    /// - `bit_index`: Position of this queue's bit within the signal (0-63)
987    /// - `signal`: Reference to the Signal word containing this queue's bit
988    /// - `waker`: Reference to the SignalWaker for summary updates
989    ///
990    /// # Example
991    ///
992    /// ```ignore
993    /// let waker = Arc::new(SignalWaker::new());
994    /// let signal = Signal::with_index(0);
995    /// let gate = SignalGate::new(5, signal, waker);
996    /// // This gate controls bit 5 in signal[0]
997    /// ```
998    pub fn new(bit_index: u8, signal: Signal, waker: Arc<AsyncSignalWaker>) -> Self {
999        Self {
1000            flags: AtomicU8::new(IDLE),
1001            bit_index,
1002            signal,
1003            waker,
1004        }
1005    }
1006
1007    /// Attempts to schedule this queue for execution (IDLE → SCHEDULED transition).
1008    ///
1009    /// Called by producers after enqueuing items to notify the executor. Uses atomic
1010    /// operations to ensure only one successful schedule per work batch.
1011    ///
1012    /// # Algorithm
1013    ///
1014    /// 1. **Fast check**: If already SCHEDULED, return false immediately (idempotent)
1015    /// 2. **Atomic set**: `fetch_or(SCHEDULED)` to set the SCHEDULED flag
1016    /// 3. **State check**: If previous state was IDLE (neither SCHEDULED nor EXECUTING):
1017    ///    - Set bit in signal word via `signal.set(bit_index)`
1018    ///    - If signal transitioned from empty, update summary via `waker.mark_active()`
1019    ///    - Return true (successful schedule)
1020    /// 4. **Otherwise**: Return false (already scheduled or executing)
1021    ///
1022    /// # Returns
1023    ///
1024    /// - `true`: Successfully transitioned from IDLE to SCHEDULED (work will be processed)
1025    /// - `false`: Already scheduled/executing, or concurrent schedule won (idempotent)
1026    ///
1027    /// # Concurrent Behavior
1028    ///
1029    /// - **Multiple producers**: Only the first `schedule()` succeeds (returns true)
1030    /// - **During EXECUTING**: Sets SCHEDULED flag, which `finish()` will detect and reschedule
1031    ///
1032    /// # Memory Ordering
1033    ///
1034    /// - Initial load: `Acquire` (see latest state)
1035    /// - `fetch_or`: `Release` (publish enqueued items to executor)
1036    ///
1037    /// # Performance
1038    ///
1039    /// - **Already scheduled**: ~2-3 ns (fast path, single atomic load)
1040    /// - **Successful schedule**: ~10-20 ns (fetch_or + signal update + potential summary update)
1041    ///
1042    /// # Example
1043    ///
1044    /// ```ignore
1045    /// // Producer 1
1046    /// queue.try_push(item)?;
1047    /// if gate.schedule() {
1048    ///     println!("Successfully scheduled");  // First producer
1049    /// }
1050    ///
1051    /// // Producer 2 (concurrent)
1052    /// queue.try_push(another_item)?;
1053    /// if !gate.schedule() {
1054    ///     println!("Already scheduled");  // Idempotent, no action needed
1055    /// }
1056    /// ```
1057    #[inline(always)]
1058    pub fn schedule(&self) -> bool {
1059        if (self.flags.load(Ordering::Acquire) & SCHEDULED) != IDLE {
1060            return false;
1061        }
1062
1063        let previous_flags = self.flags.fetch_or(SCHEDULED, Ordering::Release);
1064        let scheduled_nor_executing = (previous_flags & (SCHEDULED | EXECUTING)) == IDLE;
1065
1066        if scheduled_nor_executing {
1067            let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
1068            if was_empty && was_set {
1069                self.waker.mark_active(self.signal.index());
1070            }
1071            true
1072        } else {
1073            false
1074        }
1075    }
1076
1077    /// Marks the queue as EXECUTING (SCHEDULED → EXECUTING transition).
1078    ///
1079    /// Called by the executor when it begins processing this queue. This transition
1080    /// prevents redundant scheduling while work is being processed.
1081    ///
1082    /// # State Transition
1083    ///
1084    /// Unconditionally stores EXECUTING, which clears any SCHEDULED flags and sets EXECUTING.
1085    /// ```text
1086    /// Before: SCHEDULED (1)
1087    /// After:  EXECUTING (2)
1088    /// ```
1089    ///
1090    /// If a producer calls `schedule()` after `begin()` but before `finish()`, the
1091    /// SCHEDULED flag will be set again (creating state 3 = EXECUTING | SCHEDULED),
1092    /// which `finish()` detects and handles.
1093    ///
1094    /// # Memory Ordering
1095    ///
1096    /// Uses `Ordering::Release` to ensure the state change is visible to concurrent
1097    /// producers calling `schedule()`.
1098    ///
1099    /// # Performance
1100    ///
1101    /// ~1-2 ns (single atomic store)
1102    ///
1103    /// # Example
1104    ///
1105    /// ```ignore
1106    /// // Executor discovers ready queue
1107    /// if signal.acquire(queue_bit) {
1108    ///     gate.begin();  // Mark as executing
1109    ///     process_queue();
1110    ///     gate.finish();
1111    /// }
1112    /// ```
1113    #[inline(always)]
1114    pub fn mark(&self) {
1115        self.flags.store(EXECUTING, Ordering::Release);
1116    }
1117
1118    /// Marks the queue as IDLE and handles concurrent schedules (EXECUTING → IDLE/SCHEDULED).
1119    ///
1120    /// Called by the executor after processing a batch of items. Automatically detects
1121    /// if new work arrived during processing (SCHEDULED flag set concurrently) and
1122    /// reschedules if needed.
1123    ///
1124    /// # Algorithm
1125    ///
1126    /// 1. **Clear EXECUTING**: `fetch_sub(EXECUTING)` atomically transitions to IDLE
1127    /// 2. **Check SCHEDULED**: If the SCHEDULED flag is set in the result:
1128    ///    - Means a producer called `schedule()` during execution
1129    ///    - Re-set the signal bit to ensure executor sees the work
1130    ///    - Queue remains/becomes SCHEDULED
1131    ///
1132    /// # Automatic Rescheduling
1133    ///
1134    /// This method implements a key correctness property: if a producer enqueues work
1135    /// while the executor is processing, that work will not be lost. The SCHEDULED flag
1136    /// acts as a handoff mechanism.
1137    ///
1138    /// ```text
1139    /// Timeline:
1140    /// T0: Executor calls begin()           → EXECUTING (2)
1141    /// T1: Producer calls schedule()        → EXECUTING | SCHEDULED (3)
1142    /// T2: Executor calls finish()          → SCHEDULED (1) [bit re-set in signal]
1143    /// T3: Executor sees bit, processes     → ...
1144    /// ```
1145    ///
1146    /// # Memory Ordering
1147    ///
1148    /// Uses `Ordering::AcqRel`:
1149    /// - **Acquire**: See all producer writes (enqueued items)
1150    /// - **Release**: Publish state transition to future readers
1151    ///
1152    /// # Performance
1153    ///
1154    /// - **No concurrent schedule**: ~2-3 ns (fetch_sub only)
1155    /// - **With concurrent schedule**: ~10-15 ns (fetch_sub + signal.set)
1156    ///
1157    /// # Example
1158    ///
1159    /// ```ignore
1160    /// gate.begin();
1161    /// while let Some(item) = queue.try_pop() {
1162    ///     process(item);
1163    /// }
1164    /// gate.finish();  // Automatically reschedules if more work arrived
1165    /// ```
1166    #[inline(always)]
1167    pub fn unmark(&self) {
1168        let after_flags = self.flags.fetch_sub(EXECUTING, Ordering::AcqRel);
1169        if after_flags & SCHEDULED != IDLE {
1170            self.signal.set(self.bit_index as u64);
1171        }
1172    }
1173
1174    /// Atomically marks the queue as SCHEDULED, ensuring re-execution.
1175    ///
1176    /// Called by the executor when it knows more work exists but wants to yield the
1177    /// timeslice for fairness. This is an optimization over `finish()` followed by
1178    /// external `schedule()`.
1179    ///
1180    /// # Use Cases
1181    ///
1182    /// 1. **Batch size limiting**: Process N items, then yield to other queues
1183    /// 2. **Fairness**: Prevent queue starvation by rotating execution
1184    /// 3. **Latency control**: Ensure all queues get regular timeslices
1185    ///
1186    /// # Algorithm
1187    ///
1188    /// 1. **Set state**: Store SCHEDULED unconditionally
1189    /// 2. **Update signal**: Set bit in signal word
1190    /// 3. **Update summary**: If signal was empty, mark active in waker
1191    ///
1192    /// # Comparison with finish() + schedule()
1193    ///
1194    /// ```ignore
1195    /// // Separate calls (2 atomic ops)
1196    /// gate.finish();      // EXECUTING → IDLE
1197    /// gate.schedule();    // IDLE → SCHEDULED
1198    ///
1199    /// // Combined call (1 atomic op + signal update)
1200    /// gate.finish_and_schedule();  // EXECUTING → SCHEDULED
1201    /// ```
1202    ///
1203    /// # Memory Ordering
1204    ///
1205    /// Uses `Ordering::Release` to publish both the state change and enqueued items.
1206    ///
1207    /// # Performance
1208    ///
1209    /// ~10-15 ns (store + signal.set + potential summary update)
1210    ///
1211    /// # Example
1212    ///
1213    /// ```ignore
1214    /// gate.begin();
1215    /// let mut processed = 0;
1216    /// while processed < BATCH_SIZE {
1217    ///     if let Some(item) = queue.try_pop() {
1218    ///         process(item);
1219    ///         processed += 1;
1220    ///     } else {
1221    ///         break;
1222    ///     }
1223    /// }
1224    ///
1225    /// if queue.len() > 0 {
1226    ///     gate.finish_and_schedule();  // More work, stay scheduled
1227    /// } else {
1228    ///     gate.finish();  // Done, go idle
1229    /// }
1230    /// ```
1231    #[inline(always)]
1232    pub fn unmark_and_schedule(&self) {
1233        self.flags.store(SCHEDULED, Ordering::Release);
1234        let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
1235        if was_empty && was_set {
1236            self.waker.mark_active(self.signal.index());
1237        }
1238    }
1239}
1240
1241impl crate::spsc::SignalSchedule for AsyncSignalGate {
1242    fn schedule(&self) -> bool {
1243        self.schedule()
1244    }
1245
1246    fn mark(&self) {
1247        self.mark();
1248    }
1249
1250    fn unmark(&self) {
1251        self.unmark();
1252    }
1253
1254    fn unmark_and_schedule(&self) {
1255        self.unmark_and_schedule();
1256    }
1257}
1258
// Unit-test module, compiled only for test builds. It currently defines no
// tests; `use super::*` brings the parent module's items into scope for any
// tests added here.
#[cfg(test)]
mod tests {
    use super::*;
}