async_oneshot/
receiver.rs

1use crate::*;
2use core::{future::Future, pin::Pin};
3use core::task::{Context, Poll};
4
5/// The receiving half of a oneshot channel.
6#[derive(Debug)]
7pub struct Receiver<T> {
8    inner: Arc<Inner<T>>,
9    done: bool,
10}
11
12impl<T> Receiver<T> {
13    #[inline(always)]
14    pub(crate) fn new(inner: Arc<Inner<T>>) -> Self {
15        Receiver { inner, done: false }
16    }
17
18    /// Closes the channel by causing an immediate drop.
19    #[inline(always)]
20    pub fn close(self) { }
21
22    #[inline(always)]
23    fn handle_state(&mut self, state: crate::inner::State) -> Poll<Result<T, Closed>> {
24        if state.ready() {
25            Poll::Ready(Ok(self.inner.take_value()))
26        } else if state.closed() {
27            Poll::Ready(Err(Closed()))
28        } else {
29            Poll::Pending
30        }.map(|x| {
31            self.done = true;
32            x
33        })
34    }
35
36    /// Attempts to receive. On failure, if the channel is not closed,
37    /// returns self to try again.
38    #[inline]
39    pub fn try_recv(mut self) -> Result<T, TryRecvError<T>> {
40        let state = self.inner.state();
41        match self.handle_state(state) {
42            Poll::Ready(Ok(x)) => Ok(x),
43            Poll::Ready(Err(Closed())) => Err(TryRecvError::Closed),
44            Poll::Pending => Err(TryRecvError::Empty(self)),
45        }
46    }
47}
48
49impl<T> Future for Receiver<T> {
50    type Output = Result<T, Closed>;
51    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, Closed>> {
52        let this = Pin::into_inner(self);
53        match this.handle_state(this.inner.state()) {
54            Poll::Pending => {},
55            x => return x,
56        }
57        let state = this.inner.set_recv(ctx.waker().clone());
58        match this.handle_state(state) {
59            Poll::Pending => {},
60            x => return x,
61        }
62        if state.send() { this.inner.send().wake_by_ref(); }
63        Poll::Pending
64    }
65}
66
67impl<T> Drop for Receiver<T> {
68    #[inline(always)]
69    fn drop(&mut self) {
70        if !self.done {
71            let state = self.inner.state();
72            if !state.closed() && !state.ready() {
73                let old = self.inner.close();
74                if old.send() { self.inner.send().wake_by_ref(); }
75            }
76        }
77    }
78}