ntex_util/channel/
inplace.rs

1//! A futures-aware bounded(1) channel.
2use std::{cell::Cell, fmt, future::poll_fn, task::Context, task::Poll};
3
4use crate::task::LocalWaker;
5
6/// Creates a new futures-aware, channel.
7pub fn channel<T>() -> Inplace<T> {
8    Inplace {
9        value: Cell::new(None),
10        rx_task: LocalWaker::new(),
11    }
12}
13
14/// A futures-aware bounded(1) channel.
15pub struct Inplace<T> {
16    value: Cell<Option<T>>,
17    rx_task: LocalWaker,
18}
19
20// The channels do not ever project Pin to the inner T
21impl<T> Unpin for Inplace<T> {}
22
23impl<T> fmt::Debug for Inplace<T> {
24    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25        write!(f, "Inplace<T>")
26    }
27}
28
29impl<T> Inplace<T> {
30    /// Set a successful result.
31    ///
32    /// If the value is successfully enqueued for the remote end to receive,
33    /// then `Ok(())` is returned. If previose value is not consumed
34    /// then `Err` is returned with the value provided.
35    pub fn send(&self, val: T) -> Result<(), T> {
36        if let Some(v) = self.value.take() {
37            self.value.set(Some(v));
38            Err(val)
39        } else {
40            self.value.set(Some(val));
41            self.rx_task.wake();
42            Ok(())
43        }
44    }
45
46    /// Wait until the oneshot is ready and return value
47    pub async fn recv(&self) -> T {
48        poll_fn(|cx| self.poll_recv(cx)).await
49    }
50
51    /// Polls the oneshot to determine if value is ready
52    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<T> {
53        // If we've got a value, then skip the logic below as we're done.
54        if let Some(val) = self.value.take() {
55            return Poll::Ready(val);
56        }
57
58        // Check if sender is dropped and return error if it is.
59        self.rx_task.register(cx.waker());
60        Poll::Pending
61    }
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use crate::future::lazy;

    // Walks the channel through: empty poll, send, rejected second send,
    // receive via `poll_recv`, then a second round received via `recv`.
    #[ntex_macros::rt_test2]
    async fn test_inplace() {
        let chan = channel();

        // Nothing has been sent yet, so polling reports `Pending`.
        let empty_poll = lazy(|cx| chan.poll_recv(cx)).await;
        assert_eq!(empty_poll, Poll::Pending);

        // The slot takes one value; a second send is refused and returns
        // the rejected value to the caller.
        assert!(chan.send(1).is_ok());
        assert_eq!(chan.send(2), Err(2));

        // The first value (not the rejected one) is what gets received.
        let filled_poll = lazy(|cx| chan.poll_recv(cx)).await;
        assert_eq!(filled_poll, Poll::Ready(1));

        // After receiving, the slot is free again and `recv` yields the
        // newly sent value.
        assert!(chan.send(1).is_ok());
        let received = chan.recv().await;
        assert_eq!(received, 1);
    }
}