1use crate::shared::*;
2use crate::{AsyncRx, MAsyncRx};
3use futures_core::stream;
4use std::fmt;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::task::*;
8
9pub struct AsyncStream<F: Flavor> {
13 rx: AsyncRx<F>,
14 waker: Option<<F::Recv as Registry>::Waker>,
15 ended: bool,
16}
17
18impl<F: Flavor> fmt::Debug for AsyncStream<F> {
19 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
20 write!(f, "AsyncStream")
21 }
22}
23
24impl<F: Flavor> fmt::Display for AsyncStream<F> {
25 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26 write!(f, "AsyncStream")
27 }
28}
29
30impl<F: Flavor> AsyncStream<F> {
31 #[inline(always)]
32 pub fn new(rx: AsyncRx<F>) -> Self {
33 Self { rx, waker: None, ended: false }
34 }
35
36 #[inline]
61 pub fn poll_item(&mut self, ctx: &mut Context) -> Poll<Option<F::Item>> {
62 match self.rx.poll_item::<true>(ctx, &mut self.waker) {
63 Ok(item) => Poll::Ready(Some(item)),
64 Err(e) => {
65 if e.is_empty() {
66 return Poll::Pending;
67 }
68 self.ended = true;
69 return Poll::Ready(None);
70 }
71 }
72 }
73}
74
75impl<F: Flavor> Deref for AsyncStream<F> {
76 type Target = AsyncRx<F>;
77
78 #[inline]
79 fn deref(&self) -> &Self::Target {
80 &self.rx
81 }
82}
83
84impl<F: Flavor> stream::Stream for AsyncStream<F> {
85 type Item = F::Item;
86
87 #[inline(always)]
88 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
89 let mut _self = self.get_mut();
90 if _self.ended {
91 return Poll::Ready(None);
92 }
93 match _self.rx.poll_item::<false>(ctx, &mut _self.waker) {
94 Ok(item) => Poll::Ready(Some(item)),
95 Err(e) => {
96 if e.is_empty() {
97 return Poll::Pending;
98 }
99 _self.ended = true;
100 return Poll::Ready(None);
101 }
102 }
103 }
104}
105
106impl<F: Flavor> stream::FusedStream for AsyncStream<F> {
107 fn is_terminated(&self) -> bool {
108 self.ended
109 }
110}
111
112impl<F: Flavor> Drop for AsyncStream<F> {
113 fn drop(&mut self) {
114 if let Some(waker) = self.waker.as_ref() {
115 self.rx.shared.abandon_recv_waker(waker);
116 }
117 }
118}
119
120impl<F: Flavor> From<AsyncRx<F>> for AsyncStream<F> {
121 #[inline]
122 fn from(rx: AsyncRx<F>) -> Self {
123 rx.into_stream()
124 }
125}
126
127impl<F: Flavor> From<MAsyncRx<F>> for AsyncStream<F> {
128 #[inline]
129 fn from(rx: MAsyncRx<F>) -> Self {
130 rx.into_stream()
131 }
132}