pub struct AsyncSignalGate { /* private fields */ }
Per-queue gate coordinating scheduling between producers and the executor.
AsyncSignalGate implements a lock-free state machine that prevents redundant scheduling
and ensures proper handoff of work from producers to the executor. Each queue has
exactly one AsyncSignalGate instance.
§State Machine
IDLE (0) ──schedule()──▶ SCHEDULED (1) ──mark()──▶ EXECUTING (2)
   ▲                         ▲                         │
   │                         │                         │
   └────────unmark()─────────┼─────────────────────────┘
                             │                         │
                             └─unmark_and_schedule()───┘
§State Transitions
- IDLE → SCHEDULED: Producer calls schedule() after enqueuing items
- SCHEDULED → EXECUTING: Executor calls mark() before processing
- EXECUTING → IDLE: Executor calls unmark() when done (queue empty)
- EXECUTING → SCHEDULED: Executor calls unmark_and_schedule() when more work remains
- Any → SCHEDULED: A concurrent schedule() during EXECUTING sets the flag, which unmark() then processes
§Concurrency Guarantees
- Multiple producers: Safe (atomic flags ensure only one schedule succeeds)
- Producer + executor: Safe (state transitions are atomic and properly ordered)
- Multiple executors: NOT SAFE (single-threaded consumption assumption)
§Integration with Signal and AsyncSignalWaker
When a queue transitions IDLE → SCHEDULED:
- Sets its bit in the associated Signal (64-bit bitmap)
- If the signal was previously empty, sets the corresponding bit in the AsyncSignalWaker summary (64-bit bitmap)
- May wake a sleeping executor thread via the permit system
§Memory Layout
AsyncSignalGate (40 bytes on x86_64)
┌────────────┬───────────┬──────────┬──────────┐
│ flags (1B) │ bit_index │ signal   │ waker    │
│ AtomicU8   │ u64 (8B)  │ Arc (8B) │ Arc (8B) │
└────────────┴───────────┴──────────┴──────────┘
§Example Usage
// Setup
let waker = Arc::new(AsyncSignalWaker::new());
let signal = Signal::with_index(5);
let gate = AsyncSignalGate::new(10, signal, waker);

// Producer thread
queue.try_push(item)?;
gate.schedule(); // Signal work available

// Executor thread
gate.mark(); // Mark as executing
while let Some(item) = queue.try_pop() {
    process(item);
}
if queue.is_empty() {
    gate.unmark(); // Done, back to IDLE
} else {
    gate.unmark_and_schedule(); // More work, stay SCHEDULED
}
Implementations§
impl AsyncSignalGate
pub fn new(bit_index: u8, signal: Signal, waker: Arc<AsyncSignalWaker>) -> Self
Creates a new AsyncSignalGate in the IDLE state.
§Parameters
- bit_index: Position of this queue’s bit within the signal (0-63)
- signal: Reference to the Signal word containing this queue’s bit
- waker: Reference to the AsyncSignalWaker for summary updates
§Example
let waker = Arc::new(AsyncSignalWaker::new());
let signal = Signal::with_index(0);
let gate = AsyncSignalGate::new(5, signal, waker);
// This gate controls bit 5 in signal[0]
pub fn schedule(&self) -> bool
Attempts to schedule this queue for execution (IDLE → SCHEDULED transition).
Called by producers after enqueuing items to notify the executor. Uses atomic operations to ensure only one successful schedule per work batch.
§Algorithm
- Fast check: If already SCHEDULED, return false immediately (idempotent)
- Atomic set: fetch_or(SCHEDULED) to set the SCHEDULED flag
- State check: If the previous state was IDLE (neither SCHEDULED nor EXECUTING):
  - Set the bit in the signal word via signal.set(bit_index)
  - If the signal transitioned from empty, update the summary via waker.mark_active()
  - Return true (successful schedule)
