1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::{Duration, Instant};
4
5use crossbeam_utils::Backoff;
6use crossbeam_utils::sync::{Parker, Unparker};
7
8use crate::event_queue::{self, Events, Notifier, NotifyError, Poller, Token};
9
/// Number of snooze-and-poll rounds a receiver created by `event_channel`
/// performs before it falls back to parking its thread.
const DEFAULT_SNOOZE_ITERS: usize = 8;
11
/// State shared (behind an `Arc`) between the `Receiver` and every `Sender`
/// handle of one channel.
struct ChannelShared {
    /// Raised by the receiver just before it parks and lowered again once it
    /// stops waiting; senders read it to decide whether to call `unpark`.
    receiver_parked: AtomicBool,
}
19
20pub struct Sender {
33 notifier: Notifier,
34 unparker: Unparker,
35 shared: Arc<ChannelShared>,
36}
37
38impl Clone for Sender {
39 fn clone(&self) -> Self {
40 Self {
41 notifier: self.notifier.clone(),
42 unparker: self.unparker.clone(),
43 shared: Arc::clone(&self.shared),
44 }
45 }
46}
47
48impl Sender {
49 #[inline]
59 pub fn notify(&self, token: Token) -> Result<(), NotifyError> {
60 self.notifier.notify(token)?;
61 if self.shared.receiver_parked.load(Ordering::SeqCst) {
62 self.unparker.unpark();
63 }
64 Ok(())
65 }
66}
67
68pub struct Receiver {
84 poller: Poller,
85 parker: Parker,
86 shared: Arc<ChannelShared>,
87 snooze_iters: usize,
88}
89
90impl Receiver {
91 pub fn recv(&self, events: &mut Events) {
96 self.recv_inner(events, usize::MAX);
97 }
98
99 pub fn recv_limit(&self, events: &mut Events, limit: usize) {
104 self.recv_inner(events, limit);
105 }
106
107 pub fn recv_timeout(&self, events: &mut Events, timeout: Duration) -> bool {
112 self.recv_timeout_inner(events, usize::MAX, timeout)
113 }
114
115 pub fn recv_timeout_limit(&self, events: &mut Events, limit: usize, timeout: Duration) -> bool {
120 self.recv_timeout_inner(events, limit, timeout)
121 }
122
123 #[inline]
125 pub fn try_recv(&self, events: &mut Events) {
126 self.poller.poll(events);
127 }
128
129 #[inline]
131 pub fn try_recv_limit(&self, events: &mut Events, limit: usize) {
132 self.poller.poll_limit(events, limit);
133 }
134
135 #[inline]
137 pub fn capacity(&self) -> usize {
138 self.poller.capacity()
139 }
140
141 fn recv_inner(&self, events: &mut Events, limit: usize) {
146 self.poller.poll_limit(events, limit);
148 if !events.is_empty() {
149 return;
150 }
151
152 let backoff = Backoff::new();
154 for _ in 0..self.snooze_iters {
155 backoff.snooze();
156 self.poller.poll_limit(events, limit);
157 if !events.is_empty() {
158 return;
159 }
160 }
161
162 loop {
164 self.shared.receiver_parked.store(true, Ordering::SeqCst);
167
168 self.poller.poll_limit(events, limit);
172 if !events.is_empty() {
173 self.shared.receiver_parked.store(false, Ordering::Relaxed);
174 return;
175 }
176
177 self.parker.park();
179 self.shared.receiver_parked.store(false, Ordering::Relaxed);
180
181 self.poller.poll_limit(events, limit);
183 if !events.is_empty() {
184 return;
185 }
186 }
188 }
189
190 fn recv_timeout_inner(&self, events: &mut Events, limit: usize, timeout: Duration) -> bool {
191 let deadline = Instant::now() + timeout;
192
193 self.poller.poll_limit(events, limit);
195 if !events.is_empty() {
196 return true;
197 }
198
199 let backoff = Backoff::new();
201 for _ in 0..self.snooze_iters {
202 if Instant::now() >= deadline {
203 return false;
204 }
205 backoff.snooze();
206 self.poller.poll_limit(events, limit);
207 if !events.is_empty() {
208 return true;
209 }
210 }
211
212 loop {
214 let now = Instant::now();
215 if now >= deadline {
216 return false;
217 }
218
219 self.shared.receiver_parked.store(true, Ordering::SeqCst);
220
221 self.poller.poll_limit(events, limit);
223 if !events.is_empty() {
224 self.shared.receiver_parked.store(false, Ordering::Relaxed);
225 return true;
226 }
227
228 let remaining = deadline - now;
229 self.parker.park_timeout(remaining);
230 self.shared.receiver_parked.store(false, Ordering::Relaxed);
231
232 self.poller.poll_limit(events, limit);
233 if !events.is_empty() {
234 return true;
235 }
236 }
237 }
238}
239
240#[cold]
281pub fn event_channel(max_tokens: usize) -> (Sender, Receiver) {
282 let (notifier, poller) = event_queue::event_queue(max_tokens);
283 let shared = Arc::new(ChannelShared {
284 receiver_parked: AtomicBool::new(false),
285 });
286 let parker = Parker::new();
287 let unparker = parker.unparker().clone();
288 (
289 Sender {
290 notifier,
291 unparker,
292 shared: Arc::clone(&shared),
293 },
294 Receiver {
295 poller,
296 parker,
297 shared,
298 snooze_iters: DEFAULT_SNOOZE_ITERS,
299 },
300 )
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// `try_recv` returns with nothing when idle and with the token once one
    /// has been notified.
    #[test]
    fn try_recv_non_blocking() {
        let (tx, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        rx.try_recv(&mut batch);
        assert!(batch.is_empty());

        tx.notify(Token::new(5)).unwrap();
        rx.try_recv(&mut batch);
        assert_eq!(batch.len(), 1);
        let first = batch.iter().next().unwrap();
        assert_eq!(first.index(), 5);
    }

    /// A limited poll takes at most `limit` events; the rest stay queued.
    #[test]
    fn try_recv_limit() {
        let (tx, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        (0..10).for_each(|i| tx.notify(Token::new(i)).unwrap());

        rx.try_recv_limit(&mut batch, 3);
        assert_eq!(batch.len(), 3);

        rx.try_recv(&mut batch);
        assert_eq!(batch.len(), 7);
    }

    /// `recv` does not wait when an event is already pending.
    #[test]
    fn recv_returns_immediately_when_data_ready() {
        let (tx, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        tx.notify(Token::new(10)).unwrap();
        rx.recv(&mut batch);
        assert_eq!(batch.len(), 1);
        let first = batch.iter().next().unwrap();
        assert_eq!(first.index(), 10);
    }

    /// A receiver blocked in `recv` is woken by a later notification.
    #[test]
    fn recv_blocks_and_wakes() {
        let (tx, rx) = event_channel(64);

        let worker = thread::spawn(move || {
            let mut batch = Events::with_capacity(64);
            rx.recv(&mut batch);
            let seen = batch.iter().map(|t| t.index()).collect::<Vec<_>>();
            seen
        });

        // Give the worker time to reach the blocking path.
        thread::sleep(Duration::from_millis(50));
        tx.notify(Token::new(42)).unwrap();

        let seen = worker.join().unwrap();
        assert_eq!(seen, vec![42]);
    }

    /// A receiver blocked in `recv_limit` wakes up and honours the limit.
    #[test]
    fn recv_limit_blocks_and_wakes() {
        let (tx, rx) = event_channel(64);

        let worker = thread::spawn(move || {
            let mut batch = Events::with_capacity(64);
            rx.recv_limit(&mut batch, 2);
            let seen = batch.iter().map(|t| t.index()).collect::<Vec<_>>();
            seen
        });

        thread::sleep(Duration::from_millis(50));
        (0..5).for_each(|i| tx.notify(Token::new(i)).unwrap());

        let seen = worker.join().unwrap();
        assert!(seen.len() <= 2);
        assert!(!seen.is_empty());
    }

    /// `recv_timeout` reports `true` when an event is already pending.
    #[test]
    fn recv_timeout_returns_true_on_data() {
        let (tx, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        tx.notify(Token::new(7)).unwrap();
        let received = rx.recv_timeout(&mut batch, Duration::from_secs(1));
        assert!(received);
        assert_eq!(batch.len(), 1);
    }

    /// `recv_timeout` reports `false` when nothing arrives in time.
    #[test]
    fn recv_timeout_returns_false_on_timeout() {
        let (_, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        let received = rx.recv_timeout(&mut batch, Duration::from_millis(10));
        assert!(!received);
        assert!(batch.is_empty());
    }

    /// A notification ends a timed wait long before the timeout expires.
    #[test]
    fn recv_timeout_wakes_before_timeout() {
        let (tx, rx) = event_channel(64);

        let worker = thread::spawn(move || {
            let mut batch = Events::with_capacity(64);
            let received = rx.recv_timeout(&mut batch, Duration::from_secs(5));
            let seen = batch.iter().map(|t| t.index()).collect::<Vec<_>>();
            (received, seen)
        });

        thread::sleep(Duration::from_millis(50));
        tx.notify(Token::new(42)).unwrap();

        let (received, seen) = worker.join().unwrap();
        assert!(received);
        assert_eq!(seen, vec![42]);
    }

    /// Notifying the same token repeatedly yields a single event.
    #[test]
    fn conflation() {
        let (tx, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);
        let token = Token::new(7);

        for _ in 0..100 {
            tx.notify(token).unwrap();
        }

        rx.recv(&mut batch);
        assert_eq!(batch.len(), 1);
        let first = batch.iter().next().unwrap();
        assert_eq!(first.index(), 7);
    }

    /// Events come out in the order their tokens were notified.
    #[test]
    fn fifo_ordering() {
        let (tx, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        (0..10).for_each(|i| tx.notify(Token::new(i)).unwrap());

        rx.recv(&mut batch);
        let seen: Vec<usize> = batch.iter().map(|t| t.index()).collect();
        let expected: Vec<usize> = (0..10).collect();
        assert_eq!(seen, expected);
    }

    /// Successive limited receives drain the queue in slices.
    #[test]
    fn multiple_recv_drains_incrementally() {
        let (tx, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        (0..10).for_each(|i| tx.notify(Token::new(i)).unwrap());

        rx.recv_limit(&mut batch, 3);
        assert_eq!(batch.len(), 3);

        rx.recv_limit(&mut batch, 3);
        assert_eq!(batch.len(), 3);

        rx.try_recv(&mut batch);
        assert_eq!(batch.len(), 4);
    }

    /// The channel works with a capacity of one token.
    #[test]
    fn capacity_1() {
        let (tx, rx) = event_channel(1);
        let mut batch = Events::with_capacity(1);

        tx.notify(Token::new(0)).unwrap();
        rx.recv(&mut batch);
        assert_eq!(batch.len(), 1);
    }

    /// A zero timeout returns without data when the queue is empty.
    #[test]
    fn recv_timeout_zero_is_try_recv() {
        let (_, rx) = event_channel(64);
        let mut batch = Events::with_capacity(64);

        let received = rx.recv_timeout(&mut batch, Duration::ZERO);
        assert!(!received);
        assert!(batch.is_empty());
    }
}