maniac_runtime/runtime/signal.rs
1//! Signal coordination primitives for lock-free work scheduling.
2//!
3//! This module provides the building blocks for coordinating work between producers and
4//! an executor in a lock-free manner. The core abstraction is a two-level bitmap system
5//! where each bit represents a queue that has work available.
6//!
7//! # Architecture
8//!
//! ```text
//! SignalWaker (Status Layer)
//! ┌──────────────────────────────────────────────┐
//! │ Status: [bit0, bit1, ..., bit61, flags...]   │  62 queue bits + control flags
//! └──────────────────────────────────────────────┘
//!      │               │                   │
//!      ▼               ▼                   ▼
//!  Signal[0]       Signal[1]    ...    Signal[61]     62 signal words
//!  [64 bits]       [64 bits]           [64 bits]      (one per group of 64 queues)
//!   │     │         │     │             │     │
//!   ▼     ▼         ▼     ▼             ▼     ▼
//! Queue Queue     Queue Queue   ...   Queue Queue     Up to 3,968 queues total
//!   0     1        64    65           3966  3967
//! ```
23//!
24//! # Components
25//!
26//! ## Signal
27//!
28//! A cache-line padded 64-bit atomic bitmap representing up to 64 queues. Each bit
29//! corresponds to a queue's scheduled state. Setting a bit indicates the queue has
30//! work available and should be processed by the executor.
31//!
32//! ## SignalGate
33//!
34//! A per-queue coordination structure that manages the state machine for scheduling:
35//! - **IDLE**: Queue has no work scheduled
36//! - **SCHEDULED**: Queue has work and is waiting for executor
37//! - **EXECUTING**: Queue is currently being processed
38//!
39//! The state machine prevents redundant scheduling and ensures proper handoff between
40//! producer and executor.
41//!
42//! # Integration
43//!
44//! This module integrates with `SignalWaker` (waker.rs) to provide a complete
45//! work-stealing scheduler. Producers call `SignalGate::schedule()` to enqueue work,
46//! and the executor uses `SignalWaker` to discover and process ready queues.
47//!
48//! # Constants
49//!
50//! The design is optimized for x86_64 cache line sizes and common executor configurations.
51
52use super::waker::{STATUS_SUMMARY_WORDS, WorkerWaker};
53use crate::spsc::SignalSchedule;
54use crate::utils::CachePadded;
55use std::sync::Arc;
56use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
57
/// Number of bits per signal word (64-bit atomic).
///
/// Each signal word can track up to 64 queues. This matches the width of
/// `AtomicU64` and provides efficient bit manipulation via hardware instructions
/// (POPCNT, BSF, etc.).
///
/// Must remain a power of two: [`SIGNAL_MASK`] is derived as
/// `SIGNAL_CAPACITY - 1` and is only a valid modulo mask under that condition.
pub const SIGNAL_CAPACITY: u64 = 64;

