Skip to main content

Crate nexus_notify

Crate nexus_notify 

Source
Expand description

Cross-thread notification with conflation and FIFO delivery.

Two primitives:

  • event_queue()(Notifier, Poller) — non-blocking. The consumer polls when it chooses.

  • event_channel()(Sender, Receiver) — blocking. The consumer blocks when idle and is woken by the producer. Wraps the event queue with crossbeam parker/unparker.

event_queue() creates a (Notifier, Poller) pair for signaling which items are ready for processing. An IO thread writes data into shared storage (e.g., a conflation slot) and calls Notifier::notify. The main event loop calls Poller::poll or Poller::poll_limit to discover which tokens fired.

§Architecture

Two concerns, cleanly separated:

  • Dedup flags — one AtomicBool per token. If the flag is already true when a producer calls notify(), the notification is a no-op (conflated). Single atomic swap, no queue interaction.

  • Delivery queue — a nexus_queue MPSC ring buffer. When a producer wins the flag swap (false → true), it pushes the token index into the FIFO queue. The consumer pops and clears the flag, re-arming it for future notifications.

Both Notifier and Poller store the flags as Arc<[AtomicBool]> — a single pointer deref to reach the flag array on every operation.

§Operations

Only three operations matter:

  • notify(token) — signal readiness. Conflated if already flagged. Returns Result so the caller owns the error policy (.unwrap() to crash, .ok() to swallow, or match to log).

  • poll(events) — drain all ready tokens into the events buffer.

  • poll_limit(events, limit) — drain up to limit tokens. Remaining items stay in the queue for the next call. Oldest notifications drain first (FIFO) — no starvation under budget.

§Invariants

  • Flag = true ⟺ token is in the queue (or being pushed). The consumer clears the flag on pop. Producers only set flags.

  • At most one queue entry per token. The flag gates admission. Two producers racing on the same token: one wins (pushes), the other sees true (conflated). Never two entries for the same index.

  • Queue cannot overflow. The flag ensures at most one entry per token. The underlying queue is sized to hold at least max_tokens entries. Overflow is an invariant violation (logic bug), reported via NotifyError.

  • FIFO delivery. The MPSC ring buffer preserves push order. The consumer sees tokens in the order they were first notified.

§Spurious Wakeups

If a slab key is freed and reassigned to a new item, a notify() in-flight for the old item fires the token for the new item. The consumer must tolerate spurious wakeups during the transition.

The user’s responsibilities:

  1. Stop calling notify() for a token before its key is reused.
  2. A callback’s token cannot change without informing the producer.
  3. Tolerate spurious wakeups during the deregister window.

Same contract as mio.

§Memory Ordering

The producer’s flag swap uses Acquire. The consumer’s flag clear uses Release. This establishes a happens-before chain: when the producer sees the flag cleared (false), the queue slot freed by the consumer’s pop is guaranteed to be visible. Without this, the producer could see the flag cleared but the queue slot still occupied under weak memory models (validated by MIRI).

§Performance (p50 cycles, measured)

OperationCyclesNotes
notify (conflated)16flag swap only
notify (new)16flag swap + CAS push
poll empty2single failed pop
poll N=848
poll N=128684~5.3 cy/token
poll_limit=32 (4096 ready)162O(limit)
cross-thread roundtrip362~100ns @ 3.5GHz

§Memory

For max_tokens = 4096: flags = 4 KB, MPSC queue = 64 KB (rounded to power-of-two), total ~68 KB.

§Example

use nexus_notify::{event_queue, Token};

// Setup
let (notifier, poller) = event_queue(64);
let mut events = nexus_notify::Events::with_capacity(64);

// Producer: signal readiness
let token = Token::new(0);
notifier.notify(token).unwrap();

// Consumer: discover what's ready
poller.poll(&mut events);
assert_eq!(events.len(), 1);
assert_eq!(events.as_slice()[0].index(), 0);

§With poll_limit (budgeted drain)

use nexus_notify::{event_queue, Token};

let (notifier, poller) = event_queue(256);
let mut events = nexus_notify::Events::with_capacity(256);

// Many tokens ready
for i in 0..100 {
    notifier.notify(Token::new(i)).unwrap();
}

// Drain only 10 per iteration (oldest first)
poller.poll_limit(&mut events, 10);
assert_eq!(events.len(), 10);
assert_eq!(events.as_slice()[0].index(), 0);  // FIFO: oldest first

// Remaining 90 stay in the queue
poller.poll(&mut events);
assert_eq!(events.len(), 90);

Structs§

Events
Pre-allocated buffer of tokens returned by Poller::poll.
Notifier
Producer handle for signaling token readiness.
NotifyError
Push failed — internal queue was unexpectedly full.
Poller
Consumer handle for polling ready tokens.
Receiver
Consumer handle for the blocking event channel.
Sender
Producer handle for the blocking event channel.
Token
Opaque handle identifying a notification source.

Functions§

event_channel
Create a blocking event channel with capacity for max_tokens unique tokens.
event_queue
Create a notification channel with capacity for max_tokens unique tokens.