Skip to main content

nexus_notify/
event_channel.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::{Duration, Instant};
4
5use crossbeam_utils::Backoff;
6use crossbeam_utils::sync::{Parker, Unparker};
7
8use crate::event_queue::{self, Events, Notifier, NotifyError, Poller, Token};
9
10const DEFAULT_SNOOZE_ITERS: usize = 8;
11
12// ========================================================
13// ChannelShared (parking coordination)
14// ========================================================
15
/// Parking state shared between every [`Sender`] clone and the single
/// [`Receiver`] of one channel.
struct ChannelShared {
    /// Raised by the receiver right before it parks and lowered again once
    /// it stops waiting. Senders read it after a notify to decide whether
    /// an `unpark` call is needed.
    receiver_parked: AtomicBool,
}
19
20// ========================================================
21// Sender (producer)
22// ========================================================
23
24/// Producer handle for the blocking event channel.
25///
26/// Cloneable for MPSC. Same conflation semantics as [`Notifier`] —
27/// duplicate notifications are suppressed. Automatically wakes a
28/// parked [`Receiver`] on any successful notification (including
29/// conflated — a spurious wakeup is safe and self-correcting).
30///
31/// Obtained from [`event_channel()`].
32pub struct Sender {
33    notifier: Notifier,
34    unparker: Unparker,
35    shared: Arc<ChannelShared>,
36}
37
38impl Clone for Sender {
39    fn clone(&self) -> Self {
40        Self {
41            notifier: self.notifier.clone(),
42            unparker: self.unparker.clone(),
43            shared: Arc::clone(&self.shared),
44        }
45    }
46}
47
48impl Sender {
49    /// Signal that a token is ready.
50    ///
51    /// Same semantics as [`Notifier::notify`] — conflated if already
52    /// flagged. Additionally, wakes the receiver if it's parked.
53    ///
54    /// Every successful notify (including conflated) checks the parked
55    /// flag. A conflated notify may cause a spurious wakeup (receiver
56    /// wakes, polls, finds nothing new, re-parks). This is safe and
57    /// self-correcting. Correctness beats cleverness.
58    #[inline]
59    pub fn notify(&self, token: Token) -> Result<(), NotifyError> {
60        self.notifier.notify(token)?;
61        if self.shared.receiver_parked.load(Ordering::SeqCst) {
62            self.unparker.unpark();
63        }
64        Ok(())
65    }
66}
67
68// ========================================================
69// Receiver (consumer)
70// ========================================================
71
72/// Consumer handle for the blocking event channel.
73///
74/// Not cloneable — single consumer. Provides blocking [`recv`](Receiver::recv)
75/// and [`recv_timeout`](Receiver::recv_timeout) in addition to non-blocking
76/// [`try_recv`](Receiver::try_recv).
77///
78/// Blocking methods use a three-phase wait: fast poll → backoff (snooze)
79/// → park. The receiver parks when idle and is woken by the sender on
80/// new notifications.
81///
82/// Obtained from [`event_channel()`].
83pub struct Receiver {
84    poller: Poller,
85    parker: Parker,
86    shared: Arc<ChannelShared>,
87    snooze_iters: usize,
88}
89
90impl Receiver {
91    /// Block until events are ready, then drain all into the buffer.
92    ///
93    /// Three-phase wait: poll (fast path) → backoff (snooze) → park.
94    /// Returns when at least one event is available.
95    pub fn recv(&self, events: &mut Events) {
96        self.recv_inner(events, usize::MAX);
97    }
98
99    /// Block until events are ready, then drain up to `limit`.
100    ///
101    /// Same three-phase wait. Returns when at least one event is
102    /// available. Oldest notifications drain first (FIFO).
103    pub fn recv_limit(&self, events: &mut Events, limit: usize) {
104        self.recv_inner(events, limit);
105    }
106
107    /// Block until events are ready, with timeout.
108    ///
109    /// Returns `true` if events were received, `false` if the timeout
110    /// elapsed with no events.
111    pub fn recv_timeout(&self, events: &mut Events, timeout: Duration) -> bool {
112        self.recv_timeout_inner(events, usize::MAX, timeout)
113    }
114
115    /// Block until events are ready, with timeout and limit.
116    ///
117    /// Returns `true` if events were received, `false` if the timeout
118    /// elapsed with no events.
119    pub fn recv_timeout_limit(&self, events: &mut Events, limit: usize, timeout: Duration) -> bool {
120        self.recv_timeout_inner(events, limit, timeout)
121    }
122
123    /// Non-blocking poll. Same as [`Poller::poll`].
124    #[inline]
125    pub fn try_recv(&self, events: &mut Events) {
126        self.poller.poll(events);
127    }
128
129    /// Non-blocking poll with limit. Same as [`Poller::poll_limit`].
130    #[inline]
131    pub fn try_recv_limit(&self, events: &mut Events, limit: usize) {
132        self.poller.poll_limit(events, limit);
133    }
134
135    /// The maximum token index (exclusive).
136    #[inline]
137    pub fn capacity(&self) -> usize {
138        self.poller.capacity()
139    }
140
141    // ========================================================
142    // Internal: three-phase recv
143    // ========================================================
144
145    fn recv_inner(&self, events: &mut Events, limit: usize) {
146        // Phase 1: fast path
147        self.poller.poll_limit(events, limit);
148        if !events.is_empty() {
149            return;
150        }
151
152        // Phase 2: backoff (snooze)
153        let backoff = Backoff::new();
154        for _ in 0..self.snooze_iters {
155            backoff.snooze();
156            self.poller.poll_limit(events, limit);
157            if !events.is_empty() {
158                return;
159            }
160        }
161
162        // Phase 3: park
163        loop {
164            // Set parked flag BEFORE re-checking the queue.
165            // SeqCst synchronizes with the sender's SeqCst load.
166            self.shared.receiver_parked.store(true, Ordering::SeqCst);
167
168            // Re-check after setting flag — prevents lost wakeup.
169            // If a producer pushed between our last poll and setting
170            // the flag, we catch it here instead of sleeping forever.
171            self.poller.poll_limit(events, limit);
172            if !events.is_empty() {
173                self.shared.receiver_parked.store(false, Ordering::Relaxed);
174                return;
175            }
176
177            // Safe to park — flag is set, queue is empty.
178            self.parker.park();
179            self.shared.receiver_parked.store(false, Ordering::Relaxed);
180
181            // Re-check after waking.
182            self.poller.poll_limit(events, limit);
183            if !events.is_empty() {
184                return;
185            }
186            // Spurious wakeup or conflated notify — loop and re-park.
187        }
188    }
189
190    fn recv_timeout_inner(&self, events: &mut Events, limit: usize, timeout: Duration) -> bool {
191        let deadline = Instant::now() + timeout;
192
193        // Phase 1: fast path
194        self.poller.poll_limit(events, limit);
195        if !events.is_empty() {
196            return true;
197        }
198
199        // Phase 2: backoff (snooze)
200        let backoff = Backoff::new();
201        for _ in 0..self.snooze_iters {
202            if Instant::now() >= deadline {
203                return false;
204            }
205            backoff.snooze();
206            self.poller.poll_limit(events, limit);
207            if !events.is_empty() {
208                return true;
209            }
210        }
211
212        // Phase 3: park with timeout
213        loop {
214            let now = Instant::now();
215            if now >= deadline {
216                return false;
217            }
218
219            self.shared.receiver_parked.store(true, Ordering::SeqCst);
220
221            // Re-check after setting flag.
222            self.poller.poll_limit(events, limit);
223            if !events.is_empty() {
224                self.shared.receiver_parked.store(false, Ordering::Relaxed);
225                return true;
226            }
227
228            let remaining = deadline - now;
229            self.parker.park_timeout(remaining);
230            self.shared.receiver_parked.store(false, Ordering::Relaxed);
231
232            self.poller.poll_limit(events, limit);
233            if !events.is_empty() {
234                return true;
235            }
236        }
237    }
238}
239
240// ========================================================
241// Constructor
242// ========================================================
243
244/// Create a blocking event channel with capacity for `max_tokens`
245/// unique tokens.
246///
247/// Returns a `(Sender, Receiver)` pair. The `Sender` is cloneable
248/// for multiple producers. The `Receiver` is single-consumer.
249///
250/// The sender automatically wakes a parked receiver on new
251/// notifications. Conflated notifications may cause spurious
252/// wakeups (safe and self-correcting).
253///
254/// # Panics
255///
256/// Panics if `max_tokens` is 0.
257///
258/// # Examples
259///
260/// ```
261/// use nexus_notify::{event_channel, Token, Events};
262/// use std::thread;
263///
264/// let (sender, receiver) = event_channel(64);
265/// let mut events = Events::with_capacity(64);
266///
267/// // Producer thread
268/// let s = sender.clone();
269/// let handle = thread::spawn(move || {
270///     s.notify(Token::new(42)).unwrap();
271/// });
272///
273/// // Consumer blocks until event arrives
274/// receiver.recv(&mut events);
275/// assert_eq!(events.len(), 1);
276/// assert_eq!(events.as_slice()[0].index(), 42);
277///
278/// handle.join().unwrap();
279/// ```
280#[cold]
281pub fn event_channel(max_tokens: usize) -> (Sender, Receiver) {
282    let (notifier, poller) = event_queue::event_queue(max_tokens);
283    let shared = Arc::new(ChannelShared {
284        receiver_parked: AtomicBool::new(false),
285    });
286    let parker = Parker::new();
287    let unparker = parker.unparker().clone();
288    (
289        Sender {
290            notifier,
291            unparker,
292            shared: Arc::clone(&shared),
293        },
294        Receiver {
295            poller,
296            parker,
297            shared,
298            snooze_iters: DEFAULT_SNOOZE_ITERS,
299        },
300    )
301}
302
303// ========================================================
304// Tests
305// ========================================================
306
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Token capacity used by most tests.
    const CAP: usize = 64;

    /// Head start given to a spawned receiver so it can reach the parked
    /// state before the main thread notifies.
    const PARK_GRACE: Duration = Duration::from_millis(50);

    /// Token indices currently held in `events`, in drain order.
    fn indices_of(events: &Events) -> Vec<usize> {
        events.iter().map(|t| t.index()).collect()
    }

    // try_recv never blocks: empty before a notify, one event after.
    #[test]
    fn try_recv_non_blocking() {
        let (sender, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        receiver.try_recv(&mut events);
        assert!(events.is_empty());

        sender.notify(Token::new(5)).unwrap();
        receiver.try_recv(&mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(indices_of(&events)[0], 5);
    }

    // A limited poll takes at most `limit`; the next poll gets the rest.
    #[test]
    fn try_recv_limit() {
        let (sender, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        (0..10).for_each(|i| sender.notify(Token::new(i)).unwrap());

        receiver.try_recv_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        receiver.try_recv(&mut events);
        assert_eq!(events.len(), 7);
    }

    // recv takes the fast path when data is already queued.
    #[test]
    fn recv_returns_immediately_when_data_ready() {
        let (sender, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        sender.notify(Token::new(10)).unwrap();
        receiver.recv(&mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(indices_of(&events)[0], 10);
    }

    // A receiver that had to wait is woken by a later notify.
    #[test]
    fn recv_blocks_and_wakes() {
        let (sender, receiver) = event_channel(CAP);

        let consumer = thread::spawn(move || {
            let mut events = Events::with_capacity(CAP);
            receiver.recv(&mut events);
            indices_of(&events)
        });

        // Small delay to let receiver park
        thread::sleep(PARK_GRACE);
        sender.notify(Token::new(42)).unwrap();

        let received = consumer.join().unwrap();
        assert_eq!(received, vec![42]);
    }

    // Same as above, but the wake-up drain honours the limit.
    #[test]
    fn recv_limit_blocks_and_wakes() {
        let (sender, receiver) = event_channel(CAP);

        let consumer = thread::spawn(move || {
            let mut events = Events::with_capacity(CAP);
            receiver.recv_limit(&mut events, 2);
            indices_of(&events)
        });

        thread::sleep(PARK_GRACE);
        (0..5).for_each(|i| sender.notify(Token::new(i)).unwrap());

        let received = consumer.join().unwrap();
        // Should get at most 2 (limit)
        assert!(received.len() <= 2);
        assert!(!received.is_empty());
    }

    // recv_timeout reports success when data is already queued.
    #[test]
    fn recv_timeout_returns_true_on_data() {
        let (sender, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        sender.notify(Token::new(7)).unwrap();
        let received = receiver.recv_timeout(&mut events, Duration::from_secs(1));
        assert!(received);
        assert_eq!(events.len(), 1);
    }

    // recv_timeout reports failure when nothing arrives in time.
    #[test]
    fn recv_timeout_returns_false_on_timeout() {
        let (_, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        let received = receiver.recv_timeout(&mut events, Duration::from_millis(10));
        assert!(!received);
        assert!(events.is_empty());
    }

    // A notify during the wait ends recv_timeout early with success.
    #[test]
    fn recv_timeout_wakes_before_timeout() {
        let (sender, receiver) = event_channel(CAP);

        let consumer = thread::spawn(move || {
            let mut events = Events::with_capacity(CAP);
            let received = receiver.recv_timeout(&mut events, Duration::from_secs(5));
            (received, indices_of(&events))
        });

        thread::sleep(PARK_GRACE);
        sender.notify(Token::new(42)).unwrap();

        let (received, indices) = consumer.join().unwrap();
        assert!(received);
        assert_eq!(indices, vec![42]);
    }

    // Repeated notifies of one token collapse into a single event.
    #[test]
    fn conflation() {
        let (sender, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);
        let token = Token::new(7);

        for _ in 0..100 {
            sender.notify(token).unwrap();
        }

        receiver.recv(&mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(indices_of(&events)[0], 7);
    }

    // Events come out in the order they were notified.
    #[test]
    fn fifo_ordering() {
        let (sender, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        (0..10).for_each(|i| sender.notify(Token::new(i)).unwrap());

        receiver.recv(&mut events);
        let drained: Vec<usize> = indices_of(&events);
        assert_eq!(drained, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    // Successive limited receives each hand back the next slice.
    #[test]
    fn multiple_recv_drains_incrementally() {
        let (sender, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        (0..10).for_each(|i| sender.notify(Token::new(i)).unwrap());

        receiver.recv_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        receiver.recv_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        receiver.try_recv(&mut events);
        assert_eq!(events.len(), 4);
    }

    // The smallest legal channel still delivers its one token.
    #[test]
    fn capacity_1() {
        let (sender, receiver) = event_channel(1);
        let mut events = Events::with_capacity(1);

        sender.notify(Token::new(0)).unwrap();
        receiver.recv(&mut events);
        assert_eq!(events.len(), 1);
    }

    // A zero timeout on an empty channel returns false without events.
    #[test]
    fn recv_timeout_zero_is_try_recv() {
        let (_, receiver) = event_channel(CAP);
        let mut events = Events::with_capacity(CAP);

        let received = receiver.recv_timeout(&mut events, Duration::ZERO);
        assert!(!received);
        assert!(events.is_empty());
    }
}