tarantool/fiber/async/
oneshot.rs

1//! A one-shot channel is used for sending a single message between
2//! asynchronous tasks. The [`channel`] function is used to create a
3//! [`Sender`] and [`Receiver`] handle pair that form the channel.
4//!
5//! The `Sender` handle is used by the producer to send the value.
6//! The `Receiver` handle is used by the consumer to receive the value.
7//!
8//! Each handle can be used on separate fiber.
9//!
10//! Since the `send` method is not async it can be used from non-async code.
11//!
12//! # Example
13//! ```no_run
14//! use tarantool::fiber::r#async::oneshot;
15//! use tarantool::fiber;
16//!
17//! let (tx, rx) = oneshot::channel::<i32>();
18//! tx.send(56);
19//! let value = fiber::block_on(rx);
20//! ```
21//!
22//! If the sender is dropped without sending, the receiver will fail with
23//! [`super::RecvError`]:
24
25use super::RecvError;
26use std::{
27    cell::Cell,
28    fmt::Debug,
29    future::Future,
30    pin::Pin,
31    rc::{Rc, Weak},
32    task::{Context, Poll, Waker},
33};
34
35#[derive(Debug)]
36enum State<T> {
37    Pending(Option<Waker>),
38    Ready(T),
39}
40
41impl<T> Default for State<T> {
42    fn default() -> Self {
43        Self::Pending(None)
44    }
45}
46
47/// Receives a value from the associated [`Sender`].
48///
49/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
50/// [`channel`](fn@channel) function.
51///
52/// This channel has no `recv` method because the receiver itself implements the
53/// [`Future`] trait. To receive a value, `.await` the `Receiver` object directly.
54///
55/// If the sender is dropped without sending, the receiver will fail with
56/// [`super::RecvError`]
57#[must_use = "futures do nothing unless you `.await` or poll them"]
58pub struct Receiver<T>(Rc<Cell<State<T>>>);
59
60/// Sends a value to the associated [`Receiver`].
61///
62/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
63/// [`channel`](fn@channel) function.
64///
65/// If the sender is dropped without sending, the receiver will fail with
66/// [`super::RecvError`]
67pub struct Sender<T>(Weak<Cell<State<T>>>);
68
69impl<T> Receiver<T> {
70    /// Returns `true` if the associated [`Sender`] handle has been dropped.
71    ///
72    /// If `true` is returned, awaiting this future will always result in an error.
73    #[inline]
74    pub fn is_closed(&self) -> bool {
75        Rc::weak_count(&self.0) == 0
76    }
77}
78
79impl<T> Future for Receiver<T> {
80    type Output = Result<T, RecvError>;
81
82    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        let cell = &self.0;
84        match cell.take() {
85            State::Pending(mut waker) if !self.is_closed() => {
86                waker.get_or_insert_with(|| cx.waker().clone());
87                cell.set(State::Pending(waker));
88                Poll::Pending
89            }
90            State::Pending(_) => Poll::Ready(Err(RecvError)),
91            State::Ready(t) => Poll::Ready(Ok(t)),
92        }
93    }
94}
95
96impl<T> Debug for Receiver<T> {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("Receiver").finish_non_exhaustive()
99    }
100}
101
102impl<T> Sender<T> {
103    /// Attempts to send a value on this channel, returning it back if it could
104    /// not be sent.
105    ///
106    /// This method consumes `self` as only one value may ever be sent on a oneshot
107    /// channel. It is not marked async because sending a message to an oneshot
108    /// channel never requires any form of waiting.  Because of this, the `send`
109    /// method can be used in both synchronous and asynchronous code without
110    /// problems.
111    ///
112    /// A successful send occurs when it is determined that the other end of the
113    /// channel has not hung up already. An unsuccessful send would be one where
114    /// the corresponding receiver has already been deallocated. Note that a
115    /// return value of `Err` means that the data will never be received, but
116    /// a return value of `Ok` does *not* mean that the data will be received.
117    /// It is possible for the corresponding receiver to hang up immediately
118    /// after this function returns `Ok`.
119    pub fn send(self, value: T) -> Result<(), T> {
120        let cell = if let Some(cell) = self.0.upgrade() {
121            cell
122        } else {
123            return Err(value);
124        };
125
126        if let State::Pending(Some(waker)) = cell.take() {
127            waker.wake()
128        }
129
130        cell.set(State::Ready(value));
131        Ok(())
132    }
133
134    /// Returns `true` if the associated [`Receiver`] handle has been dropped.
135    ///
136    /// A [`Receiver`] is closed when
137    /// [`Receiver`] value is dropped.
138    ///
139    /// If `true` is returned, a call to `send` will always result in an error.    
140    pub fn is_closed(&self) -> bool {
141        self.0.strong_count() == 0
142    }
143}
144
145impl<T> Debug for Sender<T> {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct("Sender").finish_non_exhaustive()
148    }
149}
150
151impl<T> Drop for Sender<T> {
152    fn drop(&mut self) {
153        let cell = if let Some(cell) = self.0.upgrade() {
154            cell
155        } else {
156            return;
157        };
158        match cell.take() {
159            ready @ State::Ready(_) => cell.set(ready),
160            State::Pending(Some(waker)) => waker.wake(),
161            State::Pending(None) => (),
162        }
163    }
164}
165
166/// Creates a new one-shot channel for sending single values across asynchronous
167/// tasks.
168///
169/// The function returns separate "send" and "receive" handles. The `Sender`
170/// handle is used by the producer to send the value. The `Receiver` handle is
171/// used by the consumer to receive the value.
172///
173/// Each handle can be used on separate fibers.
174///
175/// See [`super::oneshot`] for examples.
176pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
177    let cell = Cell::new(State::default());
178    let strong = Rc::from(cell);
179    let weak = Rc::downgrade(&strong);
180    (Sender(weak), Receiver(strong))
181}
182
183#[cfg(feature = "internal_test")]
184mod tests {
185    use super::*;
186    use crate::fiber;
187    use futures::join;
188    use std::time::Duration;
189
190    #[crate::test(tarantool = "crate")]
191    fn drop_receiver() {
192        let (tx, rx) = channel::<i32>();
193        assert!(!tx.is_closed());
194        drop(rx);
195        assert!(tx.is_closed());
196        assert_eq!(tx.send(0).unwrap_err(), 0);
197    }
198
199    #[crate::test(tarantool = "crate")]
200    fn drop_sender() {
201        let (tx, rx) = channel::<i32>();
202        assert!(!rx.is_closed());
203        drop(tx);
204        assert!(rx.is_closed());
205        assert_eq!(fiber::block_on(rx).unwrap_err(), RecvError);
206    }
207
208    #[crate::test(tarantool = "crate")]
209    fn receive_non_blocking() {
210        let (tx, rx) = channel::<i32>();
211        tx.send(56).unwrap();
212        assert_eq!(fiber::block_on(rx), Ok(56));
213    }
214
215    #[crate::test(tarantool = "crate")]
216    fn receive_non_blocking_after_dropping_sender() {
217        let (tx, rx) = channel::<i32>();
218        drop(tx);
219        assert_eq!(fiber::block_on(rx), Err(RecvError));
220    }
221
222    #[crate::test(tarantool = "crate")]
223    fn receive_blocking_before_sending() {
224        let (tx, rx) = channel::<i32>();
225        let jh = fiber::start_async(rx);
226        tx.send(39).unwrap();
227        assert_eq!(jh.join(), Ok(39));
228    }
229
230    #[crate::test(tarantool = "crate")]
231    fn receive_blocking_before_dropping_sender() {
232        let (tx, rx) = channel::<i32>();
233        let jh = fiber::start_async(rx);
234        drop(tx);
235        assert_eq!(jh.join(), Err(RecvError));
236    }
237
238    #[crate::test(tarantool = "crate")]
239    fn join_two_after_sending() {
240        let f = async {
241            let (tx1, rx1) = channel::<i32>();
242            let (tx2, rx2) = channel::<i32>();
243
244            tx1.send(101).unwrap();
245            tx2.send(102).unwrap();
246            join!(rx1, rx2)
247        };
248        assert_eq!(fiber::block_on(f), (Ok(101), Ok(102)));
249    }
250
251    #[crate::test(tarantool = "crate")]
252    fn join_two_before_sending() {
253        let c = fiber::Cond::new();
254        drop(c);
255
256        let (tx1, rx1) = channel::<i32>();
257        let (tx2, rx2) = channel::<i32>();
258
259        let jh = fiber::start_async(async { join!(rx1, rx2) });
260
261        tx1.send(201).unwrap();
262        fiber::sleep(Duration::ZERO);
263        tx2.send(202).unwrap();
264        assert_eq!(jh.join(), (Ok(201), Ok(202)));
265    }
266
267    #[crate::test(tarantool = "crate")]
268    fn join_two_drop_one() {
269        let (tx1, rx1) = channel::<i32>();
270        let (tx2, rx2) = channel::<i32>();
271
272        let jh = fiber::start_async(async { join!(rx1, rx2) });
273        tx1.send(301).unwrap();
274        fiber::sleep(Duration::ZERO);
275        drop(tx2);
276        assert_eq!(jh.join(), (Ok(301), Err(RecvError)));
277    }
278}