ntex_util/channel/
oneshot.rs

1//! A one-shot, futures-aware channel.
2use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll};
3
4use super::{cell::Cell, Canceled};
5use crate::task::LocalWaker;
6
7/// Creates a new futures-aware, one-shot channel.
8pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
9    let inner = Cell::new(Inner {
10        value: None,
11        rx_task: LocalWaker::new(),
12    });
13    let tx = Sender {
14        inner: inner.clone(),
15    };
16    let rx = Receiver { inner };
17    (tx, rx)
18}
19
20/// Represents the completion half of a oneshot through which the result of a
21/// computation is signaled.
22#[derive(Debug)]
23pub struct Sender<T> {
24    inner: Cell<Inner<T>>,
25}
26
27/// A future representing the completion of a computation happening elsewhere in
28/// memory.
29#[derive(Debug)]
30#[must_use = "futures do nothing unless polled"]
31pub struct Receiver<T> {
32    inner: Cell<Inner<T>>,
33}
34
35// The channels do not ever project Pin to the inner T
36impl<T> Unpin for Receiver<T> {}
37impl<T> Unpin for Sender<T> {}
38
39#[derive(Debug)]
40struct Inner<T> {
41    value: Option<T>,
42    rx_task: LocalWaker,
43}
44
45impl<T> Sender<T> {
46    /// Completes this oneshot with a successful result.
47    ///
48    /// This function will consume `self` and indicate to the other end, the
49    /// `Receiver`, that the error provided is the result of the computation this
50    /// represents.
51    ///
52    /// If the value is successfully enqueued for the remote end to receive,
53    /// then `Ok(())` is returned. If the receiving end was dropped before
54    /// this function was called, however, then `Err` is returned with the value
55    /// provided.
56    pub fn send(self, val: T) -> Result<(), T> {
57        if self.inner.strong_count() == 2 {
58            let inner = self.inner.get_mut();
59            inner.value = Some(val);
60            inner.rx_task.wake();
61            Ok(())
62        } else {
63            Err(val)
64        }
65    }
66
67    /// Tests to see whether this `Sender`'s corresponding `Receiver`
68    /// has gone away.
69    pub fn is_canceled(&self) -> bool {
70        self.inner.strong_count() == 1
71    }
72}
73
74impl<T> Drop for Sender<T> {
75    fn drop(&mut self) {
76        self.inner.get_ref().rx_task.wake();
77    }
78}
79
80impl<T> Receiver<T> {
81    /// Wait until the oneshot is ready and return value
82    pub async fn recv(&self) -> Result<T, Canceled> {
83        poll_fn(|cx| self.poll_recv(cx)).await
84    }
85
86    /// Polls the oneshot to determine if value is ready
87    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
88        // If we've got a value, then skip the logic below as we're done.
89        if let Some(val) = self.inner.get_mut().value.take() {
90            return Poll::Ready(Ok(val));
91        }
92
93        // Check if sender is dropped and return error if it is.
94        if self.inner.strong_count() == 1 {
95            Poll::Ready(Err(Canceled))
96        } else {
97            self.inner.get_ref().rx_task.register(cx.waker());
98            Poll::Pending
99        }
100    }
101}
102
103impl<T> Future for Receiver<T> {
104    type Output = Result<T, Canceled>;
105
106    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107        self.poll_recv(cx)
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::future::lazy;

    // Walks through each sender/receiver interaction, one scenario per block.
    #[ntex_macros::rt_test2]
    async fn test_oneshot() {
        // Debug output names both halves; a sent value arrives through the
        // `Future` implementation.
        {
            let (sender, receiver) = channel();
            assert!(format!("{sender:?}").contains("Sender"));
            assert!(format!("{receiver:?}").contains("Receiver"));

            sender.send("test").unwrap();
            assert_eq!(receiver.await.unwrap(), "test");
        }

        // A sent value arrives through `recv()` as well.
        {
            let (sender, receiver) = channel();
            sender.send("test").unwrap();
            assert_eq!(receiver.recv().await.unwrap(), "test");
        }

        // Dropping the receiver cancels the sender and makes `send` fail.
        {
            let (sender, receiver) = channel();
            assert!(!sender.is_canceled());
            drop(receiver);
            assert!(sender.is_canceled());
            assert!(sender.send("test").is_err());
        }

        // Dropping the sender before sending resolves the receiver with an
        // error.
        {
            let (sender, receiver) = channel::<&'static str>();
            drop(sender);
            assert!(receiver.await.is_err());
        }

        // A receiver that was polled while pending still gets the value.
        {
            let (sender, mut receiver) = channel::<&'static str>();
            assert_eq!(
                lazy(|cx| Pin::new(&mut receiver).poll(cx)).await,
                Poll::Pending
            );
            sender.send("test").unwrap();
            assert_eq!(receiver.await.unwrap(), "test");
        }

        // A receiver that was polled while pending sees the sender drop.
        {
            let (sender, mut receiver) = channel::<&'static str>();
            assert_eq!(
                lazy(|cx| Pin::new(&mut receiver).poll(cx)).await,
                Poll::Pending
            );
            drop(sender);
            assert!(receiver.await.is_err());
        }
    }
}