/// Bitmask for extracting the bit index within a signal word.
///
/// Equivalent to `index % 64`, used for fast modulo via bitwise AND:
/// ```ignore
/// bit_index = queue_id & SIGNAL_MASK;
/// ```
///
/// Computed from [`SIGNAL_CAPACITY`] so the two constants cannot drift apart.
pub const SIGNAL_MASK: u64 = SIGNAL_CAPACITY - 1;
72
/// A cache-line padded 64-bit atomic bitmap for tracking queue readiness.
///
/// Each `Signal` represents a group of up to 64 queues, where each bit indicates
/// whether the corresponding queue has work available. Multiple `Signal` instances
/// are coordinated through a waker (this file uses `WorkerWaker` from
/// `super::waker`) to form a two-level bitmap: the waker's summary says which
/// signal words are non-empty, and each signal word says which queues are ready.
///
/// # Design
///
/// ```text
/// Signal (64-bit AtomicU64)
/// ┌───┬───┬───┬───┬─────┬───┐
/// │ 0 │ 1 │ 0 │ 1 │ ... │ 0 │   Each bit = one queue's scheduled state
/// └───┴───┴───┴───┴─────┴───┘
///  Q0  Q1  Q2  Q3   ...  Q63
/// ```
///
/// # Cache Optimization
///
/// The inner state is wrapped in `Arc<CachePadded<...>>` to:
/// - Allow cheap cloning (single pointer copy plus a reference-count bump)
/// - Prevent false sharing between different signals
/// - Keep the hot word (producers setting bits, executor clearing bits) on its
///   own cache line
///
/// # Thread Safety
///
/// The bitmap is an `AtomicU64`, and every method takes `&self`. Producers set
/// bits via `set()` / `set_with_bit()`; the executor clears bits via `acquire()`
/// or `try_acquire()`. The bit-level operations are delegated to `crate::bits`.
///
/// # Cloning
///
/// `Signal` is cheaply clonable via `Arc`. All clones share the same underlying
/// atomic bitmap, making it suitable for distribution across multiple producer threads.
#[derive(Clone)]
pub struct Signal {
    /// Shared, cache-line padded inner state (index + atomic bitmap).
    inner: Arc<CachePadded<SignalInner>>,
}
111
112impl Signal {
113 /// Returns the signal's index within the SignalWaker's signal array.
114 ///
115 /// This index is used to:
116 /// - Map this signal to the corresponding bit in the summary bitmap
117 /// - Identify which group of 64 queues this signal represents
118 ///
119 /// # Example
120 ///
121 /// ```ignore
122 /// let signal = Signal::with_index(5);
123 /// assert_eq!(signal.index(), 5);
124 /// // This signal controls queues 320-383 (5 * 64 through (5+1) * 64 - 1)
125 /// ```
126 #[inline(always)]
127 pub fn index(&self) -> u64 {
128 return self.inner.index;
129 }
130
131 /// Returns a reference to the underlying atomic value.
132 ///
133 /// Provides direct access to the 64-bit bitmap for advanced use cases
134 /// that need custom atomic operations beyond the provided methods.
135 ///
136 /// # Use Cases
137 ///
138 /// - Custom bit manipulation patterns
139 /// - Debugging (observing raw bitmap state)
140 /// - Integration with external synchronization primitives
141 ///
142 /// # Example
143 ///
144 /// ```ignore
145 /// let signal = Signal::new();
146 /// signal.set(10);
147 /// let raw_value = signal.value().load(Ordering::Relaxed);
148 /// assert_eq!(raw_value & (1 << 10), 1 << 10); // Bit 10 is set
149 /// ```
150 #[inline(always)]
151 pub fn value(&self) -> &AtomicU64 {
152 &self.inner.value
153 }
154}
155
/// Internal state for a [`Signal`].
///
/// This struct is not padded by itself; `Signal` stores it inside
/// `Arc<CachePadded<SignalInner>>`, and that wrapper is what keeps the bitmap
/// from sharing a cache line with neighbouring signals.
///
/// # Fields
///
/// - `index`: The signal's position in the waker's signal array. The
///   constructors debug-assert `index < STATUS_SUMMARY_WORDS`.
/// - `value`: 64-bit atomic bitmap where each bit represents a queue's readiness
struct SignalInner {
    /// Signal index in the waker's signal array; immutable after construction.
    pub index: u64,
    /// Atomic bitmap tracking up to 64 queues (bit N = queue ready state).
    value: AtomicU64,
}
168
169impl Signal {
170 /// Creates a new Signal with index 0 and all bits cleared.
171 ///
172 /// # Example
173 ///
174 /// ```ignore
175 /// let signal = Signal::new();
176 /// assert_eq!(signal.index(), 0);
177 /// assert!(signal.is_empty());
178 /// ```
179 pub fn new() -> Self {
180 Self::with_value(0, 0)
181 }
182
183 /// Creates a new Signal with the specified index and all bits cleared.
184 ///
185 /// # Parameters
186 ///
187 /// - `index`: Position in the SignalWaker's signal array (0-61)
188 ///
189 /// # Example
190 ///
191 /// ```ignore
192 /// let signal = Signal::with_index(10);
193 /// assert_eq!(signal.index(), 10);
194 /// assert!(signal.is_empty());
195 /// ```
196 pub fn with_index(index: u64) -> Self {
197 debug_assert!(
198 index < STATUS_SUMMARY_WORDS as u64,
199 "signal index {} exceeds status summary capacity {}",
200 index,
201 STATUS_SUMMARY_WORDS
202 );
203 Self::with_value(index, 0)
204 }
205
206 /// Creates a new Signal with the specified index and initial bitmap value.
207 ///
208 /// This is primarily used for testing or restoring state. In normal operation,
209 /// signals start with all bits cleared.
210 ///
211 /// # Parameters
212 ///
213 /// - `index`: Position in the SignalWaker's signal array (0-61)
214 /// - `value`: Initial 64-bit bitmap value
215 ///
216 /// # Example
217 ///
218 /// ```ignore
219 /// // Create signal with bits 0, 5, and 10 already set
220 /// let signal = Signal::with_value(3, (1 << 0) | (1 << 5) | (1 << 10));
221 /// assert_eq!(signal.size(), 3);
222 /// assert!(signal.is_set(0));
223 /// assert!(signal.is_set(5));
224 /// assert!(signal.is_set(10));
225 /// ```
226 pub fn with_value(index: u64, value: u64) -> Self {
227 debug_assert!(
228 index < STATUS_SUMMARY_WORDS as u64,
229 "signal index {} exceeds status summary capacity {}",
230 index,
231 STATUS_SUMMARY_WORDS
232 );
233 Self {
234 inner: Arc::new(CachePadded::new(SignalInner {
235 index,
236 value: AtomicU64::new(value),
237 })),
238 }
239 }
240
241 /// Loads the current bitmap value with the specified memory ordering.
242 ///
243 /// # Parameters
244 ///
245 /// - `ordering`: Memory ordering for the load operation
246 ///
247 /// # Returns
248 ///
249 /// The 64-bit bitmap value where each set bit represents a ready queue.
250 ///
251 /// # Example
252 ///
253 /// ```ignore
254 /// signal.set(5);
255 /// signal.set(10);
256 /// let value = signal.load(Ordering::Acquire);
257 /// assert_eq!(value, (1 << 5) | (1 << 10));
258 /// ```
259 #[inline(always)]
260 pub fn load(&self, ordering: Ordering) -> u64 {
261 self.inner.value.load(ordering)
262 }
263
264 /// Returns the number of set bits (ready queues) in this signal.
265 ///
266 /// Equivalent to `popcount(bitmap)`, this counts how many queues in this
267 /// signal group currently have work available.
268 ///
269 /// # Performance
270 ///
271 /// Uses the `POPCNT` instruction on x86_64 (~3 cycles), making it very efficient.
272 ///
273 /// # Example
274 ///
275 /// ```ignore
276 /// let signal = Signal::new();
277 /// signal.set(0);
278 /// signal.set(5);
279 /// signal.set(63);
280 /// assert_eq!(signal.size(), 3);
281 /// ```
282 #[inline(always)]
283 pub fn size(&self) -> u64 {
284 self.load(Ordering::Relaxed).count_ones() as u64
285 }
286
287 /// Returns `true` if no bits are set (no ready queues).
288 ///
289 /// This is more efficient than `size() == 0` for checking emptiness.
290 ///
291 /// # Example
292 ///
293 /// ```ignore
294 /// let signal = Signal::new();
295 /// assert!(signal.is_empty());
296 /// signal.set(10);
297 /// assert!(!signal.is_empty());
298 /// ```
299 #[inline(always)]
300 pub fn is_empty(&self) -> bool {
301 self.load(Ordering::Relaxed).count_ones() == 0
302 }
303
304 /// Atomically sets a bit in the bitmap using fetch_or.
305 ///
306 /// This is the primary method for producers to signal that a queue has work available.
307 ///
308 /// # Parameters
309 ///
310 /// - `index`: Bit position to set (0-63)
311 ///
312 /// # Returns
313 ///
314 /// A tuple `(was_empty, was_set)`:
315 /// - `was_empty`: `true` if this was the first bit set (signal transitioned from empty to non-empty)
316 /// - `was_set`: `true` if the bit was successfully set (wasn't already set)
317 ///
318 /// # Use Cases
319 ///
320 /// The return values are used for summary bitmap updates:
321 /// ```ignore
322 /// let (was_empty, was_set) = signal.set(queue_bit);
323 /// if was_empty && was_set {
324 /// // This signal was empty, now has work - update summary
325 /// waker.mark_active(signal.index());
326 /// }
327 /// ```
328 ///
329 /// # Performance
330 ///
331 /// ~5-10 ns (one atomic fetch_or operation)
332 ///
333 /// # Example
334 ///
335 /// ```ignore
336 /// let signal = Signal::new();
337 /// let (was_empty, was_set) = signal.set(5);
338 /// assert!(was_empty); // Signal was empty
339 /// assert!(was_set); // Bit 5 was not previously set
340 ///
341 /// let (was_empty, was_set) = signal.set(5);
342 /// assert!(!was_empty); // Signal already had bits set
343 /// assert!(!was_set); // Bit 5 was already set
344 /// ```
345 #[inline(always)]
346 pub fn set(&self, index: u64) -> (bool, bool) {
347 crate::bits::set(&self.inner.value, index)
348 }
349
350 /// Atomically sets a bit using a precomputed bitmask.
351 ///
352 /// Similar to `set()`, but takes a precomputed `1 << index` value for cases
353 /// where the bit position is computed once and reused.
354 ///
355 /// # Parameters
356 ///
357 /// - `bit`: Precomputed bitmask with exactly one bit set (e.g., `1 << 5`)
358 ///
359 /// # Returns
360 ///
361 /// The previous bitmap value before setting the bit.
362 ///
363 /// # Example
364 ///
365 /// ```ignore
366 /// let signal = Signal::new();
367 /// let bit_mask = 1u64 << 10;
368 /// let prev = signal.set_with_bit(bit_mask);
369 /// assert_eq!(prev, 0); // Was empty
370 /// assert!(signal.is_set(10));
371 /// ```
372 #[inline(always)]
373 pub fn set_with_bit(&self, bit: u64) -> u64 {
374 crate::bits::set_with_bit(&self.inner.value, bit)
375 }
376
377 /// Atomically clears a bit if it is currently set (CAS-based).
378 ///
379 /// This is the primary method for the executor to claim ownership of a ready queue.
380 /// Uses a CAS loop to ensure the bit is cleared atomically.
381 ///
382 /// # Parameters
383 ///
384 /// - `index`: Bit position to clear (0-63)
385 ///
386 /// # Returns
387 ///
388 /// - `true`: Bit was set and has been successfully cleared (queue acquired)
389 /// - `false`: Bit was not set (queue not ready or already acquired)
390 ///
391 /// # Use Cases
392 ///
393 /// ```ignore
394 /// // Executor loop
395 /// if signal.acquire(queue_bit) {
396 /// // Successfully acquired queue, process it
397 /// process_queue(queue_id);
398 /// }
399 /// ```
400 ///
401 /// # Performance
402 ///
403 /// ~10-20 ns (CAS loop, typically succeeds on first iteration)
404 ///
405 /// # Example
406 ///
407 /// ```ignore
408 /// let signal = Signal::new();
409 /// signal.set(5);
410 /// assert!(signal.acquire(5)); // Successfully cleared bit 5
411 /// assert!(!signal.acquire(5)); // Bit already clear, returns false
412 /// ```
413 #[inline(always)]
414 pub fn acquire(&self, index: u64) -> bool {
415 crate::bits::acquire(&self.inner.value, index)
416 }
417
418 /// Attempts to atomically clear a bit, returning detailed state information.
419 ///
420 /// Similar to `acquire()`, but provides additional information about the
421 /// before/after state of the bitmap, useful for debugging or advanced scheduling.
422 ///
423 /// # Parameters
424 ///
425 /// - `index`: Bit position to clear (0-63)
426 ///
427 /// # Returns
428 ///
429 /// A tuple `(before, after, success)`:
430 /// - `before`: Bitmap value before the operation
431 /// - `after`: Bitmap value after the operation (if successful)
432 /// - `success`: `true` if the bit was cleared, `false` if it wasn't set
433 ///
434 /// # Example
435 ///
436 /// ```ignore
437 /// let signal = Signal::with_value(0, 0b101010); // Bits 1, 3, 5 set
438 /// let (before, after, success) = signal.try_acquire(3);
439 /// assert_eq!(before, 0b101010);
440 /// assert_eq!(after, 0b100010); // Bit 3 cleared
441 /// assert!(success);
442 /// ```
443 #[inline(always)]
444 pub fn try_acquire(&self, index: u64) -> (u64, u64, bool) {
445 crate::bits::try_acquire(&self.inner.value, index)
446 }
447
448 /// Checks if a specific bit is set without modifying the bitmap.
449 ///
450 /// Non-atomic read followed by bit test. Suitable for non-critical checks
451 /// where races are acceptable.
452 ///
453 /// # Parameters
454 ///
455 /// - `index`: Bit position to check (0-63)
456 ///
457 /// # Returns
458 ///
459 /// `true` if the bit is set, `false` otherwise.
460 ///
461 /// # Example
462 ///
463 /// ```ignore
464 /// let signal = Signal::new();
465 /// assert!(!signal.is_set(5));
466 /// signal.set(5);
467 /// assert!(signal.is_set(5));
468 /// ```
469 #[inline(always)]
470 pub fn is_set(&self, index: u64) -> bool {
471 crate::bits::is_set(&self.inner.value, index)
472 }
473}
474
475// ──────────────────────────────────────────────────────────────────────────────
476// SignalGate State Machine Constants
477// ──────────────────────────────────────────────────────────────────────────────
478
/// Queue has no work scheduled and is not being processed.
///
/// This is the initial state (`SignalGate::new` starts here) and the value of
/// the flag byte when neither `SCHEDULED` nor `EXECUTING` is set.
/// Transitions to SCHEDULED when `SignalGate::schedule()` succeeds.
pub const IDLE: u8 = 0;

