1use crate::shared::*;
2use crate::{AsyncRx, MAsyncRx};
3use futures_core::stream;
4use std::fmt;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::task::*;
8
9pub struct AsyncStream<F: Flavor> {
13 rx: AsyncRx<F>,
14 waker: Option<<F::Recv as Registry>::Waker>,
15 ended: bool,
16}
17
18impl<F: Flavor> fmt::Debug for AsyncStream<F> {
19 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
20 write!(f, "AsyncStream")
21 }
22}
23
24impl<F: Flavor> fmt::Display for AsyncStream<F> {
25 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26 write!(f, "AsyncStream")
27 }
28}
29
30impl<F: Flavor> AsyncStream<F> {
31 #[inline(always)]
32 pub fn new(rx: AsyncRx<F>) -> Self {
33 Self { rx, waker: None, ended: false }
34 }
35
36 #[inline]
61 pub fn poll_item(&mut self, ctx: &mut Context) -> Poll<Option<F::Item>>
62 where
63 F::Item: Send + 'static,
64 {
65 match self.rx.poll_item::<true>(ctx, &mut self.waker) {
66 Ok(item) => Poll::Ready(Some(item)),
67 Err(e) => {
68 if e.is_empty() {
69 return Poll::Pending;
70 }
71 self.ended = true;
72 return Poll::Ready(None);
73 }
74 }
75 }
76}
77
78impl<F: Flavor> Deref for AsyncStream<F> {
79 type Target = AsyncRx<F>;
80
81 #[inline]
82 fn deref(&self) -> &Self::Target {
83 &self.rx
84 }
85}
86
87impl<F: Flavor> stream::Stream for AsyncStream<F>
88where
89 F::Item: Send + 'static,
90{
91 type Item = F::Item;
92
93 #[inline(always)]
94 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
95 let mut _self = self.get_mut();
96 if _self.ended {
97 return Poll::Ready(None);
98 }
99 match _self.rx.poll_item::<false>(ctx, &mut _self.waker) {
100 Ok(item) => Poll::Ready(Some(item)),
101 Err(e) => {
102 if e.is_empty() {
103 return Poll::Pending;
104 }
105 _self.ended = true;
106 return Poll::Ready(None);
107 }
108 }
109 }
110}
111
112impl<F: Flavor> stream::FusedStream for AsyncStream<F>
113where
114 F::Item: Send + 'static,
115{
116 fn is_terminated(&self) -> bool {
117 self.ended
118 }
119}
120
121impl<F: Flavor> Drop for AsyncStream<F> {
122 fn drop(&mut self) {
123 if let Some(waker) = self.waker.as_ref() {
124 self.rx.shared.abandon_recv_waker(waker);
125 }
126 }
127}
128
129impl<F: Flavor> From<AsyncRx<F>> for AsyncStream<F> {
130 #[inline]
131 fn from(rx: AsyncRx<F>) -> Self {
132 rx.into_stream()
133 }
134}
135
136impl<F: Flavor> From<MAsyncRx<F>> for AsyncStream<F> {
137 #[inline]
138 fn from(rx: MAsyncRx<F>) -> Self {
139 rx.into_stream()
140 }
141}