1use futures::future::FusedFuture;
2use futures::stream::FusedStream;
3use futures::Stream;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll, Waker};
7
8pub struct Optional<T> {
16 task: Option<T>,
17 waker: Option<Waker>,
18}
19
20impl<T: Unpin> Unpin for Optional<T> {}
21
22impl<T> Default for Optional<T> {
23 fn default() -> Self {
24 Self {
25 task: None,
26 waker: None,
27 }
28 }
29}
30
31impl<T> From<Option<T>> for Optional<T> {
32 fn from(task: Option<T>) -> Self {
33 Self { task, waker: None }
34 }
35}
36
37impl<T> From<T> for Optional<T> {
38 fn from(fut: T) -> Self {
39 Self {
40 task: Some(fut),
41 waker: None,
42 }
43 }
44}
45
46impl<T> Optional<T> {
47 pub fn new(task: T) -> Self {
49 Self {
50 task: Some(task),
51 waker: None,
52 }
53 }
54
55 pub fn with_future(future: T) -> Self
57 where
58 T: Future,
59 {
60 Self::new(future)
61 }
62
63 pub fn with_stream(stream: T) -> Self
65 where
66 T: Stream,
67 {
68 Self::new(stream)
69 }
70
71 pub fn take(&mut self) -> Option<T> {
73 let fut = self.task.take();
74 if let Some(waker) = self.waker.take() {
75 waker.wake();
76 }
77 fut
78 }
79
80 pub fn is_some(&self) -> bool {
82 self.task.is_some()
83 }
84
85 pub fn is_none(&self) -> bool {
87 self.task.is_none()
88 }
89
90 pub fn as_ref(&self) -> Option<&T> {
92 self.task.as_ref()
93 }
94
95 pub fn as_mut(&mut self) -> Option<&mut T> {
97 self.task.as_mut()
98 }
99
100 pub fn replace(&mut self, task: T) -> Option<T> {
102 let fut = self.task.replace(task);
103 if let Some(waker) = self.waker.take() {
104 waker.wake();
105 }
106 fut
107 }
108
109 pub fn as_pin_mut(&mut self) -> Option<Pin<&mut T>>
111 where
112 T: Unpin,
113 {
114 self.task.as_mut().map(Pin::new)
115 }
116
117 pub fn as_pin_ref(&self) -> Option<Pin<&T>>
119 where
120 T: Unpin,
121 {
122 self.task.as_ref().map(Pin::new)
123 }
124}
125
126impl<F> Future for Optional<F>
127where
128 F: Future + Unpin,
129{
130 type Output = F::Output;
131
132 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
133 let Some(future) = self.as_pin_mut() else {
134 self.waker.replace(cx.waker().clone());
135 return Poll::Pending;
136 };
137
138 match future.poll(cx) {
139 Poll::Ready(output) => {
140 self.task.take();
141 Poll::Ready(output)
142 }
143 Poll::Pending => {
144 self.waker.replace(cx.waker().clone());
145 Poll::Pending
146 }
147 }
148 }
149}
150
151impl<F: Future> FusedFuture for Optional<F>
152where
153 F: Future + Unpin,
154{
155 fn is_terminated(&self) -> bool {
156 self.task.is_none()
157 }
158}
159
160impl<S> Stream for Optional<S>
161where
162 S: Stream + Unpin,
163{
164 type Item = S::Item;
165
166 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167 let Some(stream) = self.as_pin_mut() else {
168 self.waker.replace(cx.waker().clone());
169 return Poll::Pending;
170 };
171
172 match stream.poll_next(cx) {
173 Poll::Ready(Some(output)) => Poll::Ready(Some(output)),
174 Poll::Ready(None) => {
175 self.task.take();
176 Poll::Ready(None)
177 }
178 Poll::Pending => {
179 self.waker.replace(cx.waker().clone());
180 Poll::Pending
181 }
182 }
183 }
184
185 fn size_hint(&self) -> (usize, Option<usize>) {
186 match self.task.as_ref() {
187 Some(st) => st.size_hint(),
188 None => (0, Some(0)),
189 }
190 }
191}
192
193impl<S> FusedStream for Optional<S>
194where
195 S: Stream + Unpin,
196{
197 fn is_terminated(&self) -> bool {
198 self.task.is_none()
199 }
200}
201
202#[cfg(test)]
203mod test {
204 use super::*;
205 use futures::StreamExt;
206
207 #[test]
208 fn test_optional_future() {
209 let mut future = Optional::new(futures::future::ready(0));
210 assert!(future.is_some());
211 let waker = futures::task::noop_waker_ref();
212
213 let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
214 assert_eq!(val, Poll::Ready(0));
215 assert!(future.is_none());
216 }
217
218 #[test]
219 fn reusable_optional_future() {
220 let mut future = Optional::new(futures::future::ready(0));
221 assert!(future.is_some());
222 let waker = futures::task::noop_waker_ref();
223
224 let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
225 assert_eq!(val, Poll::Ready(0));
226 assert!(future.is_none());
227
228 future.replace(futures::future::ready(1));
229 assert!(future.is_some());
230
231 let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
232 assert_eq!(val, Poll::Ready(1));
233 assert!(future.is_none());
234 }
235
236 #[test]
237 fn convert_future_to_optional_future() {
238 let fut = futures::future::ready(0);
239
240 let mut future = Optional::from(fut);
241 assert!(future.is_some());
242 let waker = futures::task::noop_waker_ref();
243
244 let val = Pin::new(&mut future).poll(&mut Context::from_waker(waker));
245 assert_eq!(val, Poll::Ready(0));
246 assert!(future.is_none());
247 }
248
249 #[test]
250 fn test_optional_stream() {
251 let mut stream = Optional::new(futures::stream::once(async { 0 }).boxed());
252 assert!(stream.is_some());
253 let waker = futures::task::noop_waker_ref();
254
255 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
256 assert_eq!(val, Poll::Ready(Some(0)));
257 assert!(stream.is_some());
258
259 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
260 assert_eq!(val, Poll::Ready(None));
261 assert!(stream.is_none());
262 }
263
264 #[test]
265 fn reusable_optional_stream() {
266 let mut stream = Optional::new(futures::stream::once(async { 0 }).boxed());
267 assert!(stream.is_some());
268 let waker = futures::task::noop_waker_ref();
269
270 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
271 assert_eq!(val, Poll::Ready(Some(0)));
272 assert!(stream.is_some());
273
274 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
275 assert_eq!(val, Poll::Ready(None));
276 assert!(stream.is_none());
277
278 stream.replace(futures::stream::once(async { 1 }).boxed());
279 assert!(stream.is_some());
280
281 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
282 assert_eq!(val, Poll::Ready(Some(1)));
283 assert!(stream.is_some());
284
285 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
286 assert_eq!(val, Poll::Ready(None));
287 assert!(stream.is_none());
288 }
289
290 #[test]
291 fn convert_stream_to_optional_stream() {
292 let st = futures::stream::once(async { 0 }).boxed();
293
294 let mut stream = Optional::from(st);
295
296 assert!(stream.is_some());
297 let waker = futures::task::noop_waker_ref();
298
299 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
300 assert_eq!(val, Poll::Ready(Some(0)));
301 assert!(stream.is_some());
302
303 let val = Pin::new(&mut stream).poll_next(&mut Context::from_waker(waker));
304 assert_eq!(val, Poll::Ready(None));
305 assert!(stream.is_none());
306 }
307}