// pollable_map/stream/optional.rs
use futures::stream::FusedStream;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
5
/// A slot that may or may not currently hold a stream of type `S`.
///
/// Next to the optional stream it keeps the [`Waker`] recorded the last time
/// polling returned [`Poll::Pending`], so that [`OptionalStream::take`] and
/// [`OptionalStream::replace`] can wake the polling task when the slot changes.
pub struct OptionalStream<S> {
    /// The wrapped stream, or `None` while the slot is empty.
    stream: Option<S>,
    /// Waker recorded by `poll_next` when it returned `Poll::Pending`.
    waker: Option<Waker>,
}
16
17impl<S: Unpin> Unpin for OptionalStream<S> {}
18
19impl<S> Default for OptionalStream<S> {
20 fn default() -> Self {
21 Self {
22 stream: None,
23 waker: None,
24 }
25 }
26}
27
28impl<S> From<Option<S>> for OptionalStream<S> {
29 fn from(st: Option<S>) -> Self {
30 Self {
31 stream: st,
32 waker: None,
33 }
34 }
35}
36
37impl<S: Stream> From<S> for OptionalStream<S> {
38 fn from(st: S) -> Self {
39 Self {
40 stream: Some(st),
41 waker: None,
42 }
43 }
44}
45
46impl<S> OptionalStream<S> {
47 pub fn new(st: S) -> Self {
49 Self {
50 stream: Some(st),
51 waker: None,
52 }
53 }
54
55 pub fn take(&mut self) -> Option<S> {
57 let fut = self.stream.take();
58 if let Some(waker) = self.waker.take() {
59 waker.wake();
60 }
61 fut
62 }
63
64 pub fn is_some(&self) -> bool {
66 self.stream.is_some()
67 }
68
69 pub fn is_none(&self) -> bool {
71 self.stream.is_none()
72 }
73
74 pub fn as_ref(&self) -> Option<&S> {
76 self.stream.as_ref()
77 }
78
79 pub fn as_mut(&mut self) -> Option<&mut S> {
81 self.stream.as_mut()
82 }
83
84 pub fn replace(&mut self, st: S) -> Option<S> {
86 let fut = self.stream.replace(st);
87 if let Some(waker) = self.waker.take() {
88 waker.wake();
89 }
90 fut
91 }
92}
93
94impl<S> Stream for OptionalStream<S>
95where
96 S: Stream + Send + Unpin + 'static,
97{
98 type Item = S::Item;
99
100 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101 let Some(stream) = self.stream.as_mut() else {
102 self.waker.replace(cx.waker().clone());
103 return Poll::Pending;
104 };
105
106 match Pin::new(stream).poll_next(cx) {
107 Poll::Ready(Some(output)) => Poll::Ready(Some(output)),
108 Poll::Ready(None) => {
109 self.stream.take();
110 Poll::Ready(None)
111 }
112 Poll::Pending => {
113 self.waker.replace(cx.waker().clone());
114 Poll::Pending
115 }
116 }
117 }
118
119 fn size_hint(&self) -> (usize, Option<usize>) {
120 match self.stream.as_ref() {
121 Some(st) => st.size_hint(),
122 None => (0, Some(0)),
123 }
124 }
125}
126
127impl<S> FusedStream for OptionalStream<S>
128where
129 S: Stream + Send + Unpin + 'static,
130{
131 fn is_terminated(&self) -> bool {
132 self.stream.is_none()
133 }
134}
135
#[cfg(test)]
mod test {
    use super::*;
    use futures::stream::BoxStream;
    use futures::StreamExt;

    /// Polls `stream` once using a waker that does nothing when woken.
    fn poll_once(stream: &mut OptionalStream<BoxStream<'static, i32>>) -> Poll<Option<i32>> {
        let waker = futures::task::noop_waker_ref();
        Pin::new(stream).poll_next(&mut Context::from_waker(waker))
    }

    /// Builds a boxed stream that yields `value` once and then finishes.
    fn single(value: i32) -> BoxStream<'static, i32> {
        futures::stream::once(async move { value }).boxed()
    }

    #[test]
    fn test_optional_stream() {
        let mut stream = OptionalStream::new(single(0));
        assert!(stream.is_some());

        assert_eq!(poll_once(&mut stream), Poll::Ready(Some(0)));
        assert!(stream.is_some());

        // Exhausting the inner stream empties the slot.
        assert_eq!(poll_once(&mut stream), Poll::Ready(None));
        assert!(stream.is_none());
    }

    #[test]
    fn reusable_optional_stream() {
        let mut stream = OptionalStream::new(single(0));
        assert!(stream.is_some());

        assert_eq!(poll_once(&mut stream), Poll::Ready(Some(0)));
        assert!(stream.is_some());

        assert_eq!(poll_once(&mut stream), Poll::Ready(None));
        assert!(stream.is_none());

        // Refilling the slot makes the wrapper usable again.
        stream.replace(single(1));
        assert!(stream.is_some());

        assert_eq!(poll_once(&mut stream), Poll::Ready(Some(1)));
        assert!(stream.is_some());

        assert_eq!(poll_once(&mut stream), Poll::Ready(None));
        assert!(stream.is_none());
    }

    #[test]
    fn convert_stream_to_optional_stream() {
        let mut stream = OptionalStream::from(single(0));
        assert!(stream.is_some());

        assert_eq!(poll_once(&mut stream), Poll::Ready(Some(0)));
        assert!(stream.is_some());

        assert_eq!(poll_once(&mut stream), Poll::Ready(None));
        assert!(stream.is_none());
    }

    #[test]
    fn empty_optional_stream_is_pending_until_filled() {
        let mut stream = OptionalStream::<BoxStream<'static, i32>>::default();
        assert!(stream.is_none());
        assert_eq!(poll_once(&mut stream), Poll::Pending);

        assert!(stream.replace(single(2)).is_none());
        assert_eq!(poll_once(&mut stream), Poll::Ready(Some(2)));

        // Taking the stream out returns the slot to the pending state.
        assert!(stream.take().is_some());
        assert!(stream.is_none());
        assert_eq!(poll_once(&mut stream), Poll::Pending);
    }
}