tarantool/fiber/async/
oneshot.rs1use super::RecvError;
26use std::{
27 cell::Cell,
28 fmt::Debug,
29 future::Future,
30 pin::Pin,
31 rc::{Rc, Weak},
32 task::{Context, Poll, Waker},
33};
34
35#[derive(Debug)]
36enum State<T> {
37 Pending(Option<Waker>),
38 Ready(T),
39}
40
41impl<T> Default for State<T> {
42 fn default() -> Self {
43 Self::Pending(None)
44 }
45}
46
47#[must_use = "futures do nothing unless you `.await` or poll them"]
58pub struct Receiver<T>(Rc<Cell<State<T>>>);
59
60pub struct Sender<T>(Weak<Cell<State<T>>>);
68
69impl<T> Receiver<T> {
70 #[inline]
74 pub fn is_closed(&self) -> bool {
75 Rc::weak_count(&self.0) == 0
76 }
77}
78
79impl<T> Future for Receiver<T> {
80 type Output = Result<T, RecvError>;
81
82 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 let cell = &self.0;
84 match cell.take() {
85 State::Pending(mut waker) if !self.is_closed() => {
86 waker.get_or_insert_with(|| cx.waker().clone());
87 cell.set(State::Pending(waker));
88 Poll::Pending
89 }
90 State::Pending(_) => Poll::Ready(Err(RecvError)),
91 State::Ready(t) => Poll::Ready(Ok(t)),
92 }
93 }
94}
95
96impl<T> Debug for Receiver<T> {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.debug_struct("Receiver").finish_non_exhaustive()
99 }
100}
101
102impl<T> Sender<T> {
103 pub fn send(self, value: T) -> Result<(), T> {
120 let cell = if let Some(cell) = self.0.upgrade() {
121 cell
122 } else {
123 return Err(value);
124 };
125
126 if let State::Pending(Some(waker)) = cell.take() {
127 waker.wake()
128 }
129
130 cell.set(State::Ready(value));
131 Ok(())
132 }
133
134 pub fn is_closed(&self) -> bool {
141 self.0.strong_count() == 0
142 }
143}
144
145impl<T> Debug for Sender<T> {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct("Sender").finish_non_exhaustive()
148 }
149}
150
151impl<T> Drop for Sender<T> {
152 fn drop(&mut self) {
153 let cell = if let Some(cell) = self.0.upgrade() {
154 cell
155 } else {
156 return;
157 };
158 match cell.take() {
159 ready @ State::Ready(_) => cell.set(ready),
160 State::Pending(Some(waker)) => waker.wake(),
161 State::Pending(None) => (),
162 }
163 }
164}
165
166pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
177 let cell = Cell::new(State::default());
178 let strong = Rc::from(cell);
179 let weak = Rc::downgrade(&strong);
180 (Sender(weak), Receiver(strong))
181}
182
183#[cfg(feature = "internal_test")]
184mod tests {
185 use super::*;
186 use crate::fiber;
187 use futures::join;
188 use std::time::Duration;
189
190 #[crate::test(tarantool = "crate")]
191 fn drop_receiver() {
192 let (tx, rx) = channel::<i32>();
193 assert!(!tx.is_closed());
194 drop(rx);
195 assert!(tx.is_closed());
196 assert_eq!(tx.send(0).unwrap_err(), 0);
197 }
198
199 #[crate::test(tarantool = "crate")]
200 fn drop_sender() {
201 let (tx, rx) = channel::<i32>();
202 assert!(!rx.is_closed());
203 drop(tx);
204 assert!(rx.is_closed());
205 assert_eq!(fiber::block_on(rx).unwrap_err(), RecvError);
206 }
207
208 #[crate::test(tarantool = "crate")]
209 fn receive_non_blocking() {
210 let (tx, rx) = channel::<i32>();
211 tx.send(56).unwrap();
212 assert_eq!(fiber::block_on(rx), Ok(56));
213 }
214
215 #[crate::test(tarantool = "crate")]
216 fn receive_non_blocking_after_dropping_sender() {
217 let (tx, rx) = channel::<i32>();
218 drop(tx);
219 assert_eq!(fiber::block_on(rx), Err(RecvError));
220 }
221
222 #[crate::test(tarantool = "crate")]
223 fn receive_blocking_before_sending() {
224 let (tx, rx) = channel::<i32>();
225 let jh = fiber::start_async(rx);
226 tx.send(39).unwrap();
227 assert_eq!(jh.join(), Ok(39));
228 }
229
230 #[crate::test(tarantool = "crate")]
231 fn receive_blocking_before_dropping_sender() {
232 let (tx, rx) = channel::<i32>();
233 let jh = fiber::start_async(rx);
234 drop(tx);
235 assert_eq!(jh.join(), Err(RecvError));
236 }
237
238 #[crate::test(tarantool = "crate")]
239 fn join_two_after_sending() {
240 let f = async {
241 let (tx1, rx1) = channel::<i32>();
242 let (tx2, rx2) = channel::<i32>();
243
244 tx1.send(101).unwrap();
245 tx2.send(102).unwrap();
246 join!(rx1, rx2)
247 };
248 assert_eq!(fiber::block_on(f), (Ok(101), Ok(102)));
249 }
250
251 #[crate::test(tarantool = "crate")]
252 fn join_two_before_sending() {
253 let c = fiber::Cond::new();
254 drop(c);
255
256 let (tx1, rx1) = channel::<i32>();
257 let (tx2, rx2) = channel::<i32>();
258
259 let jh = fiber::start_async(async { join!(rx1, rx2) });
260
261 tx1.send(201).unwrap();
262 fiber::sleep(Duration::ZERO);
263 tx2.send(202).unwrap();
264 assert_eq!(jh.join(), (Ok(201), Ok(202)));
265 }
266
267 #[crate::test(tarantool = "crate")]
268 fn join_two_drop_one() {
269 let (tx1, rx1) = channel::<i32>();
270 let (tx2, rx2) = channel::<i32>();
271
272 let jh = fiber::start_async(async { join!(rx1, rx2) });
273 tx1.send(301).unwrap();
274 fiber::sleep(Duration::ZERO);
275 drop(tx2);
276 assert_eq!(jh.join(), (Ok(301), Err(RecvError)));
277 }
278}