ntex_util/channel/
oneshot.rs1use std::{future::poll_fn, future::Future, pin::Pin, task::Context, task::Poll};
3
4use super::{cell::Cell, Canceled};
5use crate::task::LocalWaker;
6
7pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
9 let inner = Cell::new(Inner {
10 value: None,
11 rx_task: LocalWaker::new(),
12 });
13 let tx = Sender {
14 inner: inner.clone(),
15 };
16 let rx = Receiver { inner };
17 (tx, rx)
18}
19
20#[derive(Debug)]
23pub struct Sender<T> {
24 inner: Cell<Inner<T>>,
25}
26
27#[derive(Debug)]
30#[must_use = "futures do nothing unless polled"]
31pub struct Receiver<T> {
32 inner: Cell<Inner<T>>,
33}
34
35impl<T> Unpin for Receiver<T> {}
37impl<T> Unpin for Sender<T> {}
38
39#[derive(Debug)]
40struct Inner<T> {
41 value: Option<T>,
42 rx_task: LocalWaker,
43}
44
45impl<T> Sender<T> {
46 pub fn send(self, val: T) -> Result<(), T> {
57 if self.inner.strong_count() == 2 {
58 let inner = self.inner.get_mut();
59 inner.value = Some(val);
60 inner.rx_task.wake();
61 Ok(())
62 } else {
63 Err(val)
64 }
65 }
66
67 pub fn is_canceled(&self) -> bool {
70 self.inner.strong_count() == 1
71 }
72}
73
74impl<T> Drop for Sender<T> {
75 fn drop(&mut self) {
76 self.inner.get_ref().rx_task.wake();
77 }
78}
79
80impl<T> Receiver<T> {
81 pub async fn recv(&self) -> Result<T, Canceled> {
83 poll_fn(|cx| self.poll_recv(cx)).await
84 }
85
86 pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
88 if let Some(val) = self.inner.get_mut().value.take() {
90 return Poll::Ready(Ok(val));
91 }
92
93 if self.inner.strong_count() == 1 {
95 Poll::Ready(Err(Canceled))
96 } else {
97 self.inner.get_ref().rx_task.register(cx.waker());
98 Poll::Pending
99 }
100 }
101}
102
103impl<T> Future for Receiver<T> {
104 type Output = Result<T, Canceled>;
105
106 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107 self.poll_recv(cx)
108 }
109}
110
111#[cfg(test)]
112mod tests {
113 use super::*;
114 use crate::future::lazy;
115
116 #[ntex_macros::rt_test2]
117 async fn test_oneshot() {
118 let (tx, rx) = channel();
119 assert!(format!("{tx:?}").contains("Sender"));
120 assert!(format!("{rx:?}").contains("Receiver"));
121
122 tx.send("test").unwrap();
123 assert_eq!(rx.await.unwrap(), "test");
124
125 let (tx, rx) = channel();
126 tx.send("test").unwrap();
127 assert_eq!(rx.recv().await.unwrap(), "test");
128
129 let (tx, rx) = channel();
130 assert!(!tx.is_canceled());
131 drop(rx);
132 assert!(tx.is_canceled());
133 assert!(tx.send("test").is_err());
134
135 let (tx, rx) = channel::<&'static str>();
136 drop(tx);
137 assert!(rx.await.is_err());
138
139 let (tx, mut rx) = channel::<&'static str>();
140 assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
141 tx.send("test").unwrap();
142 assert_eq!(rx.await.unwrap(), "test");
143
144 let (tx, mut rx) = channel::<&'static str>();
145 assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
146 drop(tx);
147 assert!(rx.await.is_err());
148 }
149}