odem_rs_sync/channel/
shared.rs

1//! This module provides dynamically allocated, reference-counted channels for
2//! exchanging values between concurrent processes in a discrete-event
3//! simulation.
4//!
5//! Unlike stack-bound channels, these channels use [`Rc`] for shared ownership,
6//! allowing senders and receivers to be freely cloned and distributed.
7//!
8//! ## Components
9//!
10//! - **[`channel`]**: Creates a new channel, returning a [`Sender`] and a
11//!   [`Receiver`] pair. Messages are delivered in the order they were sent,
12//!   and the channel closes automatically when both ends are dropped.
13//! - **[`Sender`]**: The sending half of the channel, which can be cloned.
14//!   Messages sent through it become available to receivers.
15//! - **[`Receiver`]**: The receiving half of the channel, which can be cloned.
16//!   Receives messages sent by any sender.
17//!
18//! ## Features
19//!
20//! - **Dynamically Allocated**: Uses [`Rc`] and [`Weak`] for reference-counted
21//!   channel management.
22//! - **Non-blocking**: Sending and receiving operations return futures that
23//!   can be awaited asynchronously within a simulation.
24//! - **Reference Counting**: The channel closes automatically when all senders
25//!   and receivers are dropped.
26
27use core::{
28	future::Future,
29	pin::Pin,
30	task::{Context, Poll},
31};
32
33use alloc::rc::{Rc, Weak};
34
35use super::{
36	Inner, RingBuf, SendError, TryRecvError, TrySendError,
37	pinned::{SendFutureState, SendFutureStateProject},
38};
39
40#[doc(inline)]
41pub use super::pinned::RecvFuture;
42
43/* **************************************************** Free Channel Function */
44
45/// Creates a new channel, returning the sender/receiver halves.
46///
47/// All data sent on the [`Sender`] half will become available on the
48/// [`Receiver`] in the same order as it was sent. If the underlying [`RingBuf`]
49/// is dynamically sized, no [`send`] will block. The [`recv`]-method will block
50/// until a message is available while there is at least one `Sender` alive.
51///
52/// Both `Sender` and `Receiver` may be cloned, allowing to receive and send
53/// messages over the same channel from different parts of the simulation.
54///
55/// The underlying channel is closed automatically when the last `Sender`
56/// or `Receiver` drops.
57///
58/// ## Examples
59///
60/// An ordinary channel transferring items between two jobs.
61///
62/// ```
63/// # use core::pin::pin;
64/// # use odem_rs_core::simulator::{Sim, simulation};
65/// # use odem_rs_sync::{channel::{channel, Rendezvous}, fork::ForkExt};
66/// #[derive(Debug)]
67/// struct Message(usize);
68///
69/// async fn sim_main(sim: &Sim) {
70///     let (sx, rx) = channel(Rendezvous::new());
71///
72///     sim.fork(async move {
73///         sx.send(Message(42)).await.unwrap();
74///     })
75///     .and(async move {
76///         let msg = rx.recv().await.unwrap();
77///         println!("Received {:?}", msg);
78///     })
79///     .await;
80/// }
81/// # fn main() { simulation(sim_main).ok(); }
82/// ```
83///
84/// Sending references to items is allowed, as long as they outlive the channel.
85///
86/// ```
87/// # use core::pin::pin;
88/// # use odem_rs_core::simulator::{Sim, simulation};
89/// # use odem_rs_sync::{channel::{channel, Rendezvous}, fork::ForkExt};
90/// async fn sim_main(sim: &Sim) {
91///     let item = 3;
92///     let (sx, rx) = channel(Rendezvous::new());
93///
94///     sim.fork(async {
95///         sx.send(&item).await.unwrap();
96///     })
97///     .and(async {
98///         let msg = rx.recv().await.unwrap();
99///         println!("Received {:?}", msg);
100///     })
101///     .await;
102/// }
103/// # fn main() { simulation(sim_main).ok(); }
104/// ```
105///
106/// [`send`]: Sender::send
107/// [`recv`]: Receiver::recv
108pub fn channel<R: RingBuf>(buffer: R) -> (Sender<R::Item>, Receiver<R::Item>) {
109	use alloc::rc::Rc;
110	let channel = Rc::new(Inner::new(buffer));
111
112	// SAFETY: Any lifetime bound on `R` can only result from a bound on
113	// `R::Item`, since `Rc<Channel<R>>` safely erases the bound on the
114	// underlying ring buffer itself. The bound on `R::Item` is still
115	// retained within the `Sender`/`Receiver` pairs generic argument,
116	// making it subject to the borrow checker.
117	// Therefore, erasing the lifetime here is safe.
118	let channel = unsafe {
119		core::mem::transmute::<
120			Rc<Inner<R::Item, dyn RingBuf<Item = R::Item> + '_>>,
121			Rc<Inner<R::Item, dyn RingBuf<Item = R::Item> + 'static>>,
122		>(channel)
123	};
124
125	(Sender(Rc::downgrade(&channel)), Receiver(channel))
126}
127
128/* ***************************************************** Sender/Receiver Pair */
129
130/// The sending half of the channel. This half can be cloned and is
131/// bound to the hosting channel through the reference counted [`Rc`].
132pub struct Sender<T>(pub(super) Weak<Inner<T, dyn RingBuf<Item = T>>>);
133
134impl<T> Sender<T> {
135	/// Attempt to send the value without waiting.
136	///
137	/// This can fail for two reasons:
138	/// 1. The channel is closed, i.e. [`close`] has been called.
139	/// 2. The message buffer is full.
140	///
141	/// The `Err`-result contains the initially provided item in either case.
142	///
143	/// [`close`]: Self::close
144	pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
145		match self.0.upgrade() {
146			Some(inner) => inner.try_send(item),
147			_ => Err(TrySendError::Disconnected(item)),
148		}
149	}
150
151	/// Returns a future that gets fulfilled when the value has been written to
152	/// the channel. If the channel gets closed while the sending is in
153	/// progress, sending the value will fail, and the future will deliver the
154	/// value back.
155	pub fn send(&self, item: T) -> SendFuture<'_, T> {
156		SendFuture::new(self, item)
157	}
158
159	/// Returns the number of elements currently inside the channel of this
160	/// sender.
161	pub fn count(&self) -> isize {
162		self.0.upgrade().map_or(0, |inner| inner.count())
163	}
164
165	/// Closes the channel from the sending side. Already sent values may
166	/// still be received.
167	pub fn close(&self) {
168		if let Some(inner) = self.0.upgrade() {
169			inner.close();
170		}
171	}
172}
173
174impl<T> Clone for Sender<T> {
175	fn clone(&self) -> Self {
176		Sender(self.0.clone())
177	}
178}
179
180impl<T> Drop for Sender<T> {
181	fn drop(&mut self) {
182		if Weak::weak_count(&self.0) == 1 {
183			if let Some(inner) = self.0.upgrade() {
184				inner.close();
185			}
186		}
187	}
188}
189
190/// The receiving half of the channel. This half can be cloned and is
191/// bound to the hosting channel through the reference counted [`Rc`].
192pub struct Receiver<T>(pub(super) Rc<Inner<T, dyn RingBuf<Item = T>>>);
193
194impl<T> Receiver<T> {
195	/// Attempts to receive a value of the channel without waiting.
196	///
197	/// This can fail for two reasons:
198	/// 1. The channel is closed, i.e. [`close`] has been called.
199	/// 2. The message buffer is empty.
200	///
201	/// [`close`]: Self::close
202	pub fn try_recv(&self) -> Result<T, TryRecvError> {
203		self.0.try_recv()
204	}
205
206	/// Returns a future that gets fulfilled when a value is written to the
207	/// channel. If the channels get closed, the future will resolve to an
208	/// `Err` variant.
209	pub fn recv(&self) -> RecvFuture<'_, dyn RingBuf<Item = T>> {
210		RecvFuture::new(&self.0)
211	}
212
213	/// Returns the number of elements currently inside the channel of this
214	/// receiver.
215	pub fn count(&self) -> isize {
216		self.0.count()
217	}
218
219	/// Closes the channel from the receiving side. Already sent values may
220	/// still be received.
221	pub fn close(&self) {
222		self.0.close();
223	}
224}
225
226impl<T> Clone for Receiver<T> {
227	fn clone(&self) -> Self {
228		Receiver(self.0.clone())
229	}
230}
231
232impl<T> Drop for Receiver<T> {
233	fn drop(&mut self) {
234		// notify all the senders of the channel's imminent closing
235		if Rc::strong_count(&self.0) == 1 {
236			self.close();
237		}
238	}
239}
240
241/* *************************************************** Specialized SendFuture */
242
243/// Future for the [send](Sender::send) operation of the shared channel.
244///
245/// We cannot re-use the SendFuture of the channel because our sender only
246/// contains a weak reference to the channel.
247#[pin_project::pin_project(PinnedDrop)]
248#[must_use = "futures do nothing unless you `.await` or poll them"]
249pub struct SendFuture<'r, T> {
250	/// The sender used to send the message.
251	chan: &'r Sender<T>,
252	/// Either the message or the link into the pending chain.
253	#[pin]
254	state: SendFutureState<T>,
255}
256
257impl<'r, T> SendFuture<'r, T> {
258	/// Creates a [send](Sender::send)-future and primes it with a message.
259	///
260	/// The send-operation is attempted once this future is resolved.
261	const fn new(chan: &'r Sender<T>, msg: T) -> Self {
262		SendFuture {
263			chan,
264			state: SendFutureState::Primed(Some(msg)),
265		}
266	}
267}
268
269impl<T> Future for SendFuture<'_, T> {
270	type Output = Result<(), SendError<T>>;
271
272	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273		let mut inner = self.project();
274
275		match inner.state.as_mut().project() {
276			SendFutureStateProject::Primed(msg) => {
277				// extract the message
278				let msg = msg.take().unwrap();
279
280				// attempt to get an owning reference to the channel
281				match inner.chan.0.upgrade() {
282					Some(chan) => {
283						// attempt to send the message
284						match chan.try_send(msg) {
285							Err(TrySendError::Full(msg)) => {
286								// if the message queue is already full,
287								// create a link and subscribe
288								unsafe {
289									inner.state.link(&chan.chain, cx.waker().clone(), msg);
290								}
291
292								// increase the number of pending senders
293								chan.add_pending_sender();
294
295								// defer
296								Poll::Pending
297							}
298							Err(TrySendError::Disconnected(msg)) => {
299								// the channel has been closed
300								// return an error
301								Poll::Ready(Err(SendError(msg)))
302							}
303							Ok(()) => {
304								// the message was sent successfully
305								// return with success
306								Poll::Ready(Ok(()))
307							}
308						}
309					}
310					_ => {
311						// failed: the channel is closed and dropped
312						// return with an error
313						Poll::Ready(Err(SendError(msg)))
314					}
315				}
316			}
317			SendFutureStateProject::Linked(link) => {
318				// this future has been reactivated after subscribing
319				// to the pending chain
320
321				// either the message has been received (success)
322				// or the channel has been closed (failure)
323				Poll::Ready(match link.note.take() {
324					Some(msg) => Err(SendError(msg)),
325					_ => Ok(()),
326				})
327			}
328		}
329	}
330}
331
332#[pin_project::pinned_drop]
333impl<T> PinnedDrop for SendFuture<'_, T> {
334	fn drop(self: Pin<&mut Self>) {
335		// make sure to unlink the soon-to-be-dangling link if the future has
336		// been dropped after subscribing but before reactivation
337		if let Some(chan) = self.chan.0.upgrade() {
338			if unsafe { self.as_ref().project_ref().state.unlink(&chan.chain) } {
339				chan.sub_pending_sender();
340			}
341		}
342	}
343}
344
345#[cfg(test)]
346mod tests {
347	use super::*;
348	use crate::{
349		channel::{ArrayBuf, Rendezvous},
350		fork::ForkExt as _,
351	};
352	use odem_rs_core::simulator::{Sim, Simulator};
353	use proptest::prelude::*;
354
355	async fn test_insertion_order(
356		sim: &Sim,
357		buf: impl RingBuf<Item = usize> + 'static,
358		elems: usize,
359	) {
360		let (sx, rx) = channel(buf);
361
362		sim.fork(async move {
363			for i in 0..elems {
364				sx.send(i).await.unwrap();
365				sim.advance(1.0).await;
366			}
367		})
368		.and(async move {
369			sim.advance(10.0).await;
370			for exp in 0..elems {
371				let val = rx.recv().await.unwrap();
372				assert_eq!(val, exp);
373			}
374		})
375		.await;
376	}
377
378	proptest! {
379		#[test]
380		#[cfg_attr(miri, ignore)]
381		fn rendezvous_channels_respect_insertion_order(elems in 1..100usize) {
382			Simulator::default()
383				.run(async |sim| {
384					test_insertion_order(sim, Rendezvous::default(), elems).await;
385				})
386				.expect("unexpected deadlock");
387		}
388
389		#[test]
390		#[cfg_attr(miri, ignore)]
391		fn array_channels_respect_insertion_order(elems in 1..100usize) {
392			Simulator::default()
393				.run(async |sim| {
394					test_insertion_order(sim, ArrayBuf::<2,_>::new(), elems).await;
395				})
396				.expect("unexpected deadlock");
397		}
398
399		#[test]
400		#[cfg_attr(miri, ignore)]
401		fn deque_channels_respect_insertion_order(elems in 1..100usize) {
402			use alloc::collections::VecDeque;
403			Simulator::default()
404				.run(async |sim| {
405					test_insertion_order(sim, VecDeque::new(), elems).await;
406				})
407				.expect("unexpected deadlock");
408		}
409	}
410}