ndless_async/mpsc.rs
1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
4//! Similarly to the `std`, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected.
10//!
11//! # Disconnection
12//!
13//! When all [`Sender`] handles have been dropped, it is no longer
14//! possible to send values into the channel. This is considered the termination
15//! event of the stream. As such, [`Receiver::poll_next`][Stream::poll_next]
16//! will return `Ok(Ready(None))`.
17
18use alloc::rc::Rc;
19use core::pin::Pin;
20
21use crossbeam_queue::{ArrayQueue, PushError};
22use futures_util::stream::Stream;
23use futures_util::task::{AtomicWaker, Context, Poll};
24
25struct Queue<T> {
26 queue: ArrayQueue<T>,
27 waker: AtomicWaker,
28}
29
30/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
31///
32/// Being bounded, this channel provides backpressure to ensure that the sender
33/// outpaces the receiver by only a limited amount. The channel's capacity is
34/// equal to `buffer`. In other words, there are
35/// `buffer` "first come, first serve" slots available to all senders.
36///
37/// The [`Receiver`](Receiver) returned implements the
38/// [`Stream`](Stream) trait, while [`Sender`](Sender)
39/// has its own method, [`send`][Sender::send].
40pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
41 let queue = Rc::new(Queue {
42 queue: ArrayQueue::new(buffer),
43 waker: Default::default(),
44 });
45 (
46 Sender {
47 queue: queue.clone(),
48 },
49 Receiver { queue },
50 )
51}
52
53/// The transmission end of a bounded mpsc channel.
54///
55/// This value is created by the [`channel`] function.
56#[derive(Clone)]
57pub struct Sender<T> {
58 queue: Rc<Queue<T>>,
59}
60
61impl<T> Sender<T> {
62 /// Sends data across to the receiver.
63 pub fn send(&self, data: T) -> Result<(), PushError<T>> {
64 self.queue.waker.wake();
65 self.queue.queue.push(data)
66 }
67 pub fn capacity(&self) -> usize {
68 self.queue.queue.capacity()
69 }
70 pub fn is_empty(&self) -> bool {
71 self.queue.queue.is_empty()
72 }
73 pub fn is_full(&self) -> bool {
74 self.queue.queue.is_full()
75 }
76 pub fn len(&self) -> usize {
77 self.queue.queue.len()
78 }
79}
80
81/// The receiving end of a bounded mpsc channel.
82///
83/// This value is created by the [`channel`] function.
84pub struct Receiver<T> {
85 queue: Rc<Queue<T>>,
86}
87
88impl<T> Stream for Receiver<T> {
89 type Item = T;
90
91 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
92 self.queue.waker.register(&cx.waker());
93 match self.queue.queue.pop() {
94 Ok(data) => Poll::Ready(Some(data)),
95 _ => {
96 if Rc::strong_count(&self.queue) < 2 {
97 Poll::Ready(None)
98 } else {
99 Poll::Pending
100 }
101 }
102 }
103 }
104}