async_oneshot/
sender.rs

1use crate::*;
2use alloc::sync::Arc;
3use core::{future::Future, task::Poll};
4use futures_micro::poll_fn;
5
6/// The sending half of a oneshot channel.
7#[derive(Debug)]
8pub struct Sender<T> {
9    inner: Arc<Inner<T>>,
10    done: bool,
11}
12
13impl<T> Sender<T> {
14    #[inline(always)]
15    pub(crate) fn new(inner: Arc<Inner<T>>) -> Self {
16        Sender { inner, done: false }
17    }
18
19    /// Closes the channel by causing an immediate drop
20    #[inline(always)]
21    pub fn close(self) { }
22
23    /// true if the channel is closed
24    #[inline(always)]
25    pub fn is_closed(&self) -> bool { self.inner.state().closed() }
26
27    /// Waits for a Receiver to be waiting for us to send something
28    /// (i.e. allows you to produce a value to send on demand).
29    /// Fails if the Receiver is dropped.
30    #[inline]
31    pub fn wait(self) -> impl Future<Output = Result<Self, Closed>> {
32        let mut this = Some(self);
33        poll_fn(move |ctx| {
34            let mut that = this.take().unwrap();
35            let state = that.inner.state();
36            if state.closed() {
37                that.done = true;
38                Poll::Ready(Err(Closed()))
39            } else if state.recv() {
40                Poll::Ready(Ok(that))
41            } else {
42                that.inner.set_send(ctx.waker().clone());
43                this = Some(that);
44                Poll::Pending
45            }
46        })
47    }
48
49    /// Sends a message on the channel. Fails if the Receiver is dropped.
50    #[inline]
51    pub fn send(&mut self, value: T) -> Result<(), Closed> {
52        if self.done {
53            Err(Closed())
54        } else {
55            self.done = true;
56            let inner = &mut self.inner;
57            let state = inner.set_value(value);
58            if !state.closed() {
59                if state.recv() {
60                    inner.recv().wake_by_ref();
61                    Ok(())
62                } else {
63                    Ok(())
64                }
65            } else {
66                inner.take_value(); // force drop.
67                Err(Closed())
68            }
69        }
70    }
71
72}
73
74impl<T> Drop for Sender<T> {
75    #[inline(always)]
76    fn drop(&mut self) {
77        if !self.done {
78            let state = self.inner.state();
79            if !state.closed() {
80                let old = self.inner.close();
81                if old.recv() { self.inner.recv().wake_by_ref(); }
82            }
83        }
84    }
85}