- Otherwise: Return false (already scheduled or executing)
§Returns
- true: Successfully transitioned from IDLE to SCHEDULED (work will be processed)
- false: Already scheduled/executing, or a concurrent schedule won (idempotent)
§Concurrent Behavior
- Multiple producers: Only the first schedule() succeeds (returns true)
- During EXECUTING: Sets the SCHEDULED flag, which unmark() will detect and reschedule
§Memory Ordering
- Initial load: Acquire (see latest state)
- fetch_or: Release (publish enqueued items to executor)
§Performance
- Already scheduled: ~2-3 ns (fast path, single atomic load)
- Successful schedule: ~10-20 ns (fetch_or + signal update + potential summary update)
§Example
// Producer 1
queue.try_push(item)?;
if gate.schedule() {
println!("Successfully scheduled"); // First producer
}
// Producer 2 (concurrent)
queue.try_push(another_item)?;
if !gate.schedule() {
println!("Already scheduled"); // Idempotent, no action needed
}
pub fn mark(&self)
Marks the queue as EXECUTING (SCHEDULED → EXECUTING transition).
Called by the executor when it begins processing this queue. This transition prevents redundant scheduling while work is being processed.
§State Transition
Unconditionally stores EXECUTING, which clears any SCHEDULED flag and sets EXECUTING.
Before: SCHEDULED (1)
After:  EXECUTING (2)
If a producer calls schedule() after mark() but before unmark(), the
SCHEDULED flag will be set again (creating state 3 = EXECUTING | SCHEDULED),
which unmark() detects and handles.
§Memory Ordering
Uses Ordering::Release to ensure the state change is visible to concurrent
producers calling schedule().
§Performance
~1-2 ns (single atomic store)
§Example
// Executor discovers ready queue
if signal.acquire(queue_bit) {
    gate.mark(); // Mark as executing
    process_queue();
    gate.unmark();
}
pub fn unmark(&self)
Marks the queue as IDLE and handles concurrent schedules (EXECUTING → IDLE/SCHEDULED).
Called by the executor after processing a batch of items. Automatically detects if new work arrived during processing (SCHEDULED flag set concurrently) and reschedules if needed.
§Algorithm
- Clear EXECUTING: fetch_sub(EXECUTING) atomically transitions to IDLE
- Check SCHEDULED: If the SCHEDULED flag is set in the result:
  - A producer called schedule() during execution
  - Re-set the signal bit to ensure the executor sees the work
  - Queue remains/becomes SCHEDULED
§Automatic Rescheduling
This method implements a key correctness property: if a producer enqueues work while the executor is processing, that work will not be lost. The SCHEDULED flag acts as a handoff mechanism.
Timeline:
T0: Executor calls mark() → EXECUTING (2)
T1: Producer calls schedule() → EXECUTING | SCHEDULED (3)
T2: Executor calls unmark() → SCHEDULED (1) [bit re-set in signal]
T3: Executor sees bit, processes → ...
§Memory Ordering
Uses Ordering::AcqRel:
- Acquire: See all producer writes (enqueued items)
- Release: Publish state transition to future readers
§Performance
- No concurrent schedule: ~2-3 ns (fetch_sub only)
- With concurrent schedule: ~10-15 ns (fetch_sub + signal.set)
§Example
gate.mark();
while let Some(item) = queue.try_pop() {
    process(item);
}
gate.unmark(); // Automatically reschedules if more work arrived
pub fn unmark_and_schedule(&self)
Atomically marks the queue as SCHEDULED, ensuring re-execution.
Called by the executor when it knows more work exists but wants to yield the
timeslice for fairness. This is an optimization over unmark() followed by an
external schedule().
§Use Cases
- Batch size limiting: Process N items, then yield to other queues
- Fairness: Prevent queue starvation by rotating execution
- Latency control: Ensure all queues get regular timeslices
§Algorithm
- Set state: Store SCHEDULED unconditionally
- Update signal: Set bit in signal word
- Update summary: If signal was empty, mark active in waker
§Comparison with unmark() + schedule()
// Separate calls (2 atomic ops)
gate.unmark();   // EXECUTING → IDLE
gate.schedule(); // IDLE → SCHEDULED

// Combined call (1 atomic op + signal update)
gate.unmark_and_schedule(); // EXECUTING → SCHEDULED
§Memory Ordering
Uses Ordering::Release to publish both the state change and enqueued items.
§Performance
~10-15 ns (store + signal.set + potential summary update)
§Example
gate.mark();
let mut processed = 0;
while processed < BATCH_SIZE {
    if let Some(item) = queue.try_pop() {
        process(item);
        processed += 1;
    } else {
        break;
    }
}
if !queue.is_empty() {
    gate.unmark_and_schedule(); // More work, stay scheduled
} else {
    gate.unmark(); // Done, go idle
}