1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use crate::*;
use alloc::sync::Arc;
use core::task::Poll;
use futures_micro::poll_state;
#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Inner<T>>,
done: bool,
}
impl<T> Sender<T> {
pub(crate) fn new(inner: Arc<Inner<T>>) -> Self {
Sender { inner, done: false }
}
pub fn close(self) { }
pub fn is_closed(&self) -> bool { self.inner.state().closed() }
pub async fn wait(self) -> Result<Self, Closed> {
poll_state(Some(self), |this, ctx| {
let mut that = this.take().unwrap();
let state = that.inner.state();
if state.closed() {
that.done = true;
Poll::Ready(Err(Closed()))
} else if state.recv() {
that.done = true;
Poll::Ready(Ok(that))
} else {
that.inner.set_send(ctx.waker().clone());
*this = Some(that);
Poll::Pending
}
}).await
}
pub fn send(mut self, value: T) -> Result<(), Closed> {
self.done = true;
let inner = &mut self.inner;
let state = inner.set_value(value);
if !state.closed() {
if state.recv() {
inner.recv().wake_by_ref();
Ok(())
} else {
Ok(())
}
} else {
inner.take_value();
Err(Closed())
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if !self.done {
let state = self.inner.state();
if !state.closed() {
let old = self.inner.close();
if old.recv() { self.inner.recv().wake_by_ref(); }
}
}
}
}