async_oneshot/
receiver.rs1use crate::*;
2use core::{future::Future, pin::Pin};
3use core::task::{Context, Poll};
4
5#[derive(Debug)]
7pub struct Receiver<T> {
8 inner: Arc<Inner<T>>,
9 done: bool,
10}
11
12impl<T> Receiver<T> {
13 #[inline(always)]
14 pub(crate) fn new(inner: Arc<Inner<T>>) -> Self {
15 Receiver { inner, done: false }
16 }
17
18 #[inline(always)]
20 pub fn close(self) { }
21
22 #[inline(always)]
23 fn handle_state(&mut self, state: crate::inner::State) -> Poll<Result<T, Closed>> {
24 if state.ready() {
25 Poll::Ready(Ok(self.inner.take_value()))
26 } else if state.closed() {
27 Poll::Ready(Err(Closed()))
28 } else {
29 Poll::Pending
30 }.map(|x| {
31 self.done = true;
32 x
33 })
34 }
35
36 #[inline]
39 pub fn try_recv(mut self) -> Result<T, TryRecvError<T>> {
40 let state = self.inner.state();
41 match self.handle_state(state) {
42 Poll::Ready(Ok(x)) => Ok(x),
43 Poll::Ready(Err(Closed())) => Err(TryRecvError::Closed),
44 Poll::Pending => Err(TryRecvError::Empty(self)),
45 }
46 }
47}
48
49impl<T> Future for Receiver<T> {
50 type Output = Result<T, Closed>;
51 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, Closed>> {
52 let this = Pin::into_inner(self);
53 match this.handle_state(this.inner.state()) {
54 Poll::Pending => {},
55 x => return x,
56 }
57 let state = this.inner.set_recv(ctx.waker().clone());
58 match this.handle_state(state) {
59 Poll::Pending => {},
60 x => return x,
61 }
62 if state.send() { this.inner.send().wake_by_ref(); }
63 Poll::Pending
64 }
65}
66
67impl<T> Drop for Receiver<T> {
68 #[inline(always)]
69 fn drop(&mut self) {
70 if !self.done {
71 let state = self.inner.state();
72 if !state.closed() && !state.ready() {
73 let old = self.inner.close();
74 if old.send() { self.inner.send().wake_by_ref(); }
75 }
76 }
77 }
78}