Cross-thread notification with conflation and FIFO delivery.
Two primitives:
-
[
event_queue()] →(Notifier, Poller)— non-blocking. The consumer polls when it chooses. -
[
event_channel()] →(Sender, Receiver)— blocking. The consumer blocks when idle and is woken by the producer. Wraps the event queue with crossbeam parker/unparker.
[event_queue()] creates a (Notifier, Poller) pair for signaling which
items are ready for processing. An IO thread writes data into shared
storage (e.g., a conflation slot) and calls [Notifier::notify]. The
main event loop calls [Poller::poll] or [Poller::poll_limit] to
discover which tokens fired.
Architecture
Two concerns, cleanly separated:
-
Dedup flags — one
AtomicBoolper token. If the flag is alreadytruewhen a producer callsnotify(), the notification is a no-op (conflated). Single atomic swap, no queue interaction. -
Delivery queue — a [
nexus_queue] MPSC ring buffer. When a producer wins the flag swap (false → true), it pushes the token index into the FIFO queue. The consumer pops and clears the flag, re-arming it for future notifications.
Both [Notifier] and [Poller] store the flags as Arc<[AtomicBool]> —
a single pointer deref to reach the flag array on every operation.
Operations
Only three operations matter:
-
notify(token)— signal readiness. Conflated if already flagged. ReturnsResultso the caller owns the error policy (.unwrap()to crash,.ok()to swallow, or match to log). -
poll(events)— drain all ready tokens into the events buffer. -
poll_limit(events, limit)— drain up tolimittokens. Remaining items stay in the queue for the next call. Oldest notifications drain first (FIFO) — no starvation under budget.
Invariants
-
Flag =
true⟺ token is in the queue (or being pushed). The consumer clears the flag on pop. Producers only set flags. -
At most one queue entry per token. The flag gates admission. Two producers racing on the same token: one wins (pushes), the other sees
true(conflated). Never two entries for the same index. -
Queue cannot overflow. The flag ensures at most one entry per token. The underlying queue is sized to hold at least
max_tokensentries. Overflow is an invariant violation (logic bug), reported via [NotifyError]. -
FIFO delivery. The MPSC ring buffer preserves push order. The consumer sees tokens in the order they were first notified.
Spurious Wakeups
If a slab key is freed and reassigned to a new item, a notify()
in-flight for the old item fires the token for the new item. The
consumer must tolerate spurious wakeups during the transition.
The user's responsibilities:
- Stop calling
notify()for a token before its key is reused. - A callback's token cannot change without informing the producer.
- Tolerate spurious wakeups during the deregister window.
Same contract as mio.
Memory Ordering
The producer's flag swap uses Acquire. The consumer's flag clear
uses Release. This establishes a happens-before chain: when the
producer sees the flag cleared (false), the queue slot freed by
the consumer's pop is guaranteed to be visible. Without this,
the producer could see the flag cleared but the queue slot still
occupied under weak memory models (validated by MIRI).
Performance (p50 cycles, measured)
| Operation | Cycles | Notes |
|---|---|---|
| notify (conflated) | 16 | flag swap only |
| notify (new) | 16 | flag swap + CAS push |
| poll empty | 2 | single failed pop |
| poll N=8 | 48 | |
| poll N=128 | 684 | ~5.3 cy/token |
| poll_limit=32 (4096 ready) | 162 | O(limit) |
| cross-thread roundtrip | 362 | ~100ns @ 3.5GHz |
Memory
For max_tokens = 4096: flags = 4 KB, MPSC queue = 64 KB (rounded
to power-of-two), total ~68 KB.
Example
use ;
// Setup
let = event_queue;
let mut events = with_capacity;
// Producer: signal readiness
let token = new;
notifier.notify.unwrap;
// Consumer: discover what's ready
poller.poll;
assert_eq!;
assert_eq!;
With poll_limit (budgeted drain)
use ;
let = event_queue;
let mut events = with_capacity;
// Many tokens ready
for i in 0..100
// Drain only 10 per iteration (oldest first)
poller.poll_limit;
assert_eq!;
assert_eq!; // FIFO: oldest first
// Remaining 90 stay in the queue
poller.poll;
assert_eq!;