pollable_map/
optional.rs

1use futures::future::FusedFuture;
2use futures::stream::FusedStream;
3use futures::Stream;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll, Waker};
7
8/// A reusable future or stream based on `Option`.
9///
10/// By default, `Optional` will be empty, similar to `Option::None`, which would return [`Poll::Pending`] when polled,
11/// but if a [`Future`] or [`Stream`] is supplied either upon construction via [`Optional::new`] or
12/// is set via [`Optional::replace`], it would then be polled once [`Optional`]
13/// is polled. Once the future is polled to completion, the results will be returned, with
14/// [`Optional`] being empty.
15pub struct Optional<T> {
16    task: Option<T>,
17    waker: Option<Waker>,
18}
19
20impl<T: Unpin> Unpin for Optional<T> {}
21
22impl<T> Default for Optional<T> {
23    fn default() -> Self {
24        Self {
25            task: None,
26            waker: None,
27        }
28    }
29}
30
31impl<T> From<Option<T>> for Optional<T> {
32    fn from(task: Option<T>) -> Self {
33        Self { task, waker: None }
34    }
35}
36
37impl<T> From<T> for Optional<T> {
38    fn from(fut: T) -> Self {
39        Self {
40            task: Some(fut),
41            waker: None,
42        }
43    }
44}
45
46impl<T> Optional<T> {
47    /// Construct a new [`Optional`] with an existing [`Future`] or [`Stream`].
48    pub fn new(task: T) -> Self {
49        Self {
50            task: Some(task),
51            waker: None,
52        }
53    }
54
55    /// Construct a new [`Optional`] with an existing [`Future`].
56    pub fn with_future(future: T) -> Self
57    where
58        T: Future,
59    {
60        Self::new(future)
61    }
62
63    /// Construct a new [`Optional`] with an existing [`Stream`].
64    pub fn with_stream(stream: T) -> Self
65    where
66        T: Stream,
67    {
68        Self::new(stream)
69    }
70
71    /// Takes the future or stream out, leaving the [`Optional`] empty.
72    pub fn take(&mut self) -> Option<T> {
73        let fut = self.task.take();
74        if let Some(waker) = self.waker.take() {
75            waker.wake();
76        }
77        fut
78    }
79
80    /// Returns true if the future or stream still exist.
81    pub fn is_some(&self) -> bool {
82        self.task.is_some()
83    }
84
85    /// Returns false if the future or stream doesn't exist or has been completed.
86    pub fn is_none(&self) -> bool {
87        self.task.is_none()
88    }
89
90    /// Returns reference of the future or stream.
91    pub fn as_ref(&self) -> Option<&T> {
92        self.task.as_ref()
93    }
94
95    /// Returns mutable reference of the future or stream.
96    pub fn as_mut(&mut self) -> Option<&mut T> {
97        self.task.as_mut()
98    }
99
100    /// Replaces the current the future or stream with a new one, returning the previous value if present.
101    pub fn replace(&mut self, task: T) -> Option<T> {
102        let fut = self.task.replace(task);
103        if let Some(waker) = self.waker.take() {
104            waker.wake();
105        }
106        fut
107    }
108
109    /// Returns a constructed `Option<Pin<&mut T>>`.
110    pub fn as_pin_mut(&mut self) -> Option<Pin<&mut T>>
111    where
112        T: Unpin,
113    {
114        self.task.as_mut().map(Pin::new)
115    }
116
117    /// Returns a constructed `Option<Pin<&T>>`.
118    pub fn as_pin_ref(&self) -> Option<Pin<&T>>
119    where
120        T: Unpin,
121    {
122        self.task.as_ref().map(Pin::new)
123    }
124}
125
126impl<F> Future for Optional<F>
127where
128    F: Future + Unpin,
129{
130    type Output = F::Output;
131
132    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
133        let Some(future) = self.as_pin_mut() else {
134            self.waker.replace(cx.waker().clone());
135            return Poll::Pending;
136        };
137
138        match future.poll(cx) {
139            Poll::Ready(output) => {
140                self.task.take();
141                Poll::Ready(output)
142            }
143            Poll::Pending => {
144                self.waker.replace(cx.waker().clone());
145                Poll::Pending
146            }
147        }
148    }
149}
150
151impl<F: Future> FusedFuture for Optional<F>
152where
153    F: Future + Unpin,
154{
155    fn is_terminated(&self) -> bool {
156        self.task.is_none()
157    }
158}
159
160impl<S> Stream for Optional<S>
161where
162    S: Stream + Unpin,
163{
164    type Item = S::Item;
165
166    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167        let Some(stream) = self.as_pin_mut() else {
168            self.waker.replace(cx.waker().clone());
169            return Poll::Pending;
170        };
171
172        match stream.poll_next(cx) {
173            Poll::Ready(Some(output)) => Poll::Ready(Some(output)),
174            Poll::Ready(None) => {
175                self.task.take();
176                Poll::Ready(None)
177            }
178            Poll::Pending => {
179                self.waker.replace(cx.waker().clone());
180                Poll::Pending
181            }
182        }
183    }
184
185    fn size_hint(&self) -> (usize, Option<usize>) {
186        match self.task.as_ref() {
187            Some(st) => st.size_hint(),
188            None => (0, Some(0)),
189        }
190    }
191}
192
193impl<S> FusedStream for Optional<S>
194where
195    S: Stream + Unpin,
196{
197    fn is_terminated(&self) -> bool {
198        self.task.is_none()
199    }
200}
201
202#[cfg(test)]
203mod test {
204    use super::*;
205    use futures::StreamExt;
206
207    #[test]
208    fn test_optional_future() {
209        let mut future = Optional::new(futures::future::ready(0));
210        assert!(future.is_some());
211        let waker = futures::task::noop_waker_ref();
212
213        let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
214        assert_eq!(val, Poll::Ready(0));
215        assert!(future.is_none());
216    }
217
218    #[test]
219    fn reusable_optional_future() {
220        let mut future = Optional::new(futures::future::ready(0));
221        assert!(future.is_some());
222        let waker = futures::task::noop_waker_ref();
223
224        let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
225        assert_eq!(val, Poll::Ready(0));
226        assert!(future.is_none());
227
228        future.replace(futures::future::ready(1));
229        assert!(future.is_some());
230
231        let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
232        assert_eq!(val, Poll::Ready(1));
233        assert!(future.is_none());
234    }
235
236    #[test]
237    fn convert_future_to_optional_future() {
238        let fut = futures::future::ready(0);
239
240        let mut future = Optional::from(fut);
241        assert!(future.is_some());
242        let waker = futures::task::noop_waker_ref();
243
244        let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
245        assert_eq!(val, Poll::Ready(0));
246        assert!(future.is_none());
247    }
248
249    #[test]
250    fn test_optional_stream() {
251        let mut stream = Optional::new(futures::stream::once(async { 0 }).boxed());
252        assert!(stream.is_some());
253        let waker = futures::task::noop_waker_ref();
254
255        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
256        assert_eq!(val, Poll::Ready(Some(0)));
257        assert!(stream.is_some());
258
259        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
260        assert_eq!(val, Poll::Ready(None));
261        assert!(stream.is_none());
262    }
263
264    #[test]
265    fn reusable_optional_stream() {
266        let mut stream = Optional::new(futures::stream::once(async { 0 }).boxed());
267        assert!(stream.is_some());
268        let waker = futures::task::noop_waker_ref();
269
270        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
271        assert_eq!(val, Poll::Ready(Some(0)));
272        assert!(stream.is_some());
273
274        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
275        assert_eq!(val, Poll::Ready(None));
276        assert!(stream.is_none());
277
278        stream.replace(futures::stream::once(async { 1 }).boxed());
279        assert!(stream.is_some());
280
281        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
282        assert_eq!(val, Poll::Ready(Some(1)));
283        assert!(stream.is_some());
284
285        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
286        assert_eq!(val, Poll::Ready(None));
287        assert!(stream.is_none());
288    }
289
290    #[test]
291    fn convert_stream_to_optional_stream() {
292        let st = futures::stream::once(async { 0 }).boxed();
293
294        let mut stream = Optional::from(st);
295
296        assert!(stream.is_some());
297        let waker = futures::task::noop_waker_ref();
298
299        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
300        assert_eq!(val, Poll::Ready(Some(0)));
301        assert!(stream.is_some());
302
303        let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
304        assert_eq!(val, Poll::Ready(None));
305        assert!(stream.is_none());
306    }
307}