maniac_runtime/runtime/
signal.rs

1//! Signal coordination primitives for lock-free work scheduling.
2//!
3//! This module provides the building blocks for coordinating work between producers and
4//! an executor in a lock-free manner. The core abstraction is a two-level bitmap system
5//! where each bit represents a queue that has work available.
6//!
7//! # Architecture
8//!
9//! ```text
10//! SignalWaker (Status Layer)
11//! ┌──────────────────────────────────────────────┐
12//! │ Status: [bit0, bit1, ..., bit61, flags...]   │  62 queue bits + control flags
13//! └──────────────────────────────────────────────┘
14//!        │       │                  │
15//!        ▼       ▼                  ▼
16//!   Signal[0] Signal[1]  ...   Signal[61]           62 signal words
17//!   [64 bits] [64 bits]       [64 bits]             (one per group of 64 queues)
18//!      │  │      │  │            │  │
19//!      ▼  ▼      ▼  ▼            ▼  ▼
20//!   Queue Queue Queue Queue ... Queue Queue         Up to 3,968 queues total
21//!      0    1     64   65      3902  3903
22//! ```
23//!
24//! # Components
25//!
26//! ## Signal
27//!
28//! A cache-line padded 64-bit atomic bitmap representing up to 64 queues. Each bit
29//! corresponds to a queue's scheduled state. Setting a bit indicates the queue has
30//! work available and should be processed by the executor.
31//!
32//! ## SignalGate
33//!
34//! A per-queue coordination structure that manages the state machine for scheduling:
35//! - **IDLE**: Queue has no work scheduled
36//! - **SCHEDULED**: Queue has work and is waiting for executor
37//! - **EXECUTING**: Queue is currently being processed
38//!
39//! The state machine prevents redundant scheduling and ensures proper handoff between
40//! producer and executor.
41//!
42//! # Integration
43//!
44//! This module integrates with `SignalWaker` (waker.rs) to provide a complete
45//! work-stealing scheduler. Producers call `SignalGate::schedule()` to enqueue work,
46//! and the executor uses `SignalWaker` to discover and process ready queues.
47//!
48//! # Constants
49//!
50//! The design is optimized for x86_64 cache line sizes and common executor configurations.
51
52use super::waker::{STATUS_SUMMARY_WORDS, WorkerWaker};
53use crate::spsc::SignalSchedule;
54use crate::utils::CachePadded;
55use std::sync::Arc;
56use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
57
58/// Number of bits per signal word (64-bit atomic).
59///
60/// Each signal word can track up to 64 queues. This matches the width of
61/// `AtomicU64` and provides efficient bit manipulation via hardware instructions
62/// (POPCNT, BSF, etc.).
63pub const SIGNAL_CAPACITY: u64 = 64;
64
65/// Bitmask for extracting bit index within a signal word.
66///
67/// Equivalent to `index % 64`, used for fast modulo via bitwise AND:
68/// ```ignore
69/// bit_index = queue_id & SIGNAL_MASK;
70/// ```
71pub const SIGNAL_MASK: u64 = SIGNAL_CAPACITY - 1;
72
73/// A cache-line padded 64-bit atomic bitmap for tracking queue readiness.
74///
75/// Each `Signal` represents a group of up to 64 queues, where each bit indicates
76/// whether the corresponding queue has work available. Multiple `Signal` instances
77/// are coordinated via a `SignalWaker` to form a complete two-level bitmap.
78///
79/// # Design
80///
81/// ```text
82/// Signal (64-bit AtomicU64)
83/// ┌───┬───┬───┬───┬─────┬───┐
84/// │ 0 │ 1 │ 0 │ 1 │ ... │ 0 │  Each bit = one queue's scheduled state
85/// └───┴───┴───┴───┴─────┴───┘
86///   Q0  Q1  Q2  Q3  ...  Q63
87/// ```
88///
89/// # Cache Optimization
90///
91/// The inner state is wrapped in `Arc<CachePadded<...>>` to:
92/// - Allow cheap cloning (single pointer copy)
93/// - Prevent false sharing between different signals
94/// - Optimize for hot paths (producers setting bits, executor clearing bits)
95///
96/// # Thread Safety
97///
98/// All operations use atomic instructions. Multiple producers can concurrently set
99/// bits (via `set()`), and the executor can concurrently acquire/clear bits (via
100/// `acquire()` or `try_acquire()`).
101///
102/// # Cloning
103///
104/// `Signal` is cheaply clonable via `Arc`. All clones share the same underlying
105/// atomic bitmap, making it suitable for distribution across multiple producer threads.
106#[derive(Clone)]
107pub struct Signal {
108    /// Shared, cache-line padded inner state.
109    inner: Arc<CachePadded<SignalInner>>,
110}
111
112impl Signal {
113    /// Returns the signal's index within the SignalWaker's signal array.
114    ///
115    /// This index is used to:
116    /// - Map this signal to the corresponding bit in the summary bitmap
117    /// - Identify which group of 64 queues this signal represents
118    ///
119    /// # Example
120    ///
121    /// ```ignore
122    /// let signal = Signal::with_index(5);
123    /// assert_eq!(signal.index(), 5);
124    /// // This signal controls queues 320-383 (5 * 64 through (5+1) * 64 - 1)
125    /// ```
126    #[inline(always)]
127    pub fn index(&self) -> u64 {
128        return self.inner.index;
129    }
130
131    /// Returns a reference to the underlying atomic value.
132    ///
133    /// Provides direct access to the 64-bit bitmap for advanced use cases
134    /// that need custom atomic operations beyond the provided methods.
135    ///
136    /// # Use Cases
137    ///
138    /// - Custom bit manipulation patterns
139    /// - Debugging (observing raw bitmap state)
140    /// - Integration with external synchronization primitives
141    ///
142    /// # Example
143    ///
144    /// ```ignore
145    /// let signal = Signal::new();
146    /// signal.set(10);
147    /// let raw_value = signal.value().load(Ordering::Relaxed);
148    /// assert_eq!(raw_value & (1 << 10), 1 << 10);  // Bit 10 is set
149    /// ```
150    #[inline(always)]
151    pub fn value(&self) -> &AtomicU64 {
152        &self.inner.value
153    }
154}
155
156/// Internal state for a Signal, cache-line padded to prevent false sharing.
157///
158/// # Fields
159///
160/// - `index`: The signal's position in the SignalWaker's signal array (0-61)
161/// - `value`: 64-bit atomic bitmap where each bit represents a queue's readiness
162struct SignalInner {
163    /// Signal index in the SignalWaker array.
164    pub index: u64,
165    /// Atomic bitmap tracking up to 64 queues (bit N = queue ready state).
166    value: AtomicU64,
167}
168
169impl Signal {
170    /// Creates a new Signal with index 0 and all bits cleared.
171    ///
172    /// # Example
173    ///
174    /// ```ignore
175    /// let signal = Signal::new();
176    /// assert_eq!(signal.index(), 0);
177    /// assert!(signal.is_empty());
178    /// ```
179    pub fn new() -> Self {
180        Self::with_value(0, 0)
181    }
182
183    /// Creates a new Signal with the specified index and all bits cleared.
184    ///
185    /// # Parameters
186    ///
187    /// - `index`: Position in the SignalWaker's signal array (0-61)
188    ///
189    /// # Example
190    ///
191    /// ```ignore
192    /// let signal = Signal::with_index(10);
193    /// assert_eq!(signal.index(), 10);
194    /// assert!(signal.is_empty());
195    /// ```
196    pub fn with_index(index: u64) -> Self {
197        debug_assert!(
198            index < STATUS_SUMMARY_WORDS as u64,
199            "signal index {} exceeds status summary capacity {}",
200            index,
201            STATUS_SUMMARY_WORDS
202        );
203        Self::with_value(index, 0)
204    }
205
206    /// Creates a new Signal with the specified index and initial bitmap value.
207    ///
208    /// This is primarily used for testing or restoring state. In normal operation,
209    /// signals start with all bits cleared.
210    ///
211    /// # Parameters
212    ///
213    /// - `index`: Position in the SignalWaker's signal array (0-61)
214    /// - `value`: Initial 64-bit bitmap value
215    ///
216    /// # Example
217    ///
218    /// ```ignore
219    /// // Create signal with bits 0, 5, and 10 already set
220    /// let signal = Signal::with_value(3, (1 << 0) | (1 << 5) | (1 << 10));
221    /// assert_eq!(signal.size(), 3);
222    /// assert!(signal.is_set(0));
223    /// assert!(signal.is_set(5));
224    /// assert!(signal.is_set(10));
225    /// ```
226    pub fn with_value(index: u64, value: u64) -> Self {
227        debug_assert!(
228            index < STATUS_SUMMARY_WORDS as u64,
229            "signal index {} exceeds status summary capacity {}",
230            index,
231            STATUS_SUMMARY_WORDS
232        );
233        Self {
234            inner: Arc::new(CachePadded::new(SignalInner {
235                index,
236                value: AtomicU64::new(value),
237            })),
238        }
239    }
240
241    /// Loads the current bitmap value with the specified memory ordering.
242    ///
243    /// # Parameters
244    ///
245    /// - `ordering`: Memory ordering for the load operation
246    ///
247    /// # Returns
248    ///
249    /// The 64-bit bitmap value where each set bit represents a ready queue.
250    ///
251    /// # Example
252    ///
253    /// ```ignore
254    /// signal.set(5);
255    /// signal.set(10);
256    /// let value = signal.load(Ordering::Acquire);
257    /// assert_eq!(value, (1 << 5) | (1 << 10));
258    /// ```
259    #[inline(always)]
260    pub fn load(&self, ordering: Ordering) -> u64 {
261        self.inner.value.load(ordering)
262    }
263
264    /// Returns the number of set bits (ready queues) in this signal.
265    ///
266    /// Equivalent to `popcount(bitmap)`, this counts how many queues in this
267    /// signal group currently have work available.
268    ///
269    /// # Performance
270    ///
271    /// Uses the `POPCNT` instruction on x86_64 (~3 cycles), making it very efficient.
272    ///
273    /// # Example
274    ///
275    /// ```ignore
276    /// let signal = Signal::new();
277    /// signal.set(0);
278    /// signal.set(5);
279    /// signal.set(63);
280    /// assert_eq!(signal.size(), 3);
281    /// ```
282    #[inline(always)]
283    pub fn size(&self) -> u64 {
284        self.load(Ordering::Relaxed).count_ones() as u64
285    }
286
287    /// Returns `true` if no bits are set (no ready queues).
288    ///
289    /// This is more efficient than `size() == 0` for checking emptiness.
290    ///
291    /// # Example
292    ///
293    /// ```ignore
294    /// let signal = Signal::new();
295    /// assert!(signal.is_empty());
296    /// signal.set(10);
297    /// assert!(!signal.is_empty());
298    /// ```
299    #[inline(always)]
300    pub fn is_empty(&self) -> bool {
301        self.load(Ordering::Relaxed).count_ones() == 0
302    }
303
304    /// Atomically sets a bit in the bitmap using fetch_or.
305    ///
306    /// This is the primary method for producers to signal that a queue has work available.
307    ///
308    /// # Parameters
309    ///
310    /// - `index`: Bit position to set (0-63)
311    ///
312    /// # Returns
313    ///
314    /// A tuple `(was_empty, was_set)`:
315    /// - `was_empty`: `true` if this was the first bit set (signal transitioned from empty to non-empty)
316    /// - `was_set`: `true` if the bit was successfully set (wasn't already set)
317    ///
318    /// # Use Cases
319    ///
320    /// The return values are used for summary bitmap updates:
321    /// ```ignore
322    /// let (was_empty, was_set) = signal.set(queue_bit);
323    /// if was_empty && was_set {
324    ///     // This signal was empty, now has work - update summary
325    ///     waker.mark_active(signal.index());
326    /// }
327    /// ```
328    ///
329    /// # Performance
330    ///
331    /// ~5-10 ns (one atomic fetch_or operation)
332    ///
333    /// # Example
334    ///
335    /// ```ignore
336    /// let signal = Signal::new();
337    /// let (was_empty, was_set) = signal.set(5);
338    /// assert!(was_empty);  // Signal was empty
339    /// assert!(was_set);    // Bit 5 was not previously set
340    ///
341    /// let (was_empty, was_set) = signal.set(5);
342    /// assert!(!was_empty); // Signal already had bits set
343    /// assert!(!was_set);   // Bit 5 was already set
344    /// ```
345    #[inline(always)]
346    pub fn set(&self, index: u64) -> (bool, bool) {
347        crate::bits::set(&self.inner.value, index)
348    }
349
350    /// Atomically sets a bit using a precomputed bitmask.
351    ///
352    /// Similar to `set()`, but takes a precomputed `1 << index` value for cases
353    /// where the bit position is computed once and reused.
354    ///
355    /// # Parameters
356    ///
357    /// - `bit`: Precomputed bitmask with exactly one bit set (e.g., `1 << 5`)
358    ///
359    /// # Returns
360    ///
361    /// The previous bitmap value before setting the bit.
362    ///
363    /// # Example
364    ///
365    /// ```ignore
366    /// let signal = Signal::new();
367    /// let bit_mask = 1u64 << 10;
368    /// let prev = signal.set_with_bit(bit_mask);
369    /// assert_eq!(prev, 0);  // Was empty
370    /// assert!(signal.is_set(10));
371    /// ```
372    #[inline(always)]
373    pub fn set_with_bit(&self, bit: u64) -> u64 {
374        crate::bits::set_with_bit(&self.inner.value, bit)
375    }
376
377    /// Atomically clears a bit if it is currently set (CAS-based).
378    ///
379    /// This is the primary method for the executor to claim ownership of a ready queue.
380    /// Uses a CAS loop to ensure the bit is cleared atomically.
381    ///
382    /// # Parameters
383    ///
384    /// - `index`: Bit position to clear (0-63)
385    ///
386    /// # Returns
387    ///
388    /// - `true`: Bit was set and has been successfully cleared (queue acquired)
389    /// - `false`: Bit was not set (queue not ready or already acquired)
390    ///
391    /// # Use Cases
392    ///
393    /// ```ignore
394    /// // Executor loop
395    /// if signal.acquire(queue_bit) {
396    ///     // Successfully acquired queue, process it
397    ///     process_queue(queue_id);
398    /// }
399    /// ```
400    ///
401    /// # Performance
402    ///
403    /// ~10-20 ns (CAS loop, typically succeeds on first iteration)
404    ///
405    /// # Example
406    ///
407    /// ```ignore
408    /// let signal = Signal::new();
409    /// signal.set(5);
410    /// assert!(signal.acquire(5));  // Successfully cleared bit 5
411    /// assert!(!signal.acquire(5)); // Bit already clear, returns false
412    /// ```
413    #[inline(always)]
414    pub fn acquire(&self, index: u64) -> bool {
415        crate::bits::acquire(&self.inner.value, index)
416    }
417
418    /// Attempts to atomically clear a bit, returning detailed state information.
419    ///
420    /// Similar to `acquire()`, but provides additional information about the
421    /// before/after state of the bitmap, useful for debugging or advanced scheduling.
422    ///
423    /// # Parameters
424    ///
425    /// - `index`: Bit position to clear (0-63)
426    ///
427    /// # Returns
428    ///
429    /// A tuple `(before, after, success)`:
430    /// - `before`: Bitmap value before the operation
431    /// - `after`: Bitmap value after the operation (if successful)
432    /// - `success`: `true` if the bit was cleared, `false` if it wasn't set
433    ///
434    /// # Example
435    ///
436    /// ```ignore
437    /// let signal = Signal::with_value(0, 0b101010);  // Bits 1, 3, 5 set
438    /// let (before, after, success) = signal.try_acquire(3);
439    /// assert_eq!(before, 0b101010);
440    /// assert_eq!(after, 0b100010);   // Bit 3 cleared
441    /// assert!(success);
442    /// ```
443    #[inline(always)]
444    pub fn try_acquire(&self, index: u64) -> (u64, u64, bool) {
445        crate::bits::try_acquire(&self.inner.value, index)
446    }
447
448    /// Checks if a specific bit is set without modifying the bitmap.
449    ///
450    /// Non-atomic read followed by bit test. Suitable for non-critical checks
451    /// where races are acceptable.
452    ///
453    /// # Parameters
454    ///
455    /// - `index`: Bit position to check (0-63)
456    ///
457    /// # Returns
458    ///
459    /// `true` if the bit is set, `false` otherwise.
460    ///
461    /// # Example
462    ///
463    /// ```ignore
464    /// let signal = Signal::new();
465    /// assert!(!signal.is_set(5));
466    /// signal.set(5);
467    /// assert!(signal.is_set(5));
468    /// ```
469    #[inline(always)]
470    pub fn is_set(&self, index: u64) -> bool {
471        crate::bits::is_set(&self.inner.value, index)
472    }
473}
474
475// ──────────────────────────────────────────────────────────────────────────────
476// SignalGate State Machine Constants
477// ──────────────────────────────────────────────────────────────────────────────
478
479/// Queue has no work scheduled and is not being processed.
480///
481/// This is the initial state. Transitions to SCHEDULED when work is enqueued.
482pub const IDLE: u8 = 0;
483
484/// Queue has work available and is waiting for the executor to process it.
485///
486/// Transitions from IDLE when `schedule()` is called. Transitions to EXECUTING
487/// when the executor calls `begin()`.
488pub const SCHEDULED: u8 = 1;
489
490/// Queue is currently being processed by the executor.
491///
492/// Transitions from SCHEDULED when executor calls `begin()`. Transitions back to
493/// IDLE (via `finish()`) or SCHEDULED (via `finish_and_schedule()`) when processing completes.
494pub const EXECUTING: u8 = 2;
495
496/// Per-queue gate coordinating scheduling between producers and executor.
497///
498/// `SignalGate` implements a lock-free state machine that prevents redundant scheduling
499/// and ensures proper handoff of work from producers to the executor. Each queue has
500/// exactly one `SignalGate` instance.
501///
502/// # State Machine
503///
504/// ```text
505/// ┌──────────────────────────────────────────────────────────────┐
506/// │                                                              │
507/// │  IDLE (0)  ──schedule()──▶  SCHEDULED (1)  ──begin()──▶  EXECUTING (2)
508/// │     ▲                            │                           │
509/// │     │                            │                           │
510/// │     └────────finish()────────────┴───────────────────────────┘
511/// │                                  │                           │
512/// │                                  └──finish_and_schedule()────┘
513/// │                                              │               │
514/// │                                              ▼               │
515/// │                                         SCHEDULED (1)        │
516/// └──────────────────────────────────────────────────────────────┘
517/// ```
518///
519/// # State Transitions
520///
521/// - **IDLE → SCHEDULED**: Producer calls `schedule()` after enqueuing items
522/// - **SCHEDULED → EXECUTING**: Executor calls `begin()` before processing
523/// - **EXECUTING → IDLE**: Executor calls `finish()` when done (queue empty)
524/// - **EXECUTING → SCHEDULED**: Executor calls `finish_and_schedule()` when more work remains
525/// - **Any → SCHEDULED**: Concurrent `schedule()` during EXECUTING sets flag, processed in `finish()`
526///
527/// # Concurrency Guarantees
528///
529/// - **Multiple producers**: Safe (atomic flags ensure only one schedule succeeds)
530/// - **Producer + executor**: Safe (state transitions are atomic and properly ordered)
531/// - **Multiple executors**: NOT SAFE (single-threaded consumption assumption)
532///
533/// # Integration with Signal and SignalWaker
534///
535/// When a queue transitions IDLE → SCHEDULED:
536/// 1. Sets bit in the associated `Signal` (64-bit bitmap)
537/// 2. If signal was empty, sets bit in `SignalWaker` summary (64-bit bitmap)
538/// 3. May wake sleeping executor thread via permit system
539///
540/// # Memory Layout
541///
542/// ```text
543/// SignalGate (40 bytes on x86_64)
544/// ┌─────────────┬───────────┬─────────┬─────────┐
545/// │ flags (1B)  │ bit_index │ signal  │ waker   │
546/// │ AtomicU8    │ u64 (8B)  │ Arc (8B)│ Arc (8B)│
547/// └─────────────┴───────────┴─────────┴─────────┘
548/// ```
549///
550/// # Example Usage
551///
552/// ```ignore
553/// // Setup
554/// let waker = Arc::new(SignalWaker::new());
555/// let signal = Signal::with_index(5);
556/// let gate = SignalGate::new(10, signal, waker);
557///
558/// // Producer thread
559/// queue.try_push(item)?;
560/// gate.schedule();  // Signal work available
561///
562/// // Executor thread
563/// gate.begin();     // Mark as executing
564/// while let Some(item) = queue.try_pop() {
565///     process(item);
566/// }
567/// if queue.is_empty() {
568///     gate.finish();  // Done, back to IDLE
569/// } else {
570///     gate.finish_and_schedule();  // More work, stay SCHEDULED
571/// }
572/// ```
573pub struct SignalGate {
574    /// Atomic state flags (IDLE, SCHEDULED, EXECUTING).
575    ///
576    /// Uses bitwise OR to combine flags, allowing detection of concurrent schedules
577    /// during execution (EXECUTING | SCHEDULED = 3).
578    flags: AtomicU8,
579
580    /// Bit position within the Signal's 64-bit bitmap (0-63).
581    ///
582    /// This queue's ready state is represented by bit `1 << bit_index` in the signal.
583    bit_index: u8,
584
585    /// Reference to the Signal word containing this queue's bit.
586    ///
587    /// Shared among up to 64 queues (all queues in the same signal group).
588    signal: Signal,
589
590    /// Reference to the top-level SignalWaker for summary updates.
591    ///
592    /// Shared among all queues in the executor (up to 3,968 queues).
593    waker: Arc<WorkerWaker>,
594}
595
596impl SignalGate {
597    /// Creates a new SignalGate in the IDLE state.
598    ///
599    /// # Parameters
600    ///
601    /// - `bit_index`: Position of this queue's bit within the signal (0-63)
602    /// - `signal`: Reference to the Signal word containing this queue's bit
603    /// - `waker`: Reference to the SignalWaker for summary updates
604    ///
605    /// # Example
606    ///
607    /// ```ignore
608    /// let waker = Arc::new(SignalWaker::new());
609    /// let signal = Signal::with_index(0);
610    /// let gate = SignalGate::new(5, signal, waker);
611    /// // This gate controls bit 5 in signal[0]
612    /// ```
613    pub fn new(bit_index: u8, signal: Signal, waker: Arc<WorkerWaker>) -> Self {
614        Self {
615            flags: AtomicU8::new(IDLE),
616            bit_index,
617            signal,
618            waker,
619        }
620    }
621
622    /// Attempts to schedule this queue for execution (IDLE → SCHEDULED transition).
623    ///
624    /// Called by producers after enqueuing items to notify the executor. Uses atomic
625    /// operations to ensure only one successful schedule per work batch.
626    ///
627    /// # Algorithm
628    ///
629    /// 1. **Fast check**: If already SCHEDULED, return false immediately (idempotent)
630    /// 2. **Atomic set**: `fetch_or(SCHEDULED)` to set the SCHEDULED flag
631    /// 3. **State check**: If previous state was IDLE (neither SCHEDULED nor EXECUTING):
632    ///    - Set bit in signal word via `signal.set(bit_index)`
633    ///    - If signal transitioned from empty, update summary via `waker.mark_active()`
634    ///    - Return true (successful schedule)
635    /// 4. **Otherwise**: Return false (already scheduled or executing)
636    ///
637    /// # Returns
638    ///
639    /// - `true`: Successfully transitioned from IDLE to SCHEDULED (work will be processed)
640    /// - `false`: Already scheduled/executing, or concurrent schedule won (idempotent)
641    ///
642    /// # Concurrent Behavior
643    ///
644    /// - **Multiple producers**: Only the first `schedule()` succeeds (returns true)
645    /// - **During EXECUTING**: Sets SCHEDULED flag, which `finish()` will detect and reschedule
646    ///
647    /// # Memory Ordering
648    ///
649    /// - Initial load: `Acquire` (see latest state)
650    /// - `fetch_or`: `Release` (publish enqueued items to executor)
651    ///
652    /// # Performance
653    ///
654    /// - **Already scheduled**: ~2-3 ns (fast path, single atomic load)
655    /// - **Successful schedule**: ~10-20 ns (fetch_or + signal update + potential summary update)
656    ///
657    /// # Example
658    ///
659    /// ```ignore
660    /// // Producer 1
661    /// queue.try_push(item)?;
662    /// if gate.schedule() {
663    ///     println!("Successfully scheduled");  // First producer
664    /// }
665    ///
666    /// // Producer 2 (concurrent)
667    /// queue.try_push(another_item)?;
668    /// if !gate.schedule() {
669    ///     println!("Already scheduled");  // Idempotent, no action needed
670    /// }
671    /// ```
672    #[inline(always)]
673    pub fn schedule(&self) -> bool {
674        if (self.flags.load(Ordering::Acquire) & SCHEDULED) != IDLE {
675            return false;
676        }
677
678        let previous_flags = self.flags.fetch_or(SCHEDULED, Ordering::Release);
679        let scheduled_nor_executing = (previous_flags & (SCHEDULED | EXECUTING)) == IDLE;
680
681        if scheduled_nor_executing {
682            let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
683            if was_empty && was_set {
684                self.waker.mark_active(self.signal.index());
685            }
686            true
687        } else {
688            false
689        }
690    }
691
692    /// Marks the queue as EXECUTING (SCHEDULED → EXECUTING transition).
693    ///
694    /// Called by the executor when it begins processing this queue. This transition
695    /// prevents redundant scheduling while work is being processed.
696    ///
697    /// # State Transition
698    ///
699    /// Unconditionally stores EXECUTING, which clears any SCHEDULED flags and sets EXECUTING.
700    /// ```text
701    /// Before: SCHEDULED (1)
702    /// After:  EXECUTING (2)
703    /// ```
704    ///
705    /// If a producer calls `schedule()` after `begin()` but before `finish()`, the
706    /// SCHEDULED flag will be set again (creating state 3 = EXECUTING | SCHEDULED),
707    /// which `finish()` detects and handles.
708    ///
709    /// # Memory Ordering
710    ///
711    /// Uses `Ordering::Release` to ensure the state change is visible to concurrent
712    /// producers calling `schedule()`.
713    ///
714    /// # Performance
715    ///
716    /// ~1-2 ns (single atomic store)
717    ///
718    /// # Example
719    ///
720    /// ```ignore
721    /// // Executor discovers ready queue
722    /// if signal.acquire(queue_bit) {
723    ///     gate.begin();  // Mark as executing
724    ///     process_queue();
725    ///     gate.finish();
726    /// }
727    /// ```
728    #[inline(always)]
729    pub fn mark(&self) {
730        self.flags.store(EXECUTING, Ordering::Release);
731    }
732
733    /// Marks the queue as IDLE and handles concurrent schedules (EXECUTING → IDLE/SCHEDULED).
734    ///
735    /// Called by the executor after processing a batch of items. Automatically detects
736    /// if new work arrived during processing (SCHEDULED flag set concurrently) and
737    /// reschedules if needed.
738    ///
739    /// # Algorithm
740    ///
741    /// 1. **Clear EXECUTING**: `fetch_sub(EXECUTING)` atomically transitions to IDLE
742    /// 2. **Check SCHEDULED**: If the SCHEDULED flag is set in the result:
743    ///    - Means a producer called `schedule()` during execution
744    ///    - Re-set the signal bit to ensure executor sees the work
745    ///    - Queue remains/becomes SCHEDULED
746    ///
747    /// # Automatic Rescheduling
748    ///
749    /// This method implements a key correctness property: if a producer enqueues work
750    /// while the executor is processing, that work will not be lost. The SCHEDULED flag
751    /// acts as a handoff mechanism.
752    ///
753    /// ```text
754    /// Timeline:
755    /// T0: Executor calls begin()           → EXECUTING (2)
756    /// T1: Producer calls schedule()        → EXECUTING | SCHEDULED (3)
757    /// T2: Executor calls finish()          → SCHEDULED (1) [bit re-set in signal]
758    /// T3: Executor sees bit, processes     → ...
759    /// ```
760    ///
761    /// # Memory Ordering
762    ///
763    /// Uses `Ordering::AcqRel`:
764    /// - **Acquire**: See all producer writes (enqueued items)
765    /// - **Release**: Publish state transition to future readers
766    ///
767    /// # Performance
768    ///
769    /// - **No concurrent schedule**: ~2-3 ns (fetch_sub only)
770    /// - **With concurrent schedule**: ~10-15 ns (fetch_sub + signal.set)
771    ///
772    /// # Example
773    ///
774    /// ```ignore
775    /// gate.begin();
776    /// while let Some(item) = queue.try_pop() {
777    ///     process(item);
778    /// }
779    /// gate.finish();  // Automatically reschedules if more work arrived
780    /// ```
781    #[inline(always)]
782    pub fn unmark(&self) {
783        let after_flags = self.flags.fetch_sub(EXECUTING, Ordering::AcqRel);
784        if after_flags & SCHEDULED != IDLE {
785            self.signal.set(self.bit_index as u64);
786        }
787    }
788
789    /// Atomically marks the queue as SCHEDULED, ensuring re-execution.
790    ///
791    /// Called by the executor when it knows more work exists but wants to yield the
792    /// timeslice for fairness. This is an optimization over `finish()` followed by
793    /// external `schedule()`.
794    ///
795    /// # Use Cases
796    ///
797    /// 1. **Batch size limiting**: Process N items, then yield to other queues
798    /// 2. **Fairness**: Prevent queue starvation by rotating execution
799    /// 3. **Latency control**: Ensure all queues get regular timeslices
800    ///
801    /// # Algorithm
802    ///
803    /// 1. **Set state**: Store SCHEDULED unconditionally
804    /// 2. **Update signal**: Set bit in signal word
805    /// 3. **Update summary**: If signal was empty, mark active in waker
806    ///
807    /// # Comparison with finish() + schedule()
808    ///
809    /// ```ignore
810    /// // Separate calls (2 atomic ops)
811    /// gate.finish();      // EXECUTING → IDLE
812    /// gate.schedule();    // IDLE → SCHEDULED
813    ///
814    /// // Combined call (1 atomic op + signal update)
815    /// gate.finish_and_schedule();  // EXECUTING → SCHEDULED
816    /// ```
817    ///
818    /// # Memory Ordering
819    ///
820    /// Uses `Ordering::Release` to publish both the state change and enqueued items.
821    ///
822    /// # Performance
823    ///
824    /// ~10-15 ns (store + signal.set + potential summary update)
825    ///
826    /// # Example
827    ///
828    /// ```ignore
829    /// gate.begin();
830    /// let mut processed = 0;
831    /// while processed < BATCH_SIZE {
832    ///     if let Some(item) = queue.try_pop() {
833    ///         process(item);
834    ///         processed += 1;
835    ///     } else {
836    ///         break;
837    ///     }
838    /// }
839    ///
840    /// if queue.len() > 0 {
841    ///     gate.finish_and_schedule();  // More work, stay scheduled
842    /// } else {
843    ///     gate.finish();  // Done, go idle
844    /// }
845    /// ```
846    #[inline(always)]
847    pub fn unmark_and_schedule(&self) {
848        self.flags.store(SCHEDULED, Ordering::Release);
849        let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
850        if was_empty && was_set {
851            self.waker.mark_active(self.signal.index());
852        }
853    }
854}
855
856impl SignalSchedule for SignalGate {
857    fn schedule(&self) -> bool {
858        SignalGate::schedule(self)
859    }
860
861    fn mark(&self) {
862        SignalGate::mark(self);
863    }
864
865    fn unmark(&self) {
866        SignalGate::unmark(self);
867    }
868
869    fn unmark_and_schedule(&self) {
870        SignalGate::unmark_and_schedule(self);
871    }
872}
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877}