Skip to main content

crossfire/
stream.rs

1use crate::shared::*;
2use crate::{AsyncRx, MAsyncRx};
3use futures_core::stream;
4use std::fmt;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::task::*;
8
/// Constructed by [AsyncRx::into_stream()](crate::AsyncRx::into_stream())
///
/// Implements `futures_core::stream::Stream`.
///
/// Besides the `Stream` interface, a message can be polled manually with
/// `poll_item()`, and the wrapped receiver is reachable through `Deref`.
pub struct AsyncStream<F: Flavor> {
    /// The wrapped receiver; every poll is forwarded to its `poll_item`.
    rx: AsyncRx<F>,
    /// Waker slot handed to `rx.poll_item` on each poll. Starts as `None`; if it
    /// holds a value when the stream is dropped, that value is passed to
    /// `abandon_recv_waker`.
    waker: Option<<F::Recv as Registry>::Waker>,
    /// Set once a poll fails with an error that is not "empty". From then on
    /// `poll_next` returns `Poll::Ready(None)` without touching `rx`, and
    /// `is_terminated` reports `true`.
    ended: bool,
}
17
18impl<F: Flavor> fmt::Debug for AsyncStream<F> {
19    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
20        write!(f, "AsyncStream")
21    }
22}
23
24impl<F: Flavor> fmt::Display for AsyncStream<F> {
25    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26        write!(f, "AsyncStream")
27    }
28}
29
30impl<F: Flavor> AsyncStream<F> {
31    #[inline(always)]
32    pub fn new(rx: AsyncRx<F>) -> Self {
33        Self { rx, waker: None, ended: false }
34    }
35
36    /// `poll_item()` will try to receive a message.
37    /// If the channel is empty, it will register a notification for the next poll.
38    ///
39    /// # Behavior
40    ///
41    /// The polling behavior is different from [RecvFuture](crate::RecvFuture).
42    /// Because the waker is not exposed to the user, you cannot perform delicate operations on
43    /// the waker (compared to the `Drop` handler in `RecvFuture`).
44    /// To make sure no deadlock happens on cancellation, the `WakerState` will be `Init`
45    /// after being registered (and will not be converted to `Waiting`).
46    /// The senders will wake up all `Init` state wakers until they find a normal
47    /// pending receiver in the `Waiting` state.
48    ///
49    /// # Return Value:
50    ///
51    /// Returns `Ok(T)` on success.
52    ///
53    /// Returns Err([TryRecvError::Empty]) for a `Poll::Pending` case.
54    /// The next time the channel is not empty, your future will be woken again.
55    /// You should then continue calling `poll_item()` to receive the message.
56    /// If you want to cancel, just don't call `poll_item()` again. Others will still have a chance
57    /// to receive messages.
58    ///
59    /// Returns Err([TryRecvError::Disconnected]) if all `Tx` have been dropped and the channel is empty.
60    #[inline]
61    pub fn poll_item(&mut self, ctx: &mut Context) -> Poll<Option<F::Item>>
62    where
63        F::Item: Send + 'static,
64    {
65        match self.rx.poll_item::<true>(ctx, &mut self.waker) {
66            Ok(item) => Poll::Ready(Some(item)),
67            Err(e) => {
68                if e.is_empty() {
69                    return Poll::Pending;
70                }
71                self.ended = true;
72                return Poll::Ready(None);
73            }
74        }
75    }
76}
77
78impl<F: Flavor> Deref for AsyncStream<F> {
79    type Target = AsyncRx<F>;
80
81    #[inline]
82    fn deref(&self) -> &Self::Target {
83        &self.rx
84    }
85}
86
87impl<F: Flavor> stream::Stream for AsyncStream<F>
88where
89    F::Item: Send + 'static,
90{
91    type Item = F::Item;
92
93    #[inline(always)]
94    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
95        let mut _self = self.get_mut();
96        if _self.ended {
97            return Poll::Ready(None);
98        }
99        match _self.rx.poll_item::<false>(ctx, &mut _self.waker) {
100            Ok(item) => Poll::Ready(Some(item)),
101            Err(e) => {
102                if e.is_empty() {
103                    return Poll::Pending;
104                }
105                _self.ended = true;
106                return Poll::Ready(None);
107            }
108        }
109    }
110}
111
112impl<F: Flavor> stream::FusedStream for AsyncStream<F>
113where
114    F::Item: Send + 'static,
115{
116    fn is_terminated(&self) -> bool {
117        self.ended
118    }
119}
120
121impl<F: Flavor> Drop for AsyncStream<F> {
122    fn drop(&mut self) {
123        if let Some(waker) = self.waker.as_ref() {
124            self.rx.shared.abandon_recv_waker(waker);
125        }
126    }
127}
128
129impl<F: Flavor> From<AsyncRx<F>> for AsyncStream<F> {
130    #[inline]
131    fn from(rx: AsyncRx<F>) -> Self {
132        rx.into_stream()
133    }
134}
135
136impl<F: Flavor> From<MAsyncRx<F>> for AsyncStream<F> {
137    #[inline]
138    fn from(rx: MAsyncRx<F>) -> Self {
139        rx.into_stream()
140    }
141}