ndless_async/
mpsc.rs

1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
//! Similarly to `std`, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected.
10//!
11//! # Disconnection
12//!
13//! When all [`Sender`] handles have been dropped, it is no longer
14//! possible to send values into the channel. This is considered the termination
15//! event of the stream. As such, [`Receiver::poll_next`][Stream::poll_next]
//! will return `Poll::Ready(None)`.
17
18use alloc::rc::Rc;
19use core::pin::Pin;
20
21use crossbeam_queue::{ArrayQueue, PushError};
22use futures_util::stream::Stream;
23use futures_util::task::{AtomicWaker, Context, Poll};
24
25struct Queue<T> {
26	queue: ArrayQueue<T>,
27	waker: AtomicWaker,
28}
29
30/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
31///
32/// Being bounded, this channel provides backpressure to ensure that the sender
33/// outpaces the receiver by only a limited amount. The channel's capacity is
34/// equal to `buffer`. In other words, there are
35/// `buffer` "first come, first serve" slots available to all senders.
36///
37/// The [`Receiver`](Receiver) returned implements the
38/// [`Stream`](Stream) trait, while [`Sender`](Sender)
39/// has its own method, [`send`][Sender::send].
40pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
41	let queue = Rc::new(Queue {
42		queue: ArrayQueue::new(buffer),
43		waker: Default::default(),
44	});
45	(
46		Sender {
47			queue: queue.clone(),
48		},
49		Receiver { queue },
50	)
51}
52
53/// The transmission end of a bounded mpsc channel.
54///
55/// This value is created by the [`channel`] function.
56#[derive(Clone)]
57pub struct Sender<T> {
58	queue: Rc<Queue<T>>,
59}
60
61impl<T> Sender<T> {
62	/// Sends data across to the receiver.
63	pub fn send(&self, data: T) -> Result<(), PushError<T>> {
64		self.queue.waker.wake();
65		self.queue.queue.push(data)
66	}
67	pub fn capacity(&self) -> usize {
68		self.queue.queue.capacity()
69	}
70	pub fn is_empty(&self) -> bool {
71		self.queue.queue.is_empty()
72	}
73	pub fn is_full(&self) -> bool {
74		self.queue.queue.is_full()
75	}
76	pub fn len(&self) -> usize {
77		self.queue.queue.len()
78	}
79}
80
81/// The receiving end of a bounded mpsc channel.
82///
83/// This value is created by the [`channel`] function.
84pub struct Receiver<T> {
85	queue: Rc<Queue<T>>,
86}
87
88impl<T> Stream for Receiver<T> {
89	type Item = T;
90
91	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
92		self.queue.waker.register(&cx.waker());
93		match self.queue.queue.pop() {
94			Ok(data) => Poll::Ready(Some(data)),
95			_ => {
96				if Rc::strong_count(&self.queue) < 2 {
97					Poll::Ready(None)
98				} else {
99					Poll::Pending
100				}
101			}
102		}
103	}
104}