Skip to main content

nexus_notify/
lib.rs

1//! Cross-thread notification with conflation and FIFO delivery.
2//!
3//! Two primitives:
4//!
5//! - **[`event_queue()`]** → `(Notifier, Poller)` — non-blocking.
6//!   The consumer polls when it chooses.
7//!
8//! - **[`event_channel()`]** → `(Sender, Receiver)` — blocking.
9//!   The consumer blocks when idle and is woken by the producer.
10//!   Wraps the event queue with crossbeam parker/unparker.
11//!
12//! [`event_queue()`] creates a `(Notifier, Poller)` pair for signaling which
13//! items are ready for processing. An IO thread writes data into shared
14//! storage (e.g., a conflation slot) and calls [`Notifier::notify`]. The
15//! main event loop calls [`Poller::poll`] or [`Poller::poll_limit`] to
16//! discover which tokens fired.
17//!
18//! # Architecture
19//!
20//! Two concerns, cleanly separated:
21//!
22//! - **Dedup flags** — one [`AtomicBool`](core::sync::atomic::AtomicBool)
23//!   per token. If the flag is already `true` when a producer calls
24//!   [`notify()`](Notifier::notify), the notification is a no-op (conflated).
25//!   Single atomic swap, no queue interaction.
26//!
27//! - **Delivery queue** — a [`nexus_queue`] MPSC ring buffer. When a
28//!   producer wins the flag swap (`false → true`), it pushes the token
29//!   index into the FIFO queue. The consumer pops and clears the flag,
30//!   re-arming it for future notifications.
31//!
32//! Both [`Notifier`] and [`Poller`] store the flags as `Arc<[AtomicBool]>` —
33//! a single pointer deref to reach the flag array on every operation.
34//!
35//! # Operations
36//!
37//! Only three operations matter:
38//!
39//! - **[`notify(token)`](Notifier::notify)** — signal readiness. Conflated
40//!   if already flagged. Returns `Result` so the caller owns the error
41//!   policy (`.unwrap()` to crash, `.ok()` to swallow, or match to log).
42//!
43//! - **[`poll(events)`](Poller::poll)** — drain all ready tokens into the
44//!   events buffer.
45//!
46//! - **[`poll_limit(events, limit)`](Poller::poll_limit)** — drain up to
47//!   `limit` tokens. Remaining items stay in the queue for the next call.
48//!   Oldest notifications drain first (FIFO) — no starvation under budget.
49//!
50//! # Invariants
51//!
52//! - **Flag = `true` ⟺ token is in the queue (or being pushed).** The
53//!   consumer clears the flag on pop. Producers only set flags.
54//!
55//! - **At most one queue entry per token.** The flag gates admission.
56//!   Two producers racing on the same token: one wins (pushes), the other
57//!   sees `true` (conflated). Never two entries for the same index.
58//!
59//! - **Queue cannot overflow.** The flag ensures at most one entry per
60//!   token. The underlying queue is sized to hold at least `max_tokens`
61//!   entries. Overflow is an invariant violation (logic bug), reported
62//!   via [`NotifyError`].
63//!
64//! - **FIFO delivery.** The MPSC ring buffer preserves push order. The
65//!   consumer sees tokens in the order they were first notified.
66//!
67//! # Spurious Wakeups
68//!
69//! If a slab key is freed and reassigned to a new item, a [`notify()`](Notifier::notify)
70//! in-flight for the old item fires the token for the new item. The
71//! consumer must tolerate spurious wakeups during the transition.
72//!
73//! The user's responsibilities:
74//! 1. Stop calling `notify()` for a token before its key is reused.
75//! 2. A callback's token cannot change without informing the producer.
76//! 3. Tolerate spurious wakeups during the deregister window.
77//!
78//! Same contract as mio.
79//!
80//! # Memory Ordering
81//!
82//! The producer's flag swap uses `Acquire`. The consumer's flag clear
83//! uses `Release`. This establishes a happens-before chain: when the
84//! producer sees the flag cleared (`false`), the queue slot freed by
85//! the consumer's pop is guaranteed to be visible. Without this,
86//! the producer could see the flag cleared but the queue slot still
87//! occupied under weak memory models (validated by MIRI).
88//!
89//! # Performance (p50 cycles, measured)
90//!
91//! | Operation | Cycles | Notes |
92//! |-----------|--------|-------|
93//! | notify (conflated) | 16 | flag swap only |
94//! | notify (new) | 16 | flag swap + CAS push |
95//! | poll empty | 2 | single failed pop |
96//! | poll N=8 | 48 | |
97//! | poll N=128 | 684 | ~5.3 cy/token |
98//! | poll_limit=32 (4096 ready) | 162 | O(limit) |
99//! | cross-thread roundtrip | 362 | ~100ns @ 3.5GHz |
100//!
101//! # Memory
102//!
103//! For `max_tokens = 4096`: flags = 4 KB, MPSC queue = 64 KB (rounded
104//! to power-of-two), total ~68 KB.
105//!
106//! # Example
107//!
108//! ```
109//! use nexus_notify::{event_queue, Token};
110//!
111//! // Setup
112//! let (notifier, poller) = event_queue(64);
113//! let mut events = nexus_notify::Events::with_capacity(64);
114//!
115//! // Producer: signal readiness
116//! let token = Token::new(0);
117//! notifier.notify(token).unwrap();
118//!
119//! // Consumer: discover what's ready
120//! poller.poll(&mut events);
121//! assert_eq!(events.len(), 1);
122//! assert_eq!(events.as_slice()[0].index(), 0);
123//! ```
124//!
125//! ## With poll_limit (budgeted drain)
126//!
127//! ```
128//! use nexus_notify::{event_queue, Token};
129//!
130//! let (notifier, poller) = event_queue(256);
131//! let mut events = nexus_notify::Events::with_capacity(256);
132//!
133//! // Many tokens ready
134//! for i in 0..100 {
135//!     notifier.notify(Token::new(i)).unwrap();
136//! }
137//!
138//! // Drain only 10 per iteration (oldest first)
139//! poller.poll_limit(&mut events, 10);
140//! assert_eq!(events.len(), 10);
141//! assert_eq!(events.as_slice()[0].index(), 0);  // FIFO: oldest first
142//!
143//! // Remaining 90 stay in the queue
144//! poller.poll(&mut events);
145//! assert_eq!(events.len(), 90);
146//! ```
147
148mod event_channel;
149mod event_queue;
150
151pub use event_channel::{Receiver, Sender, event_channel};
152pub use event_queue::{Events, Notifier, NotifyError, Poller, Token, event_queue};