async_fifo/channel/
oneshot.rs

1//! # One shot SPSC channels
2//! 
3//! Very simple single-use channels with a sync and async API, blocking/non-blocking.
4//! This builds on [`Slot<T>`], so the item has to be boxed.
5//! Most useful for the transfer of return values in the context of remote procedure calling, as "reply pipes".
6//! 
7//! ```rust
8//! # async_fifo::block_on(async {
9//! let (tx, rx) = async_fifo::oneshot();
10//! 
11//! let item = 72u8;
12//! tx.send(Box::new(item));
13//! 
14//! assert_eq!(*rx.await, item);
15//! # });
16//! ```
17
18use core::task::{Waker, Context, Poll};
19use core::future::Future;
20use core::pin::Pin;
21
22use alloc::boxed::Box;
23use alloc::sync::Arc;
24
25use crate::slot::Slot;
26
27struct OneShot<T> {
28    payload: Slot<T>,
29    consumer_waker: Slot<Waker>,
30}
31
32/// The writing side of the oneshot channel
33pub struct Sender<T> {
34    inner: Arc<OneShot<T>>,
35}
36
37/// The reading side of the oneshot channel
38pub struct Receiver<T> {
39    inner: Arc<OneShot<T>>,
40}
41
42/// Create a new oneshot channel, returning a sender/receiver pair
43pub fn new<T>() -> (Sender<T>, Receiver<T>) {
44    let inner = OneShot {
45        payload: Slot::NONE,
46        consumer_waker: Slot::NONE,
47    };
48
49    let inner = Arc::new(inner);
50
51    let sender = Sender {
52        inner: inner.clone(),
53    };
54
55    let receiver = Receiver {
56        inner: inner.clone(),
57    };
58
59    (sender, receiver)
60}
61
62impl<T> Sender<T> {
63    /// Sends a value into this channel, waking up the receiving side.
64    pub fn send(self, item: Box<T>) {
65        self.inner.payload.insert(item);
66        if let Some(waker_box) = self.inner.consumer_waker.try_take(false) {
67            waker_box.wake_by_ref();
68        }
69    }
70}
71
72#[cfg(feature = "blocking")]
73impl<T> Receiver<T> {
74    /// Waits for a value to be received from the channel.
75    ///
76    /// Do not use this in asynchronous code, instead directly use the receiver
77    /// as a future, which will yield the item once it has been received.
78    ///
79    /// This method is only available if you enable the `blocking` feature.
80    pub fn recv_blocking(self) -> T {
81        *crate::block_on(self)
82    }
83}
84
85impl<T> Future for Receiver<T> {
86    type Output = Box<T>;
87
88    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89        if let Some(item) = self.inner.payload.try_take(false) {
90            return Poll::Ready(item);
91        }
92
93        let waker = Box::new(cx.waker().clone());
94        self.inner.consumer_waker.insert(waker);
95
96        match self.inner.payload.try_take(false) {
97            Some(item) => Poll::Ready(item),
98            None => Poll::Pending,
99        }
100    }
101}