1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use crate::shared::*;
use crate::{AsyncRx, MAsyncRx};
use futures_core::stream;
use std::fmt;
use std::ops::Deref;
use std::pin::Pin;
use std::task::*;
/// Adapter that exposes a channel receiver as an asynchronous stream of messages.
///
/// Constructed by [AsyncRx::into_stream()](crate::AsyncRx::into_stream())
///
/// Implements `futures_core::stream::Stream`.
pub struct AsyncStream<F>
where
    F: Flavor,
{
    /// The wrapped receiver; every poll is forwarded to it.
    rx: AsyncRx<F>,
    /// Waker slot lent to the receiver on each poll. Starts out as `None`; if it is set
    /// when the stream is dropped, it is abandoned on the shared channel state.
    waker: Option<<F::Recv as Registry>::Waker>,
    /// Becomes `true` once a poll fails with an error other than "empty".
    ended: bool,
}
/// Debug formatting prints only the fixed type name; no channel state is included.
impl<F: Flavor> fmt::Debug for AsyncStream<F> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("AsyncStream")
    }
}
/// Display formatting prints only the fixed type name.
impl<F: Flavor> fmt::Display for AsyncStream<F> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("AsyncStream")
    }
}
impl<F: Flavor> AsyncStream<F> {
    /// Wraps `rx` in a stream that has no registered waker and has not ended.
    #[inline(always)]
    pub fn new(rx: AsyncRx<F>) -> Self {
        Self { rx, waker: None, ended: false }
    }

    /// Tries to receive one message; if the channel is empty, a notification is registered
    /// so the task is woken for the next poll.
    ///
    /// # Behavior
    ///
    /// Polling here differs from [RecvFuture](crate::RecvFuture).
    /// The waker is not exposed to the user, so you cannot perform delicate operations on it
    /// (compare with the `Drop` handler in `RecvFuture`).
    /// To make sure no deadlock happens on cancellation, the `WakerState` stays `Init`
    /// after being registered (it is not converted to `Waiting`).
    /// The senders wake up all `Init` state wakers until they find a normal
    /// pending receiver in the `Waiting` state.
    ///
    /// # Return Value
    ///
    /// - `Poll::Ready(Some(item))` when a message was received.
    /// - `Poll::Pending` when the receiver reports the channel as empty. The next time the
    ///   channel is not empty the task is woken again, and you should call `poll_item()`
    ///   again to receive the message. To cancel, just don't call `poll_item()` again;
    ///   others will still have a chance to receive messages.
    /// - `Poll::Ready(None)` when the receiver reports any other error (all `Tx` have been
    ///   dropped and the channel is empty). The stream is marked as ended in this case.
    #[inline]
    pub fn poll_item(&mut self, ctx: &mut Context) -> Poll<Option<F::Item>>
    where
        F::Item: Send + 'static,
    {
        let err = match self.rx.poll_item::<true>(ctx, &mut self.waker) {
            Ok(item) => return Poll::Ready(Some(item)),
            Err(err) => err,
        };
        if err.is_empty() {
            Poll::Pending
        } else {
            // Anything other than "empty" terminates the stream.
            self.ended = true;
            Poll::Ready(None)
        }
    }
}
/// Gives read-only access to the wrapped receiver, so its `&self` methods can be called
/// directly on the stream.
impl<F: Flavor> Deref for AsyncStream<F> {
    type Target = AsyncRx<F>;

    #[inline]
    fn deref(&self) -> &AsyncRx<F> {
        &self.rx
    }
}
impl<F: Flavor> stream::Stream for AsyncStream<F>
where
F::Item: Send + 'static,
{
type Item = F::Item;
#[inline(always)]
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let mut _self = self.get_mut();
if _self.ended {
return Poll::Ready(None);
}
match _self.rx.poll_item::<false>(ctx, &mut _self.waker) {
Ok(item) => Poll::Ready(Some(item)),
Err(e) => {
if e.is_empty() {
return Poll::Pending;
}
_self.ended = true;
Poll::Ready(None)
}
}
}
}
impl<F: Flavor> stream::FusedStream for AsyncStream<F>
where
    F::Item: Send + 'static,
{
    /// Returns `true` once a poll has marked the stream as ended.
    fn is_terminated(&self) -> bool {
        let Self { ended, .. } = self;
        *ended
    }
}
impl<F: Flavor> Drop for AsyncStream<F> {
    /// If a waker is still held by this stream, hands it to the shared channel state via
    /// `abandon_recv_waker`; does nothing when no waker is held.
    fn drop(&mut self) {
        if let Some(pending) = &self.waker {
            self.rx.shared.abandon_recv_waker(pending);
        }
    }
}
impl<F: Flavor> From<AsyncRx<F>> for AsyncStream<F> {
#[inline]
fn from(rx: AsyncRx<F>) -> Self {
rx.into_stream()
}
}
impl<F: Flavor> From<MAsyncRx<F>> for AsyncStream<F> {
#[inline]
fn from(rx: MAsyncRx<F>) -> Self {
rx.into_stream()
}
}