async_fifo/channel/
oneshot.rs1use core::task::{Waker, Context, Poll};
19use core::future::Future;
20use core::pin::Pin;
21
22use alloc::boxed::Box;
23use alloc::sync::Arc;
24
25use crate::slot::Slot;
26
27struct OneShot<T> {
28 payload: Slot<T>,
29 consumer_waker: Slot<Waker>,
30}
31
32pub struct Sender<T> {
34 inner: Arc<OneShot<T>>,
35}
36
37pub struct Receiver<T> {
39 inner: Arc<OneShot<T>>,
40}
41
42pub fn new<T>() -> (Sender<T>, Receiver<T>) {
44 let inner = OneShot {
45 payload: Slot::NONE,
46 consumer_waker: Slot::NONE,
47 };
48
49 let inner = Arc::new(inner);
50
51 let sender = Sender {
52 inner: inner.clone(),
53 };
54
55 let receiver = Receiver {
56 inner: inner.clone(),
57 };
58
59 (sender, receiver)
60}
61
62impl<T> Sender<T> {
63 pub fn send(self, item: Box<T>) {
65 self.inner.payload.insert(item);
66 if let Some(waker_box) = self.inner.consumer_waker.try_take(false) {
67 waker_box.wake_by_ref();
68 }
69 }
70}
71
72#[cfg(feature = "blocking")]
73impl<T> Receiver<T> {
74 pub fn recv_blocking(self) -> T {
81 *crate::block_on(self)
82 }
83}
84
85impl<T> Future for Receiver<T> {
86 type Output = Box<T>;
87
88 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89 if let Some(item) = self.inner.payload.try_take(false) {
90 return Poll::Ready(item);
91 }
92
93 let waker = Box::new(cx.waker().clone());
94 self.inner.consumer_waker.insert(waker);
95
96 match self.inner.payload.try_take(false) {
97 Some(item) => Poll::Ready(item),
98 None => Poll::Pending,
99 }
100 }
101}