odem_rs_sync/channel/shared.rs
1//! This module provides dynamically allocated, reference-counted channels for
2//! exchanging values between concurrent processes in a discrete-event
3//! simulation.
4//!
5//! Unlike stack-bound channels, these channels use [`Rc`] for shared ownership,
6//! allowing senders and receivers to be freely cloned and distributed.
7//!
8//! ## Components
9//!
//! - **[`channel`]**: Creates a new channel, returning a [`Sender`] and a
//!   [`Receiver`] pair. Messages are delivered in the order they were sent,
//!   and the channel closes automatically when the last sender or the last
//!   receiver is dropped.
13//! - **[`Sender`]**: The sending half of the channel, which can be cloned.
14//! Messages sent through it become available to receivers.
15//! - **[`Receiver`]**: The receiving half of the channel, which can be cloned.
16//! Receives messages sent by any sender.
17//!
18//! ## Features
19//!
20//! - **Dynamically Allocated**: Uses [`Rc`] and [`Weak`] for reference-counted
21//! channel management.
22//! - **Non-blocking**: Sending and receiving operations return futures that
23//! can be awaited asynchronously within a simulation.
//! - **Reference Counting**: The channel closes automatically once all senders
//!   or all receivers have been dropped.
26
27use core::{
28 future::Future,
29 pin::Pin,
30 task::{Context, Poll},
31};
32
33use alloc::rc::{Rc, Weak};
34
35use super::{
36 Inner, RingBuf, SendError, TryRecvError, TrySendError,
37 pinned::{SendFutureState, SendFutureStateProject},
38};
39
40#[doc(inline)]
41pub use super::pinned::RecvFuture;
42
43/* **************************************************** Free Channel Function */
44
45/// Creates a new channel, returning the sender/receiver halves.
46///
47/// All data sent on the [`Sender`] half will become available on the
48/// [`Receiver`] in the same order as it was sent. If the underlying [`RingBuf`]
49/// is dynamically sized, no [`send`] will block. The [`recv`]-method will block
50/// until a message is available while there is at least one `Sender` alive.
51///
52/// Both `Sender` and `Receiver` may be cloned, allowing to receive and send
53/// messages over the same channel from different parts of the simulation.
54///
55/// The underlying channel is closed automatically when the last `Sender`
56/// or `Receiver` drops.
57///
58/// ## Examples
59///
60/// An ordinary channel transferring items between two jobs.
61///
62/// ```
63/// # use core::pin::pin;
64/// # use odem_rs_core::simulator::{Sim, simulation};
65/// # use odem_rs_sync::{channel::{channel, Rendezvous}, fork::ForkExt};
66/// #[derive(Debug)]
67/// struct Message(usize);
68///
69/// async fn sim_main(sim: &Sim) {
70/// let (sx, rx) = channel(Rendezvous::new());
71///
72/// sim.fork(async move {
73/// sx.send(Message(42)).await.unwrap();
74/// })
75/// .and(async move {
76/// let msg = rx.recv().await.unwrap();
77/// println!("Received {:?}", msg);
78/// })
79/// .await;
80/// }
81/// # fn main() { simulation(sim_main).ok(); }
82/// ```
83///
84/// Sending references to items is allowed, as long as they outlive the channel.
85///
86/// ```
87/// # use core::pin::pin;
88/// # use odem_rs_core::simulator::{Sim, simulation};
89/// # use odem_rs_sync::{channel::{channel, Rendezvous}, fork::ForkExt};
90/// async fn sim_main(sim: &Sim) {
91/// let item = 3;
92/// let (sx, rx) = channel(Rendezvous::new());
93///
94/// sim.fork(async {
95/// sx.send(&item).await.unwrap();
96/// })
97/// .and(async {
98/// let msg = rx.recv().await.unwrap();
99/// println!("Received {:?}", msg);
100/// })
101/// .await;
102/// }
103/// # fn main() { simulation(sim_main).ok(); }
104/// ```
105///
106/// [`send`]: Sender::send
107/// [`recv`]: Receiver::recv
108pub fn channel<R: RingBuf>(buffer: R) -> (Sender<R::Item>, Receiver<R::Item>) {
109 use alloc::rc::Rc;
110 let channel = Rc::new(Inner::new(buffer));
111
112 // SAFETY: Any lifetime bound on `R` can only result from a bound on
113 // `R::Item`, since `Rc<Channel<R>>` safely erases the bound on the
114 // underlying ring buffer itself. The bound on `R::Item` is still
115 // retained within the `Sender`/`Receiver` pairs generic argument,
116 // making it subject to the borrow checker.
117 // Therefore, erasing the lifetime here is safe.
118 let channel = unsafe {
119 core::mem::transmute::<
120 Rc<Inner<R::Item, dyn RingBuf<Item = R::Item> + '_>>,
121 Rc<Inner<R::Item, dyn RingBuf<Item = R::Item> + 'static>>,
122 >(channel)
123 };
124
125 (Sender(Rc::downgrade(&channel)), Receiver(channel))
126}
127
128/* ***************************************************** Sender/Receiver Pair */
129
130/// The sending half of the channel. This half can be cloned and is
131/// bound to the hosting channel through the reference counted [`Rc`].
132pub struct Sender<T>(pub(super) Weak<Inner<T, dyn RingBuf<Item = T>>>);
133
134impl<T> Sender<T> {
135 /// Attempt to send the value without waiting.
136 ///
137 /// This can fail for two reasons:
138 /// 1. The channel is closed, i.e. [`close`] has been called.
139 /// 2. The message buffer is full.
140 ///
141 /// The `Err`-result contains the initially provided item in either case.
142 ///
143 /// [`close`]: Self::close
144 pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
145 match self.0.upgrade() {
146 Some(inner) => inner.try_send(item),
147 _ => Err(TrySendError::Disconnected(item)),
148 }
149 }
150
151 /// Returns a future that gets fulfilled when the value has been written to
152 /// the channel. If the channel gets closed while the sending is in
153 /// progress, sending the value will fail, and the future will deliver the
154 /// value back.
155 pub fn send(&self, item: T) -> SendFuture<'_, T> {
156 SendFuture::new(self, item)
157 }
158
159 /// Returns the number of elements currently inside the channel of this
160 /// sender.
161 pub fn count(&self) -> isize {
162 self.0.upgrade().map_or(0, |inner| inner.count())
163 }
164
165 /// Closes the channel from the sending side. Already sent values may
166 /// still be received.
167 pub fn close(&self) {
168 if let Some(inner) = self.0.upgrade() {
169 inner.close();
170 }
171 }
172}
173
174impl<T> Clone for Sender<T> {
175 fn clone(&self) -> Self {
176 Sender(self.0.clone())
177 }
178}
179
180impl<T> Drop for Sender<T> {
181 fn drop(&mut self) {
182 if Weak::weak_count(&self.0) == 1 {
183 if let Some(inner) = self.0.upgrade() {
184 inner.close();
185 }
186 }
187 }
188}
189
190/// The receiving half of the channel. This half can be cloned and is
191/// bound to the hosting channel through the reference counted [`Rc`].
192pub struct Receiver<T>(pub(super) Rc<Inner<T, dyn RingBuf<Item = T>>>);
193
194impl<T> Receiver<T> {
195 /// Attempts to receive a value of the channel without waiting.
196 ///
197 /// This can fail for two reasons:
198 /// 1. The channel is closed, i.e. [`close`] has been called.
199 /// 2. The message buffer is empty.
200 ///
201 /// [`close`]: Self::close
202 pub fn try_recv(&self) -> Result<T, TryRecvError> {
203 self.0.try_recv()
204 }
205
206 /// Returns a future that gets fulfilled when a value is written to the
207 /// channel. If the channels get closed, the future will resolve to an
208 /// `Err` variant.
209 pub fn recv(&self) -> RecvFuture<'_, dyn RingBuf<Item = T>> {
210 RecvFuture::new(&self.0)
211 }
212
213 /// Returns the number of elements currently inside the channel of this
214 /// receiver.
215 pub fn count(&self) -> isize {
216 self.0.count()
217 }
218
219 /// Closes the channel from the receiving side. Already sent values may
220 /// still be received.
221 pub fn close(&self) {
222 self.0.close();
223 }
224}
225
226impl<T> Clone for Receiver<T> {
227 fn clone(&self) -> Self {
228 Receiver(self.0.clone())
229 }
230}
231
232impl<T> Drop for Receiver<T> {
233 fn drop(&mut self) {
234 // notify all the senders of the channel's imminent closing
235 if Rc::strong_count(&self.0) == 1 {
236 self.close();
237 }
238 }
239}
240
241/* *************************************************** Specialized SendFuture */
242
243/// Future for the [send](Sender::send) operation of the shared channel.
244///
245/// We cannot re-use the SendFuture of the channel because our sender only
246/// contains a weak reference to the channel.
247#[pin_project::pin_project(PinnedDrop)]
248#[must_use = "futures do nothing unless you `.await` or poll them"]
249pub struct SendFuture<'r, T> {
250 /// The sender used to send the message.
251 chan: &'r Sender<T>,
252 /// Either the message or the link into the pending chain.
253 #[pin]
254 state: SendFutureState<T>,
255}
256
257impl<'r, T> SendFuture<'r, T> {
258 /// Creates a [send](Sender::send)-future and primes it with a message.
259 ///
260 /// The send-operation is attempted once this future is resolved.
261 const fn new(chan: &'r Sender<T>, msg: T) -> Self {
262 SendFuture {
263 chan,
264 state: SendFutureState::Primed(Some(msg)),
265 }
266 }
267}
268
269impl<T> Future for SendFuture<'_, T> {
270 type Output = Result<(), SendError<T>>;
271
272 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273 let mut inner = self.project();
274
275 match inner.state.as_mut().project() {
276 SendFutureStateProject::Primed(msg) => {
277 // extract the message
278 let msg = msg.take().unwrap();
279
280 // attempt to get an owning reference to the channel
281 match inner.chan.0.upgrade() {
282 Some(chan) => {
283 // attempt to send the message
284 match chan.try_send(msg) {
285 Err(TrySendError::Full(msg)) => {
286 // if the message queue is already full,
287 // create a link and subscribe
288 unsafe {
289 inner.state.link(&chan.chain, cx.waker().clone(), msg);
290 }
291
292 // increase the number of pending senders
293 chan.add_pending_sender();
294
295 // defer
296 Poll::Pending
297 }
298 Err(TrySendError::Disconnected(msg)) => {
299 // the channel has been closed
300 // return an error
301 Poll::Ready(Err(SendError(msg)))
302 }
303 Ok(()) => {
304 // the message was sent successfully
305 // return with success
306 Poll::Ready(Ok(()))
307 }
308 }
309 }
310 _ => {
311 // failed: the channel is closed and dropped
312 // return with an error
313 Poll::Ready(Err(SendError(msg)))
314 }
315 }
316 }
317 SendFutureStateProject::Linked(link) => {
318 // this future has been reactivated after subscribing
319 // to the pending chain
320
321 // either the message has been received (success)
322 // or the channel has been closed (failure)
323 Poll::Ready(match link.note.take() {
324 Some(msg) => Err(SendError(msg)),
325 _ => Ok(()),
326 })
327 }
328 }
329 }
330}
331
332#[pin_project::pinned_drop]
333impl<T> PinnedDrop for SendFuture<'_, T> {
334 fn drop(self: Pin<&mut Self>) {
335 // make sure to unlink the soon-to-be-dangling link if the future has
336 // been dropped after subscribing but before reactivation
337 if let Some(chan) = self.chan.0.upgrade() {
338 if unsafe { self.as_ref().project_ref().state.unlink(&chan.chain) } {
339 chan.sub_pending_sender();
340 }
341 }
342 }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        channel::{ArrayBuf, Rendezvous},
        fork::ForkExt as _,
    };
    use alloc::collections::VecDeque;
    use odem_rs_core::simulator::{Sim, Simulator};
    use proptest::prelude::*;

    /// Sends the numbers `0..elems` through a channel backed by `buf` and
    /// asserts that they are received in exactly that order.
    ///
    /// The producer advances the simulation by one time unit after every
    /// send, while the consumer only starts receiving after ten time units.
    async fn test_insertion_order(
        sim: &Sim,
        buf: impl RingBuf<Item = usize> + 'static,
        elems: usize,
    ) {
        let (tx, rx) = channel(buf);

        let producer = async move {
            for value in 0..elems {
                tx.send(value).await.unwrap();
                sim.advance(1.0).await;
            }
        };

        let consumer = async move {
            sim.advance(10.0).await;
            for expected in 0..elems {
                let received = rx.recv().await.unwrap();
                assert_eq!(received, expected);
            }
        };

        sim.fork(producer).and(consumer).await;
    }

    proptest! {
        #[test]
        #[cfg_attr(miri, ignore)]
        fn rendezvous_channels_respect_insertion_order(elems in 1..100usize) {
            Simulator::default()
                .run(async |sim| {
                    let buf = Rendezvous::default();
                    test_insertion_order(sim, buf, elems).await;
                })
                .expect("unexpected deadlock");
        }

        #[test]
        #[cfg_attr(miri, ignore)]
        fn array_channels_respect_insertion_order(elems in 1..100usize) {
            Simulator::default()
                .run(async |sim| {
                    let buf = ArrayBuf::<2, _>::new();
                    test_insertion_order(sim, buf, elems).await;
                })
                .expect("unexpected deadlock");
        }

        #[test]
        #[cfg_attr(miri, ignore)]
        fn deque_channels_respect_insertion_order(elems in 1..100usize) {
            Simulator::default()
                .run(async |sim| {
                    let buf = VecDeque::new();
                    test_insertion_order(sim, buf, elems).await;
                })
                .expect("unexpected deadlock");
        }
    }
}