odem_rs_sync/
channel.rs

1//! This module contains support for MPMC-channels which synchronize sender and
2//! receiver continuations over a message queue.
3
4use core::{
5	cell::{Cell, UnsafeCell},
6	task::Waker,
7};
8
9#[cfg(feature = "tracing")]
10use tracing::trace;
11
12use crate::{
13	Subscriber,
14	chain::Chain,
15	error::{SendError, TryRecvError, TrySendError},
16};
17
18#[cfg(feature = "alloc")]
19#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
20pub mod shared;
21
22pub mod pinned;
23
24pub mod backing;
25
26pub use pinned::{Channel, ChannelExt};
27
28pub use backing::{ArrayBuf, Rendezvous};
29
30#[cfg(feature = "alloc")]
31pub use shared::channel;
32
33/* ******************************************************** Ring Buffer Trait */
34
/// Contract a buffer has to fulfil in order to serve as the backing store of
/// a [Channel](Channel).
pub trait RingBuf {
	/// Element type held by the buffer.
	type Item;

	/// Tries to store `item` in the buffer. Yields `Ok(())` on success; a
	/// full buffer hands the item back to the caller inside `Err`.
	fn enqueue(&mut self, item: Self::Item) -> Result<(), Self::Item>;

	/// Removes an item from the buffer and returns it, or `None` when there
	/// is nothing stored.
	fn dequeue(&mut self) -> Option<Self::Item>;

	/// Discards every element currently stored in the buffer.
	fn clear(&mut self);

	/// Reports how many elements the buffer holds right now.
	fn len(&self) -> usize;

