AsyncSignalGate

pub struct AsyncSignalGate { /* private fields */ }

Per-queue gate coordinating scheduling between producers and the executor.

AsyncSignalGate implements a lock-free state machine that prevents redundant scheduling and ensures a clean handoff of work from producers to the executor. Each queue owns exactly one AsyncSignalGate instance.

§State Machine

  ┌─────────────────────────── unmark() ────────────────────────────┐
  │                                                                 │
  ▼                                                                 │
IDLE (0) ──schedule()──▶ SCHEDULED (1) ──mark()──▶ EXECUTING (2) ───┤
                              ▲                                     │
                              └────── unmark_and_schedule() ────────┘

§State Transitions

  • IDLE → SCHEDULED: Producer calls schedule() after enqueuing items
  • SCHEDULED → EXECUTING: Executor calls mark() before processing
  • EXECUTING → IDLE: Executor calls unmark() when done (queue empty)
  • EXECUTING → SCHEDULED: Executor calls unmark_and_schedule() when more work remains
  • EXECUTING → EXECUTING | SCHEDULED: A concurrent schedule() during execution sets the SCHEDULED flag, which unmark() detects and converts into a reschedule
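The transitions above can be modeled with a single AtomicU8. The sketch below is a minimal model of the documented algorithm, not the crate's actual code: the `Gate` name and constant values are assumptions, and here `unmark()` returns whether a reschedule is needed, whereas the real method handles the signal bit internally.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Assumed flag values mirroring the documented states.
const SCHEDULED: u8 = 1;
const EXECUTING: u8 = 2;

struct Gate {
    // 0 = IDLE, 1 = SCHEDULED, 2 = EXECUTING, 3 = EXECUTING | SCHEDULED
    flags: AtomicU8,
}

impl Gate {
    fn new() -> Self {
        Gate { flags: AtomicU8::new(0) }
    }

    /// IDLE → SCHEDULED; returns true only for the producer that wins.
    fn schedule(&self) -> bool {
        if self.flags.load(Ordering::Acquire) & SCHEDULED != 0 {
            return false; // fast path: already scheduled
        }
        let prev = self.flags.fetch_or(SCHEDULED, Ordering::Release);
        prev & (SCHEDULED | EXECUTING) == 0
    }

    /// SCHEDULED → EXECUTING.
    fn mark(&self) {
        self.flags.store(EXECUTING, Ordering::Release);
    }

    /// EXECUTING → IDLE; returns true if a concurrent schedule() raced in,
    /// in which case the caller must re-set the signal bit.
    fn unmark(&self) -> bool {
        let prev = self.flags.fetch_sub(EXECUTING, Ordering::AcqRel);
        prev & SCHEDULED != 0
    }
}
```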

§Concurrency Guarantees

  • Multiple producers: Safe (atomic flags ensure only one schedule succeeds)
  • Producer + executor: Safe (state transitions are atomic and properly ordered)
  • Multiple executors: NOT SAFE (single-threaded consumption assumption)

§Integration with Signal and SignalWaker

When a queue transitions IDLE → SCHEDULED:

  1. Sets bit in the associated Signal (64-bit bitmap)
  2. If the signal word was empty, sets bit in the AsyncSignalWaker summary (64-bit bitmap)
  3. May wake sleeping executor thread via permit system

§Memory Layout

AsyncSignalGate (24 bytes on x86_64, assuming pointer-sized signal and waker handles)
┌─────────────┬───────────┬─────────┬─────────┐
│ flags (1B)  │ bit_index │ signal  │ waker   │
│ AtomicU8    │ u8 (1B)   │ Arc (8B)│ Arc (8B)│
└─────────────┴───────────┴─────────┴─────────┘
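As a rough sanity check, a stand-in struct with the fields implied by new()'s signature can be measured with size_of. Field names and the Arc payloads here are assumptions; the real size may differ if Signal is larger than one pointer.

```rust
use std::mem::size_of;
use std::sync::atomic::AtomicU8;
use std::sync::Arc;

// Hypothetical layout stand-in: one atomic byte, one u8 index,
// and two pointer-sized reference-counted handles.
#[allow(dead_code)]
struct GateLayout {
    flags: AtomicU8,
    bit_index: u8,
    signal: Arc<u64>, // stand-in for the signal handle
    waker: Arc<u64>,  // stand-in for Arc<AsyncSignalWaker>
}

// Two 1-byte fields pack beside two 8-byte pointers, then pad to
// pointer alignment: size_of::<GateLayout>() == 24 on 64-bit targets.
```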

§Example Usage

// Setup
let waker = Arc::new(AsyncSignalWaker::new());
let signal = Signal::with_index(5);
let gate = AsyncSignalGate::new(10, signal, waker);

// Producer thread
queue.try_push(item)?;
gate.schedule();  // Signal work available

// Executor thread
gate.mark();      // Mark as executing
while let Some(item) = queue.try_pop() {
    process(item);
}
if queue.is_empty() {
    gate.unmark();  // Done, back to IDLE
} else {
    gate.unmark_and_schedule();  // More work, stay SCHEDULED
}

Implementations§

impl AsyncSignalGate

pub fn new(bit_index: u8, signal: Signal, waker: Arc<AsyncSignalWaker>) -> Self

Creates a new AsyncSignalGate in the IDLE state.

§Parameters
  • bit_index: Position of this queue’s bit within the signal (0-63)
  • signal: Reference to the Signal word containing this queue’s bit
  • waker: Reference to the AsyncSignalWaker for summary updates
§Example
let waker = Arc::new(AsyncSignalWaker::new());
let signal = Signal::with_index(0);
let gate = AsyncSignalGate::new(5, signal, waker);
// This gate controls bit 5 in signal[0]
pub fn schedule(&self) -> bool

Attempts to schedule this queue for execution (IDLE → SCHEDULED transition).

Called by producers after enqueuing items to notify the executor. Uses atomic operations to ensure only one successful schedule per work batch.

§Algorithm
  1. Fast check: If already SCHEDULED, return false immediately (idempotent)
  2. Atomic set: fetch_or(SCHEDULED) to set the SCHEDULED flag
  3. State check: If previous state was IDLE (neither SCHEDULED nor EXECUTING):
    • Set bit in signal word via signal.set(bit_index)
    • If signal transitioned from empty, update summary via waker.mark_active()
    • Return true (successful schedule)
  4. Otherwise: Return false (already scheduled or executing)
§Returns
  • true: Successfully transitioned from IDLE to SCHEDULED (work will be processed)
  • false: Already scheduled/executing, or concurrent schedule won (idempotent)
§Concurrent Behavior
  • Multiple producers: Only the first schedule() succeeds (returns true)
  • During EXECUTING: Sets SCHEDULED flag, which unmark() will detect and reschedule
§Memory Ordering
  • Initial load: Acquire (see latest state)
  • fetch_or: Release (publish enqueued items to executor)
§Performance
  • Already scheduled: ~2-3 ns (fast path, single atomic load)
  • Successful schedule: ~10-20 ns (fetch_or + signal update + potential summary update)
§Example
// Producer 1
queue.try_push(item)?;
if gate.schedule() {
    println!("Successfully scheduled");  // First producer
}

// Producer 2 (concurrent)
queue.try_push(another_item)?;
if !gate.schedule() {
    println!("Already scheduled");  // Idempotent, no action needed
}
pub fn mark(&self)

Marks the queue as EXECUTING (SCHEDULED → EXECUTING transition).

Called by the executor when it begins processing this queue. This transition prevents redundant scheduling while work is being processed.

§State Transition

Unconditionally stores EXECUTING, which clears any pending SCHEDULED flag and marks the queue as executing.

Before: SCHEDULED (1)
After:  EXECUTING (2)

If a producer calls schedule() after mark() but before unmark(), the SCHEDULED flag will be set again (creating state 3 = EXECUTING | SCHEDULED), which unmark() detects and handles.

§Memory Ordering

Uses Ordering::Release to ensure the state change is visible to concurrent producers calling schedule().

§Performance

~1-2 ns (single atomic store)

§Example
// Executor discovers ready queue
if signal.acquire(queue_bit) {
    gate.mark();   // Mark as executing
    process_queue();
    gate.unmark();
}
pub fn unmark(&self)

Marks the queue as IDLE and handles concurrent schedules (EXECUTING → IDLE/SCHEDULED).

Called by the executor after processing a batch of items. Automatically detects if new work arrived during processing (SCHEDULED flag set concurrently) and reschedules if needed.

§Algorithm
  1. Clear EXECUTING: fetch_sub(EXECUTING) atomically transitions to IDLE
  2. Check SCHEDULED: If the SCHEDULED flag is set in the result:
    • Means a producer called schedule() during execution
    • Re-set the signal bit to ensure executor sees the work
    • Queue remains/becomes SCHEDULED
§Automatic Rescheduling

This method implements a key correctness property: if a producer enqueues work while the executor is processing, that work will not be lost. The SCHEDULED flag acts as a handoff mechanism.

Timeline:
T0: Executor calls mark()            → EXECUTING (2)
T1: Producer calls schedule()        → EXECUTING | SCHEDULED (3)
T2: Executor calls unmark()          → SCHEDULED (1) [bit re-set in signal]
T3: Executor sees bit, processes     → ...
§Memory Ordering

Uses Ordering::AcqRel:

  • Acquire: See all producer writes (enqueued items)
  • Release: Publish state transition to future readers
§Performance
  • No concurrent schedule: ~2-3 ns (fetch_sub only)
  • With concurrent schedule: ~10-15 ns (fetch_sub + signal.set)
§Example
gate.mark();
while let Some(item) = queue.try_pop() {
    process(item);
}
gate.unmark();  // Automatically reschedules if more work arrived
pub fn unmark_and_schedule(&self)

Atomically marks the queue as SCHEDULED (EXECUTING → SCHEDULED transition), ensuring re-execution.

Called by the executor when it knows more work exists but wants to yield the timeslice for fairness. This is an optimization over unmark() followed by an external schedule().

§Use Cases
  1. Batch size limiting: Process N items, then yield to other queues
  2. Fairness: Prevent queue starvation by rotating execution
  3. Latency control: Ensure all queues get regular timeslices
§Algorithm
  1. Set state: Store SCHEDULED unconditionally
  2. Update signal: Set bit in signal word
  3. Update summary: If signal was empty, mark active in waker
§Comparison with unmark() + schedule()
// Separate calls (2 atomic ops)
gate.unmark();      // EXECUTING → IDLE
gate.schedule();    // IDLE → SCHEDULED

// Combined call (1 atomic op + signal update)
gate.unmark_and_schedule();  // EXECUTING → SCHEDULED
§Memory Ordering

Uses Ordering::Release to publish both the state change and enqueued items.

§Performance

~10-15 ns (store + signal.set + potential summary update)

§Example
gate.mark();
let mut processed = 0;
while processed < BATCH_SIZE {
    if let Some(item) = queue.try_pop() {
        process(item);
        processed += 1;
    } else {
        break;
    }
}

if !queue.is_empty() {
    gate.unmark_and_schedule();  // More work, stay scheduled
} else {
    gate.unmark();  // Done, go idle
}