/// Queue has work available and is waiting for the executor to process it.
///
/// Bit 0 of the gate's flag byte. Set by `SignalGate::schedule()` (from IDLE, or
/// concurrently while EXECUTING) and by `SignalGate::unmark_and_schedule()`.
/// Overwritten when the executor calls `SignalGate::mark()`.
pub const SCHEDULED: u8 = 1;

/// Queue is currently being processed by the executor.
///
/// Bit 1 of the gate's flag byte. Stored by `SignalGate::mark()`. Cleared by
/// `SignalGate::unmark()` (leaving IDLE, or SCHEDULED if a producer scheduled
/// during execution) or replaced with SCHEDULED by
/// `SignalGate::unmark_and_schedule()`.
pub const EXECUTING: u8 = 2;
495
/// Per-queue gate coordinating scheduling between producers and executor.
///
/// `SignalGate` implements a lock-free state machine that prevents redundant scheduling
/// and ensures proper handoff of work from producers to the executor. Each queue has
/// exactly one `SignalGate` instance.
///
/// # State Machine
///
/// ```text
/// IDLE (0) ──schedule()──▶ SCHEDULED (1) ──mark()──▶ EXECUTING (2)
///    ▲                          ▲                        │
///    │                          └─unmark_and_schedule()──┤
///    └────────────────unmark()───────────────────────────┘
/// ```
///
/// # State Transitions
///
/// - **IDLE → SCHEDULED**: Producer calls `schedule()` after enqueuing items
/// - **SCHEDULED → EXECUTING**: Executor calls `mark()` before processing
/// - **EXECUTING → IDLE**: Executor calls `unmark()` when done (queue empty)
/// - **EXECUTING → SCHEDULED**: Executor calls `unmark_and_schedule()` when more work remains
/// - **EXECUTING → EXECUTING | SCHEDULED (3)**: A concurrent `schedule()` during
///   execution only sets the flag; `unmark()` then leaves the gate SCHEDULED and
///   re-sets the signal bit
///
/// # Concurrency Guarantees
///
/// - **Multiple producers**: Safe (atomic flags ensure only one schedule succeeds)
/// - **Producer + executor**: Safe (state transitions are atomic and properly ordered)
/// - **Multiple executors**: NOT SAFE (single-threaded consumption assumption)
///
/// # Integration with Signal and the waker
///
/// When a queue transitions IDLE → SCHEDULED:
/// 1. Sets bit in the associated `Signal` (64-bit bitmap)
/// 2. If signal was empty, calls `WorkerWaker::mark_active` with the signal's index
/// 3. NOTE(review): whether `mark_active` also wakes a sleeping executor thread
///    is decided in waker.rs and is not visible from this file
///
/// # Memory Layout
///
/// ```text
/// SignalGate
/// ┌─────────────┬─────────────┬───────────────┬──────────────────┐
/// │ flags       │ bit_index   │ signal        │ waker            │
/// │ AtomicU8    │ u8          │ Signal (Arc)  │ Arc<WorkerWaker> │
/// │ 1 byte      │ 1 byte      │ pointer-sized │ pointer-sized    │
/// └─────────────┴─────────────┴───────────────┴──────────────────┘
/// ```
///
/// Presumably 24 bytes on x86_64 once padding is added; the struct has no
/// `repr` attribute, so field order and padding are up to the compiler.
///
/// # Example Usage
///
/// ```ignore
/// // Setup
/// let waker: Arc<WorkerWaker> = /* obtained from the runtime */;
/// let signal = Signal::with_index(5);
/// let gate = SignalGate::new(10, signal, waker);
///
/// // Producer thread
/// queue.try_push(item)?;
/// gate.schedule();  // Signal work available
///
/// // Executor thread
/// gate.mark();      // Mark as executing
/// while let Some(item) = queue.try_pop() {
///     process(item);
/// }
/// if queue.is_empty() {
///     gate.unmark();               // Done, back to IDLE
/// } else {
///     gate.unmark_and_schedule();  // More work, stay SCHEDULED
/// }
/// ```
pub struct SignalGate {
    /// Atomic state flags (IDLE, SCHEDULED, EXECUTING).
    ///
    /// Uses bitwise OR to combine flags, allowing detection of concurrent schedules
    /// during execution (EXECUTING | SCHEDULED = 3).
    flags: AtomicU8,