	/// Reports whether the buffer holds no elements at all, i.e. whether
	/// [`len`](Self::len) is zero.
	fn is_empty(&self) -> bool {
		matches!(self.len(), 0)
	}
}
60
61/* ****************************************************** Channel Inner Parts */
62
63/// Helper structure to allow the unsizing coercion to function in stable Rust.
64///
65/// The reason this indirection is needed is that unsizing doesn't work if the
66/// unsized template argument is referred to more than once in the struct.
67/// Using this structure allows us to separate the associated type of the
68/// [`RingBuf`]-trait from the potentially unsized ring buffer.
69struct Inner<T, R: ?Sized + RingBuf<Item = T>> {
70	/// A chain of continuations waiting for reactivation as soon as there is a
71	/// message to send or to receive.
72	chain: Chain<OneShot<T, Waker>>,
73	/// The number of continuations waiting to receive or to send a message.
74	/// Negative values indicate continuations waiting to receive a message,
75	/// positive values indicate continuations waiting to send a message,
76	/// and 0 indicates that no continuations are waiting to send or receive
77	/// messages.
78	pending: Cell<isize>,
79	/// A ring buffer containing messages to be received. May have capacity `0`,
80	/// in which case the corresponding channel is a rendezvous channel.
81	buffer: UnsafeCell<R>,
82}
83
84impl<R: ?Sized + RingBuf> Inner<R::Item, R> {
85	/// Creates a new channel that uses the backing store passed as an argument.
86	fn new(buffer: R) -> Self
87	where
88		R: Sized,
89	{
90		Self {
91			chain: Chain::new(),
92			pending: Cell::new(0),
93			buffer: UnsafeCell::new(buffer),
94		}
95	}
96
97	/// Closes the channel causing any waiting sender or receiver to return
98	/// immediately with an error indicating a disconnect. Any additional send
99	/// operation after the channel is closed fails immediately. Receive
100	/// operations fail once the message buffer is empty.
101	fn close(&self) {
102		if !self.is_closed() {
103			#[cfg(feature = "tracing")]
104			trace!("closing channel");
105
106			self.pending.set(isize::MIN);
107			self.chain.notify_all().go();
108		}
109	}
110
111	/// Returns whether this channel is closed or not.
112	fn is_closed(&self) -> bool {
113		self.pending.get() == isize::MIN
114	}
115
116	fn add_pending_sender(&self) {
117		self.pending.set(self.pending.get() + 1);
118	}
119
120	fn sub_pending_sender(&self) {
121		self.pending.set(self.pending.get() - 1);
122	}
123
124	fn add_pending_receiver(&self) {
125		self.sub_pending_sender();
126	}
127
128	fn sub_pending_receiver(&self) {
129		self.add_pending_sender();
130	}
131
132	fn has_pending_sender(&self) -> bool {
133		self.pending.get() > 0
134	}
135
136	fn has_pending_receiver(&self) -> bool {
137		self.pending.get() < 0
138	}
139
140	/// Resets the channel by clearing the message buffer and re-opening it if
141	/// it has been closed before. Panics if there are still [send](Self::send)
142	/// or [recv](Self::recv) operations pending.
143	fn reset(&self) {
144		// reset the number of pending senders/receivers
145		match self.pending.get() {
146			0 | isize::MIN => self.pending.set(0),
147			n => panic!(
148				"resetting the channel failed because there are {} pending \
149				 {}-operations",
150				n.unsigned_abs(),
151				if n > 0 { "send" } else { "recv" }
152			),
153		}
154
155		// clear the message buffer
156		unsafe { &mut *self.buffer.get() }.clear();
157	}
158
159	/// Attempt to send the value without waiting.
160	///
161	/// This can fail for two reasons:
162	/// 1. The channel is closed, i.e. [`close`] has been called.
163	/// 2. The message buffer is full.
164	///
165	/// The `Err`-result contains the initially provided item in either case.
166	///
167	/// [`close`]: Channel::close
168	fn try_send(&self, item: R::Item) -> Result<(), TrySendError<R::Item>> {
169		// test if there are receivers waiting for a message
170		if self.has_pending_receiver() {
171			// test if the channel is still open
172			if !self.is_closed() {
173				// decrease the number of pending messages
174				self.pending.set(self.pending.get() + 1);
175
176				#[cfg(feature = "tracing")]
177				trace!(
178					remaining = self.pending.get().unsigned_abs(),
179					"passing message to the first receiver"
180				);
181
182				// transmit the item to the first receiver
183				self.chain
184					.notify_one()
185					.then(|c| c.give(item))
186					.go()
187					.expect("receiver waiting for message");
188
189				Ok(())
190			} else {
191				// closed: don't accept any more messages
192				Err(TrySendError::Disconnected(item)).inspect_err(|_err| {
193					#[cfg(feature = "tracing")]
194					trace!("message cannot be sent: {_err}");
195				})
196			}
197		} else {
198			// insert the message into the buffer
199			self.give(item)
200				.map_err(TrySendError::Full)
201				.inspect(|_| {
202					#[cfg(feature = "tracing")]
203					trace!(
204						buffer_len = self.buffer_len(),
205						"message buffered in channel"
206					);
207				})
208				.inspect_err(|_err| {
209					#[cfg(feature = "tracing")]
210					trace!("message cannot be sent: {_err}");
211				})
212		}
213	}
214
215	/// Attempts to receive a value of the channel without waiting.
216	///
217	/// This can fail for two reasons:
218	/// 1. The channel is closed, i.e. [`close`] has been called.
219	/// 2. The message buffer is empty.
220	///
221	/// [`close`]: Channel::close
222	fn try_recv(&self) -> Result<R::Item, TryRecvError> {
223		// attempt to extract the next message from the underlying buffer
224		match self.take() {
225			Some(msg) => {
226				#[cfg(feature = "tracing")]
227				trace!(
228					buffer_len = self.buffer_len(),
229					"extracted message from the channel's buffer"
230				);
231
232				// test if there are senders waiting to transmit a message
233				if self.has_pending_sender() {
234					// receive a message from the longest waiting sender
235					let item = unsafe {
236						self.chain
237							.notify_one()
238							.then(OneShot::take)
239							.go()
240							.unwrap_unchecked()
241							.unwrap_unchecked()
242					};
243
244					// decrease the number of waiting senders
245					self.sub_pending_sender();
246
247					// add the message to the buffer
248					self.give(item)
249						.ok()
250						.expect("buffer should not be full after an element has been extracted");
251				}
252
253				// return the message extracted from the buffer
254				Ok(msg)
255			}
256			_ => {
257				if self.has_pending_sender() {
258					// decrease the number of waiting senders
259					self.sub_pending_sender();
260
261					#[cfg(feature = "tracing")]
262					trace!(
263						remaining = self.pending.get(),
264						"received message from a waiting sender"
265					);
266
267					let Some(Some(msg)) = self.chain.notify_one().then(OneShot::take).go() else {
268						unreachable!("pending senders but no message")
269					};
270
271					Ok(msg)
272				} else {
273					if self.is_closed() {
274						Err(TryRecvError::Disconnected)
275					} else {
276						Err(TryRecvError::Empty)
277					}
278					.inspect_err(|_err| {
279						#[cfg(feature = "tracing")]
280						trace!("message could not be received: {_err}");
281					})
282				}
283			}
284		}
285	}
286
287	/// Returns the number of elements currently queued to be received.
288	///
289	/// This number can be negative if there are [`RecvFuture`] currently
290	/// waiting to receive messages.
291	fn count(&self) -> isize {
292		self.buffer_len() as isize + self.pending.get()
293	}
294
295	/// Returns the number of elements currently stored in the channel's buffer.
296	fn buffer_len(&self) -> usize {
297		unsafe { &*self.buffer.get() }.len()
298	}
299
300	/// Places an item into the internal message buffer. If the buffer is
301	/// already full, the item is returned to the caller through the `Err`
302	/// variant.
303	fn give(&self, item: R::Item) -> Result<(), R::Item> {
304		unsafe { &mut *self.buffer.get() }.enqueue(item)
305	}
306
307	/// Takes the first item out of the message buffer. Returns `None` if the
308	/// buffer is empty.
309	fn take(&self) -> Option<R::Item> {
310		unsafe { &mut *self.buffer.get() }.dequeue()
311	}
312}
313
314/* ****************************************************** OneShot Signal Type */
315
/// A [Subscriber] wrapper that carries at most one value alongside the
/// notification.
struct OneShot<T, N> {
	/// Slot for the value in transit; `None` while nothing is stored.
	payload: Cell<Option<T>>,
	/// The object that gets notified when the value has been sent or
	/// received.
	notify: N,
}

impl<T, N> OneShot<T, N> {
	/// Builds a signal around `notify` whose slot starts out with `payload`,
	/// which may be `None` to leave it empty.
	pub const fn new(notify: N, payload: Option<T>) -> Self {
		Self {
			notify,
			payload: Cell::new(payload),
		}
	}

	/// Empties the slot and hands back whatever was stored in it.
	pub fn take(&self) -> Option<T> {
		self.payload.take()
	}

	/// Stores `item` in the slot, dropping any value that was there before.
	pub fn give(&self, item: T) {
		drop(self.payload.replace(Some(item)));
	}
}
345
346impl<T, N: Subscriber> Subscriber for OneShot<T, N> {
347	fn notify(&self) {
348		self.notify.notify();
349	}
350}