crossfire/
stream.rs

1use crate::shared::*;
2use crate::{AsyncRx, MAsyncRx};
3use futures_core::stream;
4use std::fmt;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::task::*;
8
9/// Constructed by [AsyncRx::into_stream()](crate::AsyncRx::into_stream())
10///
11/// Implements `futures_core::stream::Stream`.
12pub struct AsyncStream<F: Flavor> {
13    rx: AsyncRx<F>,
14    waker: Option<<F::Recv as Registry>::Waker>,
15    ended: bool,
16}
17
18impl<F: Flavor> fmt::Debug for AsyncStream<F> {
19    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
20        write!(f, "AsyncStream")
21    }
22}
23
24impl<F: Flavor> fmt::Display for AsyncStream<F> {
25    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26        write!(f, "AsyncStream")
27    }
28}
29
30impl<F: Flavor> AsyncStream<F> {
31    #[inline(always)]
32    pub fn new(rx: AsyncRx<F>) -> Self {
33        Self { rx, waker: None, ended: false }
34    }
35
36    /// `poll_item()` will try to receive a message.
37    /// If the channel is empty, it will register a notification for the next poll.
38    ///
39    /// # Behavior
40    ///
41    /// The polling behavior is different from [RecvFuture](crate::RecvFuture).
42    /// Because the waker is not exposed to the user, you cannot perform delicate operations on
43    /// the waker (compared to the `Drop` handler in `RecvFuture`).
44    /// To make sure no deadlock happens on cancellation, the `WakerState` will be `Init`
45    /// after being registered (and will not be converted to `Waiting`).
46    /// The senders will wake up all `Init` state wakers until they find a normal
47    /// pending receiver in the `Waiting` state.
48    ///
49    /// # Return Value:
50    ///
51    /// Returns `Ok(T)` on success.
52    ///
53    /// Returns Err([TryRecvError::Empty]) for a `Poll::Pending` case.
54    /// The next time the channel is not empty, your future will be woken again.
55    /// You should then continue calling `poll_item()` to receive the message.
56    /// If you want to cancel, just don't call `poll_item()` again. Others will still have a chance
57    /// to receive messages.
58    ///
59    /// Returns Err([TryRecvError::Disconnected]) if all `Tx` have been dropped and the channel is empty.
60    #[inline]
61    pub fn poll_item(&mut self, ctx: &mut Context) -> Poll<Option<F::Item>> {
62        match self.rx.poll_item::<true>(ctx, &mut self.waker) {
63            Ok(item) => Poll::Ready(Some(item)),
64            Err(e) => {
65                if e.is_empty() {
66                    return Poll::Pending;
67                }
68                self.ended = true;
69                return Poll::Ready(None);
70            }
71        }
72    }
73}
74
75impl<F: Flavor> Deref for AsyncStream<F> {
76    type Target = AsyncRx<F>;
77
78    #[inline]
79    fn deref(&self) -> &Self::Target {
80        &self.rx
81    }
82}
83
84impl<F: Flavor> stream::Stream for AsyncStream<F> {
85    type Item = F::Item;
86
87    #[inline(always)]
88    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
89        let mut _self = self.get_mut();
90        if _self.ended {
91            return Poll::Ready(None);
92        }
93        match _self.rx.poll_item::<false>(ctx, &mut _self.waker) {
94            Ok(item) => Poll::Ready(Some(item)),
95            Err(e) => {
96                if e.is_empty() {
97                    return Poll::Pending;
98                }
99                _self.ended = true;
100                return Poll::Ready(None);
101            }
102        }
103    }
104}
105
106impl<F: Flavor> stream::FusedStream for AsyncStream<F> {
107    fn is_terminated(&self) -> bool {
108        self.ended
109    }
110}
111
112impl<F: Flavor> Drop for AsyncStream<F> {
113    fn drop(&mut self) {
114        if let Some(waker) = self.waker.as_ref() {
115            self.rx.shared.abandon_recv_waker(waker);
116        }
117    }
118}
119
120impl<F: Flavor> From<AsyncRx<F>> for AsyncStream<F> {
121    #[inline]
122    fn from(rx: AsyncRx<F>) -> Self {
123        rx.into_stream()
124    }
125}
126
127impl<F: Flavor> From<MAsyncRx<F>> for AsyncStream<F> {
128    #[inline]
129    fn from(rx: MAsyncRx<F>) -> Self {
130        rx.into_stream()
131    }
132}