    /// Bit position within the Signal's 64-bit bitmap (0-63).
    ///
    /// This queue's ready state is represented by bit `1 << bit_index` in the signal.
    /// Widened to `u64` at each call into `Signal::set`.
    bit_index: u8,

    /// Handle to the Signal word containing this queue's bit.
    ///
    /// Shared among up to 64 queues (all queues in the same signal group).
    signal: Signal,

    /// Handle to the waker whose summary is updated when the signal word
    /// goes from empty to non-empty.
    ///
    /// Shared among all queues in the executor (up to 3,968 queues).
    waker: Arc<WorkerWaker>,
}
595
596impl SignalGate {
597 /// Creates a new SignalGate in the IDLE state.
598 ///
599 /// # Parameters
600 ///
601 /// - `bit_index`: Position of this queue's bit within the signal (0-63)
602 /// - `signal`: Reference to the Signal word containing this queue's bit
603 /// - `waker`: Reference to the SignalWaker for summary updates
604 ///
605 /// # Example
606 ///
607 /// ```ignore
608 /// let waker = Arc::new(SignalWaker::new());
609 /// let signal = Signal::with_index(0);
610 /// let gate = SignalGate::new(5, signal, waker);
611 /// // This gate controls bit 5 in signal[0]
612 /// ```
613 pub fn new(bit_index: u8, signal: Signal, waker: Arc<WorkerWaker>) -> Self {
614 Self {
615 flags: AtomicU8::new(IDLE),
616 bit_index,
617 signal,
618 waker,
619 }
620 }
621
622 /// Attempts to schedule this queue for execution (IDLE → SCHEDULED transition).
623 ///
624 /// Called by producers after enqueuing items to notify the executor. Uses atomic
625 /// operations to ensure only one successful schedule per work batch.
626 ///
627 /// # Algorithm
628 ///
629 /// 1. **Fast check**: If already SCHEDULED, return false immediately (idempotent)
630 /// 2. **Atomic set**: `fetch_or(SCHEDULED)` to set the SCHEDULED flag
631 /// 3. **State check**: If previous state was IDLE (neither SCHEDULED nor EXECUTING):
632 /// - Set bit in signal word via `signal.set(bit_index)`
633 /// - If signal transitioned from empty, update summary via `waker.mark_active()`
634 /// - Return true (successful schedule)
635 /// 4. **Otherwise**: Return false (already scheduled or executing)
636 ///
637 /// # Returns
638 ///
639 /// - `true`: Successfully transitioned from IDLE to SCHEDULED (work will be processed)
640 /// - `false`: Already scheduled/executing, or concurrent schedule won (idempotent)
641 ///
642 /// # Concurrent Behavior
643 ///
644 /// - **Multiple producers**: Only the first `schedule()` succeeds (returns true)
645 /// - **During EXECUTING**: Sets SCHEDULED flag, which `finish()` will detect and reschedule
646 ///
647 /// # Memory Ordering
648 ///
649 /// - Initial load: `Acquire` (see latest state)
650 /// - `fetch_or`: `Release` (publish enqueued items to executor)
651 ///
652 /// # Performance
653 ///
654 /// - **Already scheduled**: ~2-3 ns (fast path, single atomic load)
655 /// - **Successful schedule**: ~10-20 ns (fetch_or + signal update + potential summary update)
656 ///
657 /// # Example
658 ///
659 /// ```ignore
660 /// // Producer 1
661 /// queue.try_push(item)?;
662 /// if gate.schedule() {
663 /// println!("Successfully scheduled"); // First producer
664 /// }
665 ///
666 /// // Producer 2 (concurrent)
667 /// queue.try_push(another_item)?;
668 /// if !gate.schedule() {
669 /// println!("Already scheduled"); // Idempotent, no action needed
670 /// }
671 /// ```
672 #[inline(always)]
673 pub fn schedule(&self) -> bool {
674 if (self.flags.load(Ordering::Acquire) & SCHEDULED) != IDLE {
675 return false;
676 }
677
678 let previous_flags = self.flags.fetch_or(SCHEDULED, Ordering::Release);
679 let scheduled_nor_executing = (previous_flags & (SCHEDULED | EXECUTING)) == IDLE;
680
681 if scheduled_nor_executing {
682 let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
683 if was_empty && was_set {
684 self.waker.mark_active(self.signal.index());
685 }
686 true
687 } else {
688 false
689 }
690 }
691
692 /// Marks the queue as EXECUTING (SCHEDULED → EXECUTING transition).
693 ///
694 /// Called by the executor when it begins processing this queue. This transition
695 /// prevents redundant scheduling while work is being processed.
696 ///
697 /// # State Transition
698 ///
699 /// Unconditionally stores EXECUTING, which clears any SCHEDULED flags and sets EXECUTING.
700 /// ```text
701 /// Before: SCHEDULED (1)
702 /// After: EXECUTING (2)
703 /// ```
704 ///
705 /// If a producer calls `schedule()` after `begin()` but before `finish()`, the
706 /// SCHEDULED flag will be set again (creating state 3 = EXECUTING | SCHEDULED),
707 /// which `finish()` detects and handles.
708 ///
709 /// # Memory Ordering
710 ///
711 /// Uses `Ordering::Release` to ensure the state change is visible to concurrent
712 /// producers calling `schedule()`.
713 ///
714 /// # Performance
715 ///
716 /// ~1-2 ns (single atomic store)
717 ///
718 /// # Example
719 ///
720 /// ```ignore
721 /// // Executor discovers ready queue
722 /// if signal.acquire(queue_bit) {
723 /// gate.begin(); // Mark as executing
724 /// process_queue();
725 /// gate.finish();
726 /// }
727 /// ```
728 #[inline(always)]
729 pub fn mark(&self) {
730 self.flags.store(EXECUTING, Ordering::Release);
731 }
732
733 /// Marks the queue as IDLE and handles concurrent schedules (EXECUTING → IDLE/SCHEDULED).
734 ///
735 /// Called by the executor after processing a batch of items. Automatically detects
736 /// if new work arrived during processing (SCHEDULED flag set concurrently) and
737 /// reschedules if needed.
738 ///
739 /// # Algorithm
740 ///
741 /// 1. **Clear EXECUTING**: `fetch_sub(EXECUTING)` atomically transitions to IDLE
742 /// 2. **Check SCHEDULED**: If the SCHEDULED flag is set in the result:
743 /// - Means a producer called `schedule()` during execution
744 /// - Re-set the signal bit to ensure executor sees the work
745 /// - Queue remains/becomes SCHEDULED
746 ///
747 /// # Automatic Rescheduling
748 ///
749 /// This method implements a key correctness property: if a producer enqueues work
750 /// while the executor is processing, that work will not be lost. The SCHEDULED flag
751 /// acts as a handoff mechanism.
752 ///
753 /// ```text
754 /// Timeline:
755 /// T0: Executor calls begin() → EXECUTING (2)
756 /// T1: Producer calls schedule() → EXECUTING | SCHEDULED (3)
757 /// T2: Executor calls finish() → SCHEDULED (1) [bit re-set in signal]
758 /// T3: Executor sees bit, processes → ...
759 /// ```
760 ///
761 /// # Memory Ordering
762 ///
763 /// Uses `Ordering::AcqRel`:
764 /// - **Acquire**: See all producer writes (enqueued items)
765 /// - **Release**: Publish state transition to future readers
766 ///
767 /// # Performance
768 ///
769 /// - **No concurrent schedule**: ~2-3 ns (fetch_sub only)
770 /// - **With concurrent schedule**: ~10-15 ns (fetch_sub + signal.set)
771 ///
772 /// # Example
773 ///
774 /// ```ignore
775 /// gate.begin();
776 /// while let Some(item) = queue.try_pop() {
777 /// process(item);
778 /// }
779 /// gate.finish(); // Automatically reschedules if more work arrived
780 /// ```
781 #[inline(always)]
782 pub fn unmark(&self) {
783 let after_flags = self.flags.fetch_sub(EXECUTING, Ordering::AcqRel);
784 if after_flags & SCHEDULED != IDLE {
785 self.signal.set(self.bit_index as u64);
786 }
787 }
788
789 /// Atomically marks the queue as SCHEDULED, ensuring re-execution.
790 ///
791 /// Called by the executor when it knows more work exists but wants to yield the
792 /// timeslice for fairness. This is an optimization over `finish()` followed by
793 /// external `schedule()`.
794 ///
795 /// # Use Cases
796 ///
797 /// 1. **Batch size limiting**: Process N items, then yield to other queues
798 /// 2. **Fairness**: Prevent queue starvation by rotating execution
799 /// 3. **Latency control**: Ensure all queues get regular timeslices
800 ///
801 /// # Algorithm
802 ///
803 /// 1. **Set state**: Store SCHEDULED unconditionally
804 /// 2. **Update signal**: Set bit in signal word
805 /// 3. **Update summary**: If signal was empty, mark active in waker
806 ///
807 /// # Comparison with finish() + schedule()
808 ///
809 /// ```ignore
810 /// // Separate calls (2 atomic ops)
811 /// gate.finish(); // EXECUTING → IDLE
812 /// gate.schedule(); // IDLE → SCHEDULED
813 ///
814 /// // Combined call (1 atomic op + signal update)
815 /// gate.finish_and_schedule(); // EXECUTING → SCHEDULED
816 /// ```
817 ///
818 /// # Memory Ordering
819 ///
820 /// Uses `Ordering::Release` to publish both the state change and enqueued items.
821 ///
822 /// # Performance
823 ///
824 /// ~10-15 ns (store + signal.set + potential summary update)
825 ///
826 /// # Example
827 ///
828 /// ```ignore
829 /// gate.begin();
830 /// let mut processed = 0;
831 /// while processed < BATCH_SIZE {
832 /// if let Some(item) = queue.try_pop() {
833 /// process(item);
834 /// processed += 1;
835 /// } else {
836 /// break;
837 /// }
838 /// }
839 ///
840 /// if queue.len() > 0 {
841 /// gate.finish_and_schedule(); // More work, stay scheduled
842 /// } else {
843 /// gate.finish(); // Done, go idle
844 /// }
845 /// ```
846 #[inline(always)]
847 pub fn unmark_and_schedule(&self) {
848 self.flags.store(SCHEDULED, Ordering::Release);
849 let (was_empty, was_set) = self.signal.set(self.bit_index as u64);
850 if was_empty && was_set {
851 self.waker.mark_active(self.signal.index());
852 }
853 }
854}
855
856impl SignalSchedule for SignalGate {
857 fn schedule(&self) -> bool {
858 SignalGate::schedule(self)
859 }
860
861 fn mark(&self) {
862 SignalGate::mark(self);
863 }
864
865 fn unmark(&self) {
866 SignalGate::unmark(self);
867 }
868
869 fn unmark_and_schedule(&self) {
870 SignalGate::unmark_and_schedule(self);
871 }
872}
873
// Unit-test module, compiled only for test builds. No tests are defined yet;
// the glob import keeps this module's items in scope for tests added later.
#[cfg(test)]
mod tests {
    use super::*;
}