Expand description
Cross-thread notification with conflation and FIFO delivery.
Two primitives:
-
event_queue()→(Notifier, Poller)— non-blocking. The consumer polls when it chooses. -
event_channel()→(Sender, Receiver)— blocking. The consumer blocks when idle and is woken by the producer. Wraps the event queue with crossbeam parker/unparker.
event_queue() creates a (Notifier, Poller) pair for signaling which
items are ready for processing. An IO thread writes data into shared
storage (e.g., a conflation slot) and calls Notifier::notify. The
main event loop calls Poller::poll or Poller::poll_limit to
discover which tokens fired.
§Architecture
Two concerns, cleanly separated:
-
Dedup flags — one
AtomicBoolper token. If the flag is alreadytruewhen a producer callsnotify(), the notification is a no-op (conflated). Single atomic swap, no queue interaction. -
Delivery queue — a
nexus_queueMPSC ring buffer. When a producer wins the flag swap (false → true), it pushes the token index into the FIFO queue. The consumer pops and clears the flag, re-arming it for future notifications.
Both Notifier and Poller store the flags as Arc<[AtomicBool]> —
a single pointer deref to reach the flag array on every operation.
§Operations
Only three operations matter:
-
notify(token)— signal readiness. Conflated if already flagged. ReturnsResultso the caller owns the error policy (.unwrap()to crash,.ok()to swallow, or match to log). -
poll(events)— drain all ready tokens into the events buffer. -
poll_limit(events, limit)— drain up tolimittokens. Remaining items stay in the queue for the next call. Oldest notifications drain first (FIFO) — no starvation under budget.
§Invariants
-
Flag =
true⟺ token is in the queue (or being pushed). The consumer clears the flag on pop. Producers only set flags. -
At most one queue entry per token. The flag gates admission. Two producers racing on the same token: one wins (pushes), the other sees
true(conflated). Never two entries for the same index. -
Queue cannot overflow. The flag ensures at most one entry per token. The underlying queue is sized to hold at least
max_tokensentries. Overflow is an invariant violation (logic bug), reported viaNotifyError. -
FIFO delivery. The MPSC ring buffer preserves push order. The consumer sees tokens in the order they were first notified.
§Spurious Wakeups
If a slab key is freed and reassigned to a new item, a notify()
in-flight for the old item fires the token for the new item. The
consumer must tolerate spurious wakeups during the transition.
The user’s responsibilities:
- Stop calling
notify()for a token before its key is reused. - A callback’s token cannot change without informing the producer.
- Tolerate spurious wakeups during the deregister window.
Same contract as mio.
§Memory Ordering
The producer’s flag swap uses Acquire. The consumer’s flag clear
uses Release. This establishes a happens-before chain: when the
producer sees the flag cleared (false), the queue slot freed by
the consumer’s pop is guaranteed to be visible. Without this,
the producer could see the flag cleared but the queue slot still
occupied under weak memory models (validated by MIRI).
§Performance (p50 cycles, measured)
| Operation | Cycles | Notes |
|---|---|---|
| notify (conflated) | 16 | flag swap only |
| notify (new) | 16 | flag swap + CAS push |
| poll empty | 2 | single failed pop |
| poll N=8 | 48 | |
| poll N=128 | 684 | ~5.3 cy/token |
| poll_limit=32 (4096 ready) | 162 | O(limit) |
| cross-thread roundtrip | 362 | ~100ns @ 3.5GHz |
§Memory
For max_tokens = 4096: flags = 4 KB, MPSC queue = 64 KB (rounded
to power-of-two), total ~68 KB.
§Example
use nexus_notify::{event_queue, Token};
// Setup
let (notifier, poller) = event_queue(64);
let mut events = nexus_notify::Events::with_capacity(64);
// Producer: signal readiness
let token = Token::new(0);
notifier.notify(token).unwrap();
// Consumer: discover what's ready
poller.poll(&mut events);
assert_eq!(events.len(), 1);
assert_eq!(events.as_slice()[0].index(), 0);§With poll_limit (budgeted drain)
use nexus_notify::{event_queue, Token};
let (notifier, poller) = event_queue(256);
let mut events = nexus_notify::Events::with_capacity(256);
// Many tokens ready
for i in 0..100 {
notifier.notify(Token::new(i)).unwrap();
}
// Drain only 10 per iteration (oldest first)
poller.poll_limit(&mut events, 10);
assert_eq!(events.len(), 10);
assert_eq!(events.as_slice()[0].index(), 0); // FIFO: oldest first
// Remaining 90 stay in the queue
poller.poll(&mut events);
assert_eq!(events.len(), 90);Structs§
- Events
- Pre-allocated buffer of tokens returned by
Poller::poll. - Notifier
- Producer handle for signaling token readiness.
- Notify
Error - Push failed — internal queue was unexpectedly full.
- Poller
- Consumer handle for polling ready tokens.
- Receiver
- Consumer handle for the blocking event channel.
- Sender
- Producer handle for the blocking event channel.
- Token
- Opaque handle identifying a notification source.
Functions§
- event_
channel - Create a blocking event channel with capacity for
max_tokensunique tokens. - event_
queue - Create a notification channel with capacity for
max_tokensunique tokens.