Struct WorkerWaker

pub struct WorkerWaker { /* private fields */ }

A cache-optimized waker that packs queue summaries and control flags into a single status word.

§Architecture

WorkerWaker implements a two-level hierarchy for efficient work discovery while keeping a single atomic source of truth for coordination data:

Level 1: Status Word (64 bits)
         ┌─────────────────────────────────────┐
         │ Bits 0-61  │ Bit 62 │ Bit 63        │
         │ Queue map  │ Part.  │ Yield         │
         └─────────────────────────────────────┘
                │           │
                ▼           ▼
Level 2: Signal Words (external AtomicU64s)
         Word 0  Word 1  ...  Word 61
         [64 q]  [64 q]  ...  [64 q]           Each bit = individual queue state

Total: 62 words × 64 bits = 3,968 queues
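
A minimal sketch of how the status word could be decoded, assuming the bit positions in the diagram: summary bits 0-61, partition flag at bit 62, yield flag at bit 63. The constant names echo the STATUS_BIT_PARTITION and STATUS_BIT_YIELD flags mentioned in the method docs. Their values and the helper functions are illustrative assumptions, not the crate's definitions.

```rust
/// Bit 62: partition flag (assumed position, per the layout diagram).
pub const STATUS_BIT_PARTITION: u64 = 1 << 62;
/// Bit 63: yield flag (assumed position, per the layout diagram).
pub const STATUS_BIT_YIELD: u64 = 1 << 63;
/// Bits 0-61: one summary bit per signal word.
pub const SUMMARY_MASK: u64 = (1 << 62) - 1;

/// Signal words the summary can track, and queues (bits) per signal word.
pub const SUMMARY_WORDS: u32 = 62;
pub const QUEUES_PER_WORD: u32 = 64;

/// Total queue capacity: 62 words x 64 bits.
pub const fn total_queues() -> u32 {
    SUMMARY_WORDS * QUEUES_PER_WORD
}

/// Splits a raw status word into (summary bits, is_yield, is_partition_active).
pub fn decode_status(status: u64) -> (u64, bool, bool) {
    (
        status & SUMMARY_MASK,
        status & STATUS_BIT_YIELD != 0,
        status & STATUS_BIT_PARTITION != 0,
    )
}

/// Maps a global queue index to (signal word index, bit within that word).
pub fn queue_location(queue: u32) -> (u32, u32) {
    (queue / QUEUES_PER_WORD, queue % QUEUES_PER_WORD)
}
```

With this layout, queue_location(5) gives (0, 5): queue 5 lives in signal word 0, which is the case used in the usage example.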

§Core Components

  1. Status Bitmap (status): Single u64 that stores queue-word summary bits (0-61) plus control flags (partition/yield). Enables O(1) lookup without a second atomic.

  2. Counting Semaphore (permits): Tracks how many threads should be awake. Each 0→1 transition of a status bit (a signal word becoming non-empty, or the partition or yield flag being set) adds exactly 1 permit, guaranteeing no lost wakeups.

  3. Sleeper Tracking (sleepers): Approximate count of parked threads. Used to throttle notifications (avoid waking more threads than necessary).

§Design Patterns

§Cache Optimization

  • CachePadded on struct for cache-line alignment
  • CachePadded on hot atomics to prevent false sharing
  • Producer/consumer paths access different cache lines

§Memory Ordering Strategy

  • Status summary bits: Relaxed - hint-based, false positives acceptable
  • Permits: AcqRel/Release - proper synchronization for wakeups
  • Sleepers: Relaxed - approximate count is sufficient

§Lazy Cleanup

Summary bits may remain set after queues empty (false positives). Consumers lazily clear bits via try_unmark_if_empty(). This trades occasional extra checks for lower overhead on the hot path.

§Guarantees

  • No lost wakeups: Permits accumulate even if no threads are sleeping
  • Bounded notifications: Never wakes more threads than are sleeping; release(n) notifies at most min(n, sleepers) threads
  • Lock-free fast path: try_acquire() uses only atomics
  • Summary consistency: false positives OK, false negatives impossible

§Trade-offs

  • False positives: Summary may indicate work when queues are empty (lazy cleanup)
  • Approximate sleeper count: May over-notify slightly (but safely)
  • 62-word limit: The summary shares a single u64 with the two control bits, so at most 62 signal words (3,968 queues) can be tracked (extensible if needed)

§Usage Example

use std::sync::Arc;
use maniac::WorkerWaker;

let waker = Arc::new(WorkerWaker::new());

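// Producer: after setting queue 5's bit in signal word 0 (and seeing that
// word go from empty to non-empty), mark word 0 active in the summary.
waker.mark_active(0);  // On a 0→1 summary transition: adds 1 permit, wakes up to 1 sleeper

// Consumer: find work via summary (bits 0-61 are signal words; 62-63 are control flags)
let summary = waker.snapshot_summary();
for word_idx in (0..62).filter(|i| summary & (1u64 << i) != 0) {
    // Process queues in word_idx
}

// Consumer: block when no work
waker.acquire();  // Waits for a permit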

§Performance Characteristics

  • mark_active: O(1) atomic, fast if already set
  • mark_active_mask: O(1) batch update for multiple queues
  • try_acquire: O(1) lock-free
  • acquire: O(1) when a permit is available; otherwise blocks until one is released
  • snapshot_summary: O(1) single atomic load

§Thread Safety

All methods are thread-safe. Producers and consumers can operate concurrently without coordination beyond the internal atomics and mutex/condvar for blocking.

Implementations§

impl WorkerWaker

pub fn new() -> Self

pub fn status(&self) -> u64

Returns the full 64-bit raw status word for this worker, which contains all control and summary bits.

§Details

The status word encodes:

  • Control bits: partition (bit 62) and yield (bit 63)
  • Summary bits 0-61: one bit per signal word that may hold active queues

This is a low-level snapshot, useful for diagnostics, debugging, or fast checks on global/partition state.

§Memory Ordering

Uses relaxed ordering for performance: consumers tolerate minor staleness, and correctness is enforced by acquire/release synchronization on the permit counter.

pub fn status_bits(&self) -> (bool, bool)

Returns the current state of the primary control bits (“yield” and “partition”).

§Returns

A tuple (is_yield, is_partition_active) representing:

  • is_yield: Whether the yield control bit is set, instructing the worker to yield.
  • is_partition_active: Whether the partition summary bit is set, indicating there is pending work detected in this worker’s assigned partition.

This allows higher-level logic to decide whether the worker should yield or has work immediately available.

§Memory Ordering

Uses relaxed ordering for performance, as spurious staleness is benign and status is periodically refreshed.

pub fn mark_yield(&self)

Sets the STATUS_BIT_YIELD flag for this worker and releases a permit if it was not previously set.

§Purpose

Requests the worker to yield (i.e., temporarily relinquish active scheduling) so that other workers can take priority or perform balancing. This enables cooperative multitasking among workers in high-contention or handoff scenarios.

§Behavior
  • If the yield bit was previously unset (i.e., this is the first request to yield), this method also releases one permit to ensure the sleeping worker receives a wakeup.
  • If the bit is already set, the call has no further effect and releases no additional permit (idempotent).
§Concurrency

Safe for concurrent use: races to set the yield bit and release permits are benign.

§Memory Ordering

Uses Acquire/Release ordering to ensure that the yield bit is visible to consumers before subsequent state changes or wakeups.
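
The transition-detecting pattern above can be sketched with a single fetch_or. The returned previous value tells the caller whether it performed the 0→1 transition, and only that caller releases a permit. In this sketch the bare AtomicU64 permit counter is a simplified stand-in for the waker's semaphore (no sleeping threads are notified), and bit 63 is the assumed yield position from the layout diagram.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const STATUS_BIT_YIELD: u64 = 1 << 63; // assumed position (bit 63)

pub struct FlagSketch {
    status: AtomicU64,
    permits: AtomicU64, // simplified: a real waker would also notify sleepers
}

impl FlagSketch {
    pub fn new() -> Self {
        Self { status: AtomicU64::new(0), permits: AtomicU64::new(0) }
    }

    /// Sets the yield bit; releases one permit only on a 0→1 transition.
    pub fn mark_yield(&self) {
        let prev = self.status.fetch_or(STATUS_BIT_YIELD, Ordering::AcqRel);
        if prev & STATUS_BIT_YIELD == 0 {
            // This caller won the race to set the bit, so it owns the wakeup.
            self.permits.fetch_add(1, Ordering::Release);
        }
    }

    pub fn permits(&self) -> u64 {
        self.permits.load(Ordering::Acquire)
    }
}
```

Concurrent callers race on the same fetch_or, and exactly one of them observes the bit clear. This is why redundant calls are benign.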

pub fn try_unmark_yield(&self)

Attempts to clear the yield bit (STATUS_BIT_YIELD) in the status word.

§Purpose

This function signals that the worker should stop yielding: it is no longer in a yielded state and is eligible to process new work. The yield bit is set to ask a worker to yield and cleared to let it resume normal operation. Clearing the bit is a coordinated operation, which avoids losing work or reactivating the worker prematurely.

§Concurrency

The method uses a loop with atomic compare-and-exchange to guarantee that the yield bit is only cleared if it was previously set, handling concurrent attempts to manipulate this bit. In case there is a race and the bit has already been cleared by another thread, this function will exit quietly and make no changes.

§Behavior
  • If the yield bit is already clear, the function returns immediately.
  • Otherwise, it performs a compare-and-exchange to clear the bit. If this succeeds, it exits; if not, it reloads the word and repeats the process, only trying again if the yield bit is still set.
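
The compare-and-exchange loop described above might look like the following sketch, assuming the yield flag is bit 63 as in the layout diagram. This is a free function over a bare AtomicU64 for illustration, not the crate's code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const STATUS_BIT_YIELD: u64 = 1 << 63; // assumed position (bit 63)

/// Clears the yield bit if it is set, preserving every other bit of `status`.
pub fn try_unmark_yield(status: &AtomicU64) {
    let mut current = status.load(Ordering::Acquire);
    loop {
        if current & STATUS_BIT_YIELD == 0 {
            return; // Already clear (possibly by another thread): nothing to do.
        }
        match status.compare_exchange_weak(
            current,
            current & !STATUS_BIT_YIELD,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return,
            // The word changed (or the weak CAS failed spuriously); retry.
            Err(actual) => current = actual,
        }
    }
}
```

A single fetch_and(!STATUS_BIT_YIELD) would also clear the bit. The loop form matches the described behavior of exiting without any write when the bit is already clear.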

pub fn mark_tasks(&self)

Marks the partition bit as active, indicating there is work in the partition.

This sets the STATUS_BIT_PARTITION bit in the status word. If this was a transition from no active partition to active (i.e., the bit was previously clear), it releases one permit to wake up a worker to process tasks in this partition.

§Example
// Called when a leaf in the partition becomes non-empty
waker.mark_tasks();

pub fn try_unmark_tasks(&self)

Attempts to clear the partition active bit (STATUS_BIT_PARTITION) in the status word if no leaves in the partition are active.

This is typically called after a partition leaf transitions to empty. If the bit was set (partition was active), this function clears it, indicating no more work is present in the partition. If new work becomes available (i.e., the partition_summary is nonzero) after the bit is cleared, it immediately re-arms the bit to avoid lost wakeups.

This function is safe to call spuriously and will exit without making changes if the partition bit is already clear.

§Concurrency

Uses a loop with atomic compare-and-exchange to ensure the bit is only cleared if no other thread has concurrently set it again. If racing with a producer, the bit will be re-armed as needed to prevent missing new work.
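
A sketch of the clear-then-recheck idea follows. It makes three assumptions:

  • The partition summary lives in its own AtomicU64 next to the status word, as partition_summary() suggests.
  • STATUS_BIT_PARTITION is bit 62.
  • Producers update the partition summary before setting the status bit.

The struct and field names are hypothetical. The method returns a bool only to make the outcome testable.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const STATUS_BIT_PARTITION: u64 = 1 << 62; // assumed position (bit 62)

pub struct PartitionSketch {
    pub status: AtomicU64,
    pub partition_summary: AtomicU64, // bit i = leaf partition_start + i is active
}

impl PartitionSketch {
    /// Clears the partition bit when no leaf is active, re-arming it if work
    /// appears concurrently. Returns true if the bit ends up clear.
    pub fn try_unmark_tasks(&self) -> bool {
        let mut current = self.status.load(Ordering::Acquire);
        loop {
            if current & STATUS_BIT_PARTITION == 0 {
                return true; // Already clear: spurious call, nothing to do.
            }
            if self.partition_summary.load(Ordering::Acquire) != 0 {
                return false; // Leaves are still active; keep the bit set.
            }
            match self.status.compare_exchange_weak(
                current,
                current & !STATUS_BIT_PARTITION,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break,
                Err(actual) => current = actual,
            }
        }
        // A producer may have marked a leaf between our check and the CAS.
        if self.partition_summary.load(Ordering::Acquire) != 0 {
            self.status.fetch_or(STATUS_BIT_PARTITION, Ordering::AcqRel);
            return false;
        }
        true
    }
}
```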

pub fn mark_active(&self, index: u64)

Marks the signal word at index (0-61) as active in the summary.

Called by producers when a queue transitions from empty to non-empty and its signal word was previously empty. If this is a 0→1 transition (bit was previously clear), adds 1 permit and wakes up to 1 sleeping thread.

§Fast Path

If the bit is already set, returns after a single load, without an atomic read-modify-write. This is the common case when multiple producers push to the same word group.

§Arguments
  • index - Word index (0-61) to mark as active; bits 62 and 63 hold the partition and yield flags
§Example
// Producer pushes to queue 5 in word 0
let (was_empty, was_set) = signal.set(5);
if was_empty && was_set {
    waker.mark_active(0);  // Wake 1 consumer
}
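
The fast path and the 0→1 permit rule can be sketched as follows. This assumes the summary occupies bits 0-61 of the status word, and uses a simplified permit counter that does not notify sleepers. The bool return is added only for illustration; the documented method returns nothing.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const SUMMARY_MASK: u64 = (1 << 62) - 1; // bits 0-61 (assumed layout)

pub struct SummarySketch {
    status: AtomicU64,
    permits: AtomicU64, // simplified permit counter; no sleeper notification
}

impl SummarySketch {
    pub fn new() -> Self {
        Self { status: AtomicU64::new(0), permits: AtomicU64::new(0) }
    }

    /// Marks signal word `index` (0-61) active; returns true on a 0→1 transition.
    pub fn mark_active(&self, index: u64) -> bool {
        let bit = 1u64 << index;
        debug_assert!(bit & SUMMARY_MASK != 0, "index must be in 0-61");
        // Fast path: a plain load avoids a read-modify-write when already set.
        if self.status.load(Ordering::Relaxed) & bit != 0 {
            return false;
        }
        let prev = self.status.fetch_or(bit, Ordering::Relaxed);
        if prev & bit == 0 {
            self.permits.fetch_add(1, Ordering::Release); // one permit per transition
            return true;
        }
        false // another producer set the bit between our load and fetch_or
    }

    pub fn permits(&self) -> u64 {
        self.permits.load(Ordering::Acquire)
    }
}
```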

pub fn mark_active_mask(&self, mask: u64)

Batch version of mark_active(): marks multiple words as active at once.

Efficiently handles multiple queues becoming active simultaneously. Releases exactly k permits, where k is the number of 0→1 transitions (newly-active words).

§Optimization

Uses a single fetch_or instead of calling mark_active() in a loop, reducing atomic contention when many queues activate together.

§Arguments
  • mask - Bitmap of words to mark active (bit i = word i)
§Example
// Multiple queues became active
let mut active_words = 0u64;
for word_idx in 0..62 {  // bits 62-63 are control flags, not words
    if word_became_active(word_idx) {
        active_words |= 1 << word_idx;
    }
}
waker.mark_active_mask(active_words);  // Single atomic op
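
Counting the "k newly-active words" needs only the value returned by the single fetch_or. The bits that were requested and were previously clear are exactly the 0→1 transitions. This is a sketch over a bare AtomicU64; the function name is hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sets every bit in `mask` with one fetch_or and returns how many bits
/// went from 0 to 1, i.e., how many permits the caller should release.
pub fn newly_active(summary: &AtomicU64, mask: u64) -> u32 {
    let prev = summary.fetch_or(mask, Ordering::Relaxed);
    // Bits requested in `mask` that were clear before this call.
    (mask & !prev).count_ones()
}
```

In the waker, the returned count would then be passed to release() as n.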

pub fn try_unmark_if_empty(&self, bit_index: u64, signal: &AtomicU64)

Clears the summary bit for bit_index if the corresponding signal word is empty.

This is lazy cleanup - consumers call this after draining a word to prevent false positives in future snapshot_summary() calls. However, it’s safe to skip this; the system remains correct with stale summary bits.

§Arguments
  • bit_index - Word index (0-61) to potentially clear
  • signal - The actual signal word to check for emptiness
§Example
// After draining all queues in word 3
waker.try_unmark_if_empty(3, &signal_word_3);
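
One way to keep lazy cleanup from creating false negatives is to re-check the signal word after clearing the summary bit, and restore the bit if a producer raced in. That producer may have skipped mark_active() because it still saw the bit set. The docs do not say whether try_unmark_if_empty() re-checks, so this sketch is an assumption. It uses SeqCst conservatively, because closing this store-then-load race also requires the producer side to use SeqCst or a fence. The real method may use weaker orderings.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Clears summary bit `bit_index` if `signal` is empty, then re-checks the
/// signal word and restores the bit if a producer raced in. Returns true if
/// the bit was left clear.
pub fn try_unmark_if_empty(summary: &AtomicU64, bit_index: u64, signal: &AtomicU64) -> bool {
    let bit = 1u64 << bit_index;
    if signal.load(Ordering::SeqCst) != 0 {
        return false; // Word still has active queues; keep the hint.
    }
    summary.fetch_and(!bit, Ordering::SeqCst);
    // Re-check: a queue may have become active after the first load.
    if signal.load(Ordering::SeqCst) != 0 {
        summary.fetch_or(bit, Ordering::SeqCst);
        return false;
    }
    true
}
```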

pub fn try_unmark(&self, bit_index: u64)

Unconditionally clears the summary bit for bit_index.

Faster than try_unmark_if_empty() when the caller already knows the word is empty (avoids checking the signal word).

§Arguments
  • bit_index - Word index (0-61) to clear

pub fn snapshot_summary(&self) -> u64

Returns a snapshot of the current summary bitmap.

Consumers use this to quickly identify which word groups have potential work. If bit i is set, word i may have active queues (false positives possible due to lazy cleanup).

§Memory Ordering

Uses Relaxed because this is a hint, not a synchronization point. The actual queue data is synchronized via acquire/release on the permits counter.

§Returns

A u64 bitmap where bit i indicates word i has potential work.

§Example
let summary = waker.snapshot_summary();
for word_idx in 0..62 {  // bits 0-61 are signal words
    if summary & (1 << word_idx) != 0 {
        // Check queues in word_idx
    }
}
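
Instead of testing all 62 positions, a consumer can visit only the set bits. It takes trailing_zeros() to find the lowest set bit, then clears that bit with bits &= bits - 1. This is a standard bit-iteration technique, shown as a sketch that assumes the 0-61 summary layout. It is not necessarily how the crate scans.

```rust
/// Returns the indices of set summary bits in ascending order, doing one
/// trailing_zeros per set bit rather than one test per position.
pub fn set_words(summary: u64) -> Vec<u32> {
    let mut bits = summary & ((1u64 << 62) - 1); // keep bits 0-61 (assumed layout)
    let mut out = Vec::new();
    while bits != 0 {
        out.push(bits.trailing_zeros());
        bits &= bits - 1; // clear the lowest set bit
    }
    out
}
```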

pub fn summary_select(&self, nearest_to_index: u64) -> u64

Finds the nearest set bit to nearest_to_index in the summary.

Useful for maintaining locality: continue working on queues near the last processed index, improving cache behavior.

§Arguments
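  • nearest_to_index - Preferred starting point (0-61)
§Returns

The index of the nearest set bit. The result is undefined if the summary is empty, so check snapshot_summary() first.

§Example
let mut last_word = 0;
loop {
    if waker.snapshot_summary() == 0 {
        break; // Nothing to select; summary_select() would be meaningless
    }
    last_word = waker.summary_select(last_word);
    // Process queues in word last_word
}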
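
A nearest-set-bit search can be done with two bit scans and no loop over positions. Shift the summary right to find the first set bit at or above the index, and mask it to find the last set bit below. This sketch returns Option instead of an undefined value for an empty summary. It breaks ties toward the higher index, a choice the docs do not specify.

```rust
/// Returns the set bit of `summary` closest to `idx` (ties go to the higher
/// index), or None when the summary is empty.
pub fn nearest_set_bit(summary: u64, idx: u32) -> Option<u32> {
    assert!(idx < 64, "idx must be a bit position");
    let above = summary >> idx; // bit 0 of `above` is bit `idx` of `summary`
    let below = summary & ((1u64 << idx) - 1); // bits strictly below `idx`
    let up = (above != 0).then(|| idx + above.trailing_zeros());
    let down = (below != 0).then(|| 63 - below.leading_zeros());
    match (up, down) {
        (Some(u), Some(d)) => Some(if u - idx <= idx - d { u } else { d }),
        (Some(u), None) => Some(u),
        (None, d) => d,
    }
}
```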

pub fn try_acquire(&self) -> bool

Non-blocking attempt to acquire a permit.

Atomically decrements the permit counter if available. This is the lock-free fast path used by consumers before resorting to blocking.

§Returns
  • true if a permit was consumed (consumer should process work)
  • false if no permits available (queue likely empty)
§Memory Ordering

Uses AcqRel to synchronize with producers’ Release in release(). This ensures queue data written by producers is visible to this consumer.

§Example
if waker.try_acquire() {
    // Process work (a permit signals that work was published; another consumer may already have taken it)
} else {
    // No work, maybe park or spin
}
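
A "decrement if available" operation cannot be a plain fetch_sub, which would wrap below zero. A compare-and-exchange loop avoids this. The sketch below is a free function over a bare AtomicU64. It is an illustration of the technique, not the crate's code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Decrements `permits` if it is non-zero; never drives it below zero.
pub fn try_acquire(permits: &AtomicU64) -> bool {
    let mut current = permits.load(Ordering::Relaxed);
    while current > 0 {
        match permits.compare_exchange_weak(
            current,
            current - 1,
            Ordering::AcqRel, // pairs with the producer's Release in release()
            Ordering::Relaxed,
        ) {
            Ok(_) => return true,
            Err(actual) => current = actual, // lost a race; retry with fresh value
        }
    }
    false
}
```

AtomicU64::fetch_update with |p| p.checked_sub(1) expresses the same loop more compactly.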

pub fn acquire(&self)

Blocking acquire: parks the thread until a permit becomes available.

Tries the fast path first (try_acquire()), then falls back to parking on a condvar. Handles spurious wakeups by rechecking permits in a loop.

§Blocking Behavior
  1. Increment sleepers count
  2. Wait on condvar (releases mutex)
  3. Recheck permits after wakeup
  4. Decrement sleepers on exit
§Panics

Panics if the internal mutex is poisoned, which happens when another thread panicked while holding the lock.

§Example
loop {
    waker.acquire();  // Blocks until work available
    process_work();
}
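
The blocking steps above, together with release()'s min(n, sleepers) notification rule, can be sketched with std's Mutex and Condvar. To keep the lost-wakeup argument simple, this sketch keeps the sleeper count inside the mutex. The documented waker exposes an approximate atomic count instead, so PermitSketch and its fields are assumptions. release() increments permits before taking the lock, so a consumer that re-checks under the lock either sees the permit or is already counted as a sleeper and gets notified.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};

pub struct PermitSketch {
    permits: AtomicU64,
    sleepers: Mutex<usize>, // sleeper count guarded by the condvar's mutex
    cv: Condvar,
}

impl PermitSketch {
    pub fn new() -> Self {
        Self { permits: AtomicU64::new(0), sleepers: Mutex::new(0), cv: Condvar::new() }
    }

    pub fn try_acquire(&self) -> bool {
        self.permits
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |p| p.checked_sub(1))
            .is_ok()
    }

    /// Blocks until a permit is consumed; loops to absorb spurious wakeups.
    pub fn acquire(&self) {
        if self.try_acquire() {
            return; // lock-free fast path
        }
        let mut sleepers = self.sleepers.lock().unwrap();
        while !self.try_acquire() {
            *sleepers += 1;
            sleepers = self.cv.wait(sleepers).unwrap(); // releases the mutex while parked
            *sleepers -= 1;
        }
    }

    /// Adds `n` permits and notifies at most min(n, sleepers) threads.
    pub fn release(&self, n: usize) {
        self.permits.fetch_add(n as u64, Ordering::Release);
        let sleepers = self.sleepers.lock().unwrap();
        for _ in 0..n.min(*sleepers) {
            self.cv.notify_one();
        }
    }
}
```

A sleeper stays counted until it actually wakes, so back-to-back releases can over-notify. This is the benign over-notification listed under Trade-offs.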

pub fn acquire_timeout(&self, timeout: Duration) -> bool

Blocking acquire with timeout.

Like acquire(), but returns after timeout if no permit becomes available. Useful for implementing shutdown or periodic maintenance.

§Arguments
  • timeout - Maximum duration to wait
§Returns
  • true if a permit was acquired
  • false if timed out without acquiring
§Example
use std::time::Duration;

loop {
    if waker.acquire_timeout(Duration::from_secs(1)) {
        process_work();
    } else {
        // Timeout - check for shutdown signal
        if should_shutdown() { break; }
    }
}
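
A timed variant needs an absolute deadline, so that spurious wakeups do not restart the clock. The sketch below takes the primitives as parameters, assuming the same permit counter, mutex-guarded sleeper count, and condvar as a blocking acquire. These are illustrative assumptions about the internals, built on std's Condvar::wait_timeout.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

fn try_acquire(permits: &AtomicU64) -> bool {
    permits
        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |p| p.checked_sub(1))
        .is_ok()
}

/// Waits up to `timeout` for a permit; returns false if the deadline passes.
pub fn acquire_timeout(
    permits: &AtomicU64,
    sleepers: &Mutex<usize>,
    cv: &Condvar,
    timeout: Duration,
) -> bool {
    if try_acquire(permits) {
        return true;
    }
    let deadline = Instant::now() + timeout;
    let mut count = sleepers.lock().unwrap();
    loop {
        if try_acquire(permits) {
            return true;
        }
        let now = Instant::now();
        if now >= deadline {
            return false; // timed out without a permit
        }
        *count += 1;
        // Wait only for the time remaining until the fixed deadline.
        let (guard, _timed_out) = cv.wait_timeout(count, deadline - now).unwrap();
        count = guard;
        *count -= 1;
    }
}
```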

pub fn release(&self, n: usize)

Releases n permits and wakes up to n sleeping threads.

Called by producers (indirectly via mark_active) when queues become active. Uses targeted wakeups: only notifies up to min(n, sleepers) threads, avoiding unnecessary notify_one() calls.

§Permit Accumulation

If no threads are sleeping, permits accumulate for future consumers. This guarantees no lost wakeups: late-arriving consumers find work immediately.

§Arguments
  • n - Number of permits to release (typically 1, or the number of newly-active words)
§Memory Ordering

Uses Release to ensure queue data is visible to consumers who Acquire via try_acquire().

§Example
// Producer activates 3 queues
waker.release(3);  // Wakes up to 3 sleeping consumers

pub fn summary_bits(&self) -> u64

Returns the current summary bitmap.

Useful for debugging or metrics. Equivalent to snapshot_summary() but uses Acquire ordering for stronger visibility guarantees.

pub fn permits(&self) -> u64

Returns the current number of available permits.

Useful for monitoring queue health or load. A high permit count may indicate consumers are falling behind.

pub fn sleepers(&self) -> usize

Returns the approximate number of sleeping threads.

Best-effort count (uses Relaxed ordering). Useful for debugging or understanding system utilization.

pub fn register_sleeper(&self)

Increments the sleeper count, indicating a thread is about to park.

Should be called BEFORE checking for work the final time, to prevent lost wakeups. The calling thread must unregister via unregister_sleeper() after waking up.

§Example
waker.register_sleeper();
// Final check for work
if has_work() {
    waker.unregister_sleeper();
    return; // Found work, don't park
}
// Actually park...
waker.acquire();
waker.unregister_sleeper();

pub fn unregister_sleeper(&self)

Decrements the sleeper count, indicating a thread has woken up.

Should be called after waking up from acquire() or if aborting a park attempt after register_sleeper().

§Example
waker.register_sleeper();
// ... park ...
waker.unregister_sleeper(); // Woke up

pub fn register_worker(&self) -> usize

Registers a new worker thread and returns the new total worker count.

Should be called when a FastTaskWorker thread starts. Workers use this count to partition the signal space for optimal load distribution.

§Returns

The new total worker count after registration.

§Example
let waker = arena.waker();
let total_workers = unsafe { (*waker).register_worker() };
println!("Now have {} workers", total_workers);

pub fn unregister_worker(&self) -> usize

Unregisters a worker thread and returns the new total worker count.

Should be called when a FastTaskWorker thread stops. This allows remaining workers to reconfigure their partitions.

§Returns

The new total worker count after unregistration.

§Example
// Worker stopping
let waker = arena.waker();
let remaining_workers = unsafe { (*waker).unregister_worker() };
println!("{} workers remaining", remaining_workers);

pub fn get_worker_count(&self) -> usize

Returns the current number of active worker threads.

Workers periodically check this value to detect when the worker count has changed and reconfigure their signal partitions accordingly.

Uses Relaxed ordering since workers can tolerate slightly stale values and will eventually see the update on their next check.

§Example
let waker = arena.waker();
let count = unsafe { (*waker).get_worker_count() };
if count != cached_count {
    // Reconfigure partition
}
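
The docs do not say how the signal space is divided among workers. One plausible scheme is an even, contiguous split of the leaf range, recomputed whenever get_worker_count() changes. This is purely an assumption. The function name partition_for and the rule that the first workers absorb the remainder are hypothetical. Each resulting range must still span at most 64 leaves to fit the partition summary used by sync_partition_summary().

```rust
use std::ops::Range;

/// Hypothetical even split of `total_leaves` leaves among `workers` workers.
/// The first `total_leaves % workers` workers get one extra leaf.
pub fn partition_for(worker_id: usize, workers: usize, total_leaves: usize) -> Range<usize> {
    assert!(workers > 0 && worker_id < workers);
    let base = total_leaves / workers;
    let extra = total_leaves % workers;
    let start = worker_id * base + worker_id.min(extra);
    let len = base + usize::from(worker_id < extra);
    start..start + len
}
```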

pub fn sync_partition_summary(
    &self,
    partition_start: usize,
    partition_end: usize,
    leaf_words: &[AtomicU64],
) -> bool

Synchronizes the partition summary from a SummaryTree leaf range.

Samples the worker’s assigned partition of the SummaryTree and updates the local partition_summary bitmap. When the partition transitions from empty to non-empty, sets STATUS_BIT_PARTITION and adds a permit to wake the worker.

This should be called before parking to ensure the worker doesn’t sleep when tasks are available in its partition.

§Arguments
  • partition_start - First leaf index in this worker’s partition
  • partition_end - One past the last leaf index (exclusive)
  • leaf_words - Slice of AtomicU64 leaf words from SummaryTree
§Returns

true if the partition currently has work, false otherwise

§Panics

Panics in debug builds if the partition spans more than 64 leaves.

§Example
// Before parking, sync partition status
let waker = &service.wakers[worker_id];
let has_work = waker.sync_partition_summary(
    self.partition_start,
    self.partition_end,
    &self.arena.active_tree().leaf_words,
);
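
Sampling a leaf range into a partition bitmap can be sketched as a loop over the slice, setting bit i when leaf start + i is non-zero. This sketch only adds bits with fetch_or and leaves clearing to clear_partition_leaf(), so it cannot overwrite a concurrent mark_partition_leaf_active(). Whether the real method also clears stale bits is not stated. The free-function form, the tuple return, and the parameter names are assumptions.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Samples leaves `start..end` into a bitmap (bit i = leaf start + i is non-empty),
/// merges it into `partition_summary`, and returns (has_work, became_active).
pub fn sync_partition_summary(
    partition_summary: &AtomicU64,
    start: usize,
    end: usize,
    leaf_words: &[AtomicU64],
) -> (bool, bool) {
    debug_assert!(end - start <= 64, "partition must span at most 64 leaves");
    let mut bitmap = 0u64;
    for (i, leaf) in leaf_words[start..end].iter().enumerate() {
        if leaf.load(Ordering::Acquire) != 0 {
            bitmap |= 1u64 << i;
        }
    }
    let prev = partition_summary.fetch_or(bitmap, Ordering::AcqRel);
    // On an empty -> non-empty transition the real waker would also set
    // STATUS_BIT_PARTITION and release a permit.
    ((prev | bitmap) != 0, prev == 0 && bitmap != 0)
}
```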

pub fn partition_summary(&self) -> u64

Returns the current partition summary bitmap.

Returns a bitmap where bit i indicates whether leaf partition_start + i has active tasks. This is a snapshot and may become stale immediately.

Uses Relaxed ordering since this is a hint for optimization purposes.

§Returns

Bitmap of active leaves in this worker’s partition

pub fn partition_leaf_has_work(&self, local_leaf_idx: usize) -> bool

Check if a specific leaf in the partition has work.

§Arguments
  • local_leaf_idx - Leaf index relative to partition start (0-63)
§Returns

true if the leaf appears to have work based on the cached summary

§Example
// Check if first leaf in partition has work
if waker.partition_leaf_has_work(0) {
    // Try to acquire from that leaf
}

pub fn mark_partition_leaf_active(&self, local_leaf_idx: usize) -> bool

Directly update partition summary for a specific leaf.

This is called when a task is scheduled into a leaf to immediately update the partition owner’s summary without waiting for the next sync.

§Arguments
  • local_leaf_idx - Leaf index relative to partition start (0-63)
§Returns

true if this was the first active leaf (partition was empty before)

§Example
// When scheduling a task, immediately update owner's partition summary
let owner_waker = &service.wakers[owner_id];
if owner_waker.mark_partition_leaf_active(local_leaf_idx) {
    // This was the first task - worker will be woken by the partition flag
}

pub fn clear_partition_leaf(&self, local_leaf_idx: usize)

Clear partition summary for a specific leaf.

Called when a leaf becomes empty. If this was the last active leaf, attempts to clear the partition status bit.

§Arguments
  • local_leaf_idx - Leaf index relative to partition start (0-63)
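
The three per-leaf operations map onto single atomic operations on a 64-bit partition bitmap: fetch_or to mark, load to test, and fetch_and to clear. The previous value reveals the first-leaf and last-leaf transitions. This sketch uses free functions over a bare AtomicU64. The bool returned by clear_partition_leaf is added for illustration; the documented method returns nothing and instead tries to clear the partition status bit itself.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sets the bit for `local_leaf_idx` (0-63); returns true if the partition
/// was empty before, i.e., this is the first active leaf.
pub fn mark_partition_leaf_active(partition_summary: &AtomicU64, local_leaf_idx: usize) -> bool {
    let bit = 1u64 << local_leaf_idx;
    partition_summary.fetch_or(bit, Ordering::AcqRel) == 0
}

/// Tests the cached bit for `local_leaf_idx` (a hint; may be stale).
pub fn partition_leaf_has_work(partition_summary: &AtomicU64, local_leaf_idx: usize) -> bool {
    partition_summary.load(Ordering::Relaxed) & (1u64 << local_leaf_idx) != 0
}

/// Clears the bit for `local_leaf_idx`; returns true if no leaves remain
/// active, in which case the caller would try to clear STATUS_BIT_PARTITION.
pub fn clear_partition_leaf(partition_summary: &AtomicU64, local_leaf_idx: usize) -> bool {
    let bit = 1u64 << local_leaf_idx;
    let prev = partition_summary.fetch_and(!bit, Ordering::AcqRel);
    prev & !bit == 0
}
```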

Trait Implementations§

impl Default for WorkerWaker

fn default() -> Self

Returns the “default value” for a type.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V