pollable_map/stream/
optional.rs

1use futures::stream::FusedStream;
2use futures::Stream;
3use std::pin::Pin;
4use std::task::{Context, Poll, Waker};
5
/// A reusable stream that is the equivalent of an `Option`.
///
/// By default, this stream is empty and returns [`Poll::Pending`] when polled.
/// Once a [`Stream`] is supplied, either upon construction via [`OptionalStream::new`] or
/// later via [`OptionalStream::replace`], polling [`OptionalStream`] polls that stream.
/// Once the stream is polled to completion, [`OptionalStream`] becomes empty again.
pub struct OptionalStream<S> {
    /// The wrapped stream; `None` while the `OptionalStream` is empty.
    stream: Option<S>,
    /// Waker saved by `poll_next` whenever it returns [`Poll::Pending`]; it is woken by
    /// [`OptionalStream::take`] and [`OptionalStream::replace`].
    waker: Option<Waker>,
}
16
// `OptionalStream` is `Unpin` whenever the wrapped stream is; `poll_next` depends on this to
// reach its fields through `Pin<&mut Self>`.
impl<S: Unpin> Unpin for OptionalStream<S> {}
18
19impl<S> Default for OptionalStream<S> {
20    fn default() -> Self {
21        Self {
22            stream: None,
23            waker: None,
24        }
25    }
26}
27
28impl<S> From<Option<S>> for OptionalStream<S> {
29    fn from(st: Option<S>) -> Self {
30        Self {
31            stream: st,
32            waker: None,
33        }
34    }
35}
36
37impl<S: Stream> From<S> for OptionalStream<S> {
38    fn from(st: S) -> Self {
39        Self {
40            stream: Some(st),
41            waker: None,
42        }
43    }
44}
45
46impl<S> OptionalStream<S> {
47    /// Constructs a new `OptionalStream` with an existing `Stream`.
48    pub fn new(st: S) -> Self {
49        Self {
50            stream: Some(st),
51            waker: None,
52        }
53    }
54
55    /// Take the stream out, leaving the `OptionalStream` empty
56    pub fn take(&mut self) -> Option<S> {
57        let fut = self.stream.take();
58        if let Some(waker) = self.waker.take() {
59            waker.wake();
60        }
61        fut
62    }
63
64    /// Returns true if stream still exist.
65    pub fn is_some(&self) -> bool {
66        self.stream.is_some()
67    }
68
69    /// Returns false if stream doesnt exist or has been completed.
70    pub fn is_none(&self) -> bool {
71        self.stream.is_none()
72    }
73
74    /// Returns reference of a stream.
75    pub fn as_ref(&self) -> Option<&S> {
76        self.stream.as_ref()
77    }
78
79    /// Returns mutable reference of a stream.
80    pub fn as_mut(&mut self) -> Option<&mut S> {
81        self.stream.as_mut()
82    }
83
84    /// Replaces the current stream with a new one, returning the previous stream if present.
85    pub fn replace(&mut self, st: S) -> Option<S> {
86        let fut = self.stream.replace(st);
87        if let Some(waker) = self.waker.take() {
88            waker.wake();
89        }
90        fut
91    }
92}
93
94impl<S> Stream for OptionalStream<S>
95where
96    S: Stream + Send + Unpin + 'static,
97{
98    type Item = S::Item;
99
100    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101        let Some(stream) = self.stream.as_mut() else {
102            self.waker.replace(cx.waker().clone());
103            return Poll::Pending;
104        };
105
106        match Pin::new(stream).poll_next(cx) {
107            Poll::Ready(Some(output)) => Poll::Ready(Some(output)),
108            Poll::Ready(None) => {
109                self.stream.take();
110                Poll::Ready(None)
111            }
112            Poll::Pending => {
113                self.waker.replace(cx.waker().clone());
114                Poll::Pending
115            }
116        }
117    }
118
119    fn size_hint(&self) -> (usize, Option<usize>) {
120        match self.stream.as_ref() {
121            Some(st) => st.size_hint(),
122            None => (0, Some(0)),
123        }
124    }
125}
126
127impl<S> FusedStream for OptionalStream<S>
128where
129    S: Stream + Send + Unpin + 'static,
130{
131    fn is_terminated(&self) -> bool {
132        self.stream.is_none()
133    }
134}
135
#[cfg(test)]
mod test {
    use super::*;
    use futures::StreamExt;

    /// Polls `stream` a single time with a no-op waker.
    fn poll_once<S>(stream: &mut OptionalStream<S>) -> Poll<Option<S::Item>>
    where
        S: Stream + Send + Unpin + 'static,
    {
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
        Pin::new(stream).poll_next(&mut cx)
    }

    /// Checks that a populated `stream` yields exactly `expected`, then completes and
    /// ends up empty.
    fn assert_yields_once<S>(stream: &mut OptionalStream<S>, expected: i32)
    where
        S: Stream<Item = i32> + Send + Unpin + 'static,
    {
        assert!(stream.is_some());

        assert_eq!(poll_once(stream), Poll::Ready(Some(expected)));
        assert!(stream.is_some());

        assert_eq!(poll_once(stream), Poll::Ready(None));
        assert!(stream.is_none());
    }

    /// A stream supplied at construction is drained and then dropped.
    #[test]
    fn test_optional_stream() {
        let mut stream = OptionalStream::new(futures::stream::once(async { 0 }).boxed());
        assert_yields_once(&mut stream, 0);
    }

    /// After completion, a replacement stream can be installed and drained again.
    #[test]
    fn reusable_optional_stream() {
        let mut stream = OptionalStream::new(futures::stream::once(async { 0 }).boxed());
        assert_yields_once(&mut stream, 0);

        stream.replace(futures::stream::once(async { 1 }).boxed());
        assert_yields_once(&mut stream, 1);
    }

    /// `From<S>` produces a populated `OptionalStream`.
    #[test]
    fn convert_stream_to_optional_stream() {
        let inner = futures::stream::once(async { 0 }).boxed();
        let mut stream = OptionalStream::from(inner);
        assert_yields_once(&mut stream, 0);
    }
}