async_fuse/fuse.rs
1//! Extension trait to simplify optionally polling futures.
2
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7#[cfg(feature = "alloc")]
8use alloc::boxed::Box;
9
10#[cfg(feature = "stream03")]
11use futures_core03::Stream as Stream03;
12
13use crate::poll::{self, PollFuture, PollInner, Project};
14
15/// A fusing adapter around a value.
16///
17/// A `Fuse<T>` is similar to `Option<T>`, with the exception that it provides
18/// and API which is more suitable for interacting with asynchronous tasks and
19/// pinned values.
20///
21/// For most polling operations (except [`Fuse::poll_inner`]), if the value
22/// completes, the adapter will switch to an [empty state][Fuse::empty] and
23/// return [`Poll::Pending`]. It can later be updated again with
24/// [set][Fuse::set].
25///
26/// See [`Fuse::new`] for more details.
27pub struct Fuse<T> {
28 value: Option<T>,
29}
30
31#[cfg(feature = "alloc")]
32#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
33impl<T> Fuse<Pin<Box<T>>> {
34 /// Construct a fusing adapter around a value that is already pinned.
35 ///
36 /// # Examples
37 ///
38 /// ```rust
39 /// use async_fuse::Fuse;
40 /// use std::future::Future;
41 /// use tokio::time;
42 ///
43 /// async fn foo() -> u32 { 1 }
44 ///
45 /// # #[tokio::main]
46 /// # async fn main() {
47 /// let mut fut = Fuse::pin(foo());
48 /// assert!(!fut.is_empty());
49 ///
50 /// let value = (&mut fut).await;
51 /// assert!(fut.is_empty());
52 /// # }
53 /// ```
54 #[inline]
55 pub fn pin(value: T) -> Self {
56 Self {
57 value: Some(Box::pin(value)),
58 }
59 }
60}
61
62impl<T> Fuse<T> {
63 /// Construct a fusing adapter around a value.
64 ///
65 /// # Examples
66 ///
67 /// ```rust
68 /// use std::pin::pin;
69 /// use async_fuse::Fuse;
70 /// use std::time::Duration;
71 /// use tokio::time;
72 ///
73 /// # #[tokio::main]
74 /// # async fn main() {
75 /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
76 ///
77 /// tokio::select! {
78 /// _ = &mut sleep => {
79 /// assert!(sleep.is_empty());
80 /// sleep.set(Fuse::new(time::sleep(Duration::from_millis(200))));
81 /// }
82 /// }
83 ///
84 /// assert!(!sleep.is_empty());
85 /// # }
86 /// ```
87 ///
88 /// # Example using an unsized trait object
89 ///
90 /// ```rust
91 /// use async_fuse::Fuse;
92 /// use std::future::Future;
93 /// use std::pin::Pin;
94 /// use tokio::time;
95 ///
96 /// async fn foo() -> u32 { 1 }
97 /// async fn bar() -> u32 { 2 }
98 ///
99 /// # #[tokio::main]
100 /// # async fn main() {
101 /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::new(Box::pin(foo()));
102 /// let mut total = 0;
103 ///
104 /// while !fut.is_empty() {
105 /// let value = (&mut fut).await;
106 ///
107 /// if value == 1 {
108 /// fut.set(Box::pin(bar()));
109 /// }
110 ///
111 /// total += value;
112 /// }
113 ///
114 /// assert_eq!(total, 3);
115 /// # }
116 /// ```
117 #[inline]
118 pub fn new(value: T) -> Self {
119 Self { value: Some(value) }
120 }
121
122 /// Set the fused value.
123 ///
124 /// # Examples
125 ///
126 /// ```rust
127 /// use async_fuse::Fuse;
128 /// use std::time::Duration;
129 /// use tokio::time;
130 ///
131 /// # #[tokio::main]
132 /// # async fn main() {
133 /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
134 ///
135 /// assert!(!sleep.is_empty());
136 /// sleep.set(Box::pin(time::sleep(Duration::from_millis(200))));
137 /// assert!(!sleep.is_empty());
138 /// # }
139 /// ```
140 ///
141 /// # Example setting an unsized trait object
142 ///
143 /// ```rust
144 /// use async_fuse::Fuse;
145 /// use std::future::Future;
146 /// use std::pin::Pin;
147 /// use tokio::time;
148 ///
149 /// async fn foo() -> u32 { 1 }
150 /// async fn bar() -> u32 { 2 }
151 ///
152 /// # #[tokio::main]
153 /// # async fn main() {
154 /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::empty();
155 /// assert!(fut.is_empty());
156 ///
157 /// fut.set(Box::pin(foo()));
158 /// assert!(!fut.is_empty());
159 ///
160 /// fut.set(Box::pin(bar()));
161 /// assert!(!fut.is_empty());
162 /// # }
163 /// ```
164 #[inline]
165 pub fn set(&mut self, value: T)
166 where
167 Self: Unpin,
168 {
169 self.value = Some(value);
170 }
171
172 /// Clear the fused value.
173 ///
174 /// # Examples
175 ///
176 /// ```rust
177 /// use async_fuse::Fuse;
178 /// use std::time::Duration;
179 /// use tokio::time;
180 ///
181 /// # #[tokio::main]
182 /// # async fn main() {
183 /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
184 ///
185 /// assert!(!sleep.is_empty());
186 /// sleep.clear();
187 /// assert!(sleep.is_empty());
188 /// # }
189 /// ```
190 #[inline]
191 pub fn clear(&mut self)
192 where
193 Self: Unpin,
194 {
195 self.value = None;
196 }
197
198 /// Construct an empty fuse.
199 ///
200 /// # Examples
201 ///
202 /// ```rust
203 /// use std::pin::pin;
204 /// use async_fuse::Fuse;
205 /// use tokio::time;
206 ///
207 /// # #[tokio::main]
208 /// # async fn main() {
209 /// let mut sleep = pin!(Fuse::<time::Sleep>::empty());
210 ///
211 /// assert!(sleep.is_empty());
212 /// # }
213 /// ```
214 #[must_use]
215 #[inline]
216 pub fn empty() -> Self {
217 Fuse::default()
218 }
219
220 /// Test if the polled for value is empty.
221 ///
222 /// # Examples
223 ///
224 /// ```rust
225 /// use std::pin::pin;
226 /// use async_fuse::Fuse;
227 /// use std::time::Duration;
228 /// use tokio::time;
229 ///
230 /// # #[tokio::main]
231 /// # async fn main() {
232 /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
233 ///
234 /// assert!(!sleep.is_empty());
235 /// sleep.set(Fuse::empty());
236 /// assert!(sleep.is_empty());
237 /// # }
238 /// ```
239 #[inline]
240 pub fn is_empty(&self) -> bool {
241 self.value.is_none()
242 }
243
244 /// Access the interior value as a reference.
245 ///
246 /// # Examples
247 ///
248 /// ```rust
249 /// use std::pin::pin;
250 /// use std::time::Duration;
251 /// use async_fuse::Fuse;
252 /// use tokio::time;
253 ///
254 /// # #[tokio::main]
255 /// # async fn main() {
256 /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
257 ///
258 /// assert!(sleep.as_inner_ref().is_some());
259 /// sleep.set(Fuse::empty());
260 /// assert!(sleep.as_inner_ref().is_none());
261 /// # }
262 /// ```
263 #[inline]
264 pub fn as_inner_ref(&self) -> Option<&T> {
265 self.value.as_ref()
266 }
267
268 /// Poll the current value with the given polling implementation.
269 ///
270 /// This can be used for types which only provides a polling function.
271 ///
272 /// This will never empty the underlying value.
273 ///
274 /// # Examples
275 ///
276 /// ```rust
277 /// use std::pin::pin;
278 /// use std::future::Future;
279 /// use async_fuse::Fuse;
280 /// use tokio::sync::mpsc;
281 ///
282 /// async fn op(n: u32) -> u32 {
283 /// n
284 /// }
285 ///
286 /// # #[tokio::main]
287 /// # async fn main() {
288 /// let mut op1 = pin!(Fuse::new(op(1)));
289 ///
290 /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 1);
291 /// assert!(!op1.is_empty());
292 ///
293 /// op1.set(Fuse::new(op(2)));
294 /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 2);
295 /// assert!(!op1.is_empty());
296 /// # }
297 /// ```
298 #[inline]
299 pub async fn poll_inner<P, O>(self: Pin<&mut Self>, poll: P) -> O
300 where
301 P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
302 {
303 PollInner::new(ProjectFuse(self), poll).await
304 }
305
306 /// Poll the current value with the given polling implementation.
307 ///
308 /// This can be used for types which only provides a polling function.
309 ///
310 /// Once the underlying poll impl returns `Poll::Ready`, the underlying
311 /// value will be emptied.
312 ///
313 /// # Examples
314 ///
315 /// ```rust
316 /// use std::pin::pin;
317 /// use std::future::Future;
318 /// use async_fuse::Fuse;
319 /// use tokio::sync::mpsc;
320 ///
321 /// async fn op(n: u32) -> u32 {
322 /// n
323 /// }
324 ///
325 /// # #[tokio::main]
326 /// # async fn main() {
327 /// let mut op1 = pin!(Fuse::new(op(1)));
328 ///
329 /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 1);
330 /// assert!(op1.is_empty());
331 ///
332 /// op1.set(Fuse::new(op(2)));
333 /// assert!(!op1.is_empty());
334 /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 2);
335 /// assert!(op1.is_empty());
336 /// # }
337 /// ```
338 #[inline]
339 pub async fn poll_future<P, O>(self: Pin<&mut Self>, poll: P) -> O
340 where
341 P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
342 {
343 PollFuture::new(ProjectFuse(self), poll).await
344 }
345
346 /// Poll the current value with the given polling implementation.
347 ///
348 /// This can be used for types which only provides a polling function, or
349 /// types which can be polled multiple streams. Like streams which do not
350 /// provide a Stream implementation.
351 ///
352 /// Will empty the fused value once the underlying poll returns
353 /// `Poll::Ready(None)`.
354 ///
355 /// # Examples
356 ///
357 /// ```rust
358 /// # extern crate futures_core03 as futures_core;
359 /// use std::pin::pin;
360 /// use std::future::Future;
361 /// use async_fuse::Fuse;
362 /// use futures_core::Stream;
363 /// use tokio::sync::mpsc;
364 ///
365 /// fn op(n: u32) -> impl Stream<Item = u32> {
366 /// async_stream::stream! {
367 /// yield n;
368 /// yield n + 1;
369 /// }
370 /// }
371 ///
372 /// # #[tokio::main]
373 /// # async fn main() {
374 /// let mut op1 = pin!(Fuse::new(op(1)));
375 ///
376 /// assert!(!op1.is_empty());
377 /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(1));
378 /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(2));
379 /// assert!(!op1.is_empty());
380 /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, None);
381 /// assert!(op1.is_empty());
382 /// # }
383 /// ```
384 #[inline]
385 pub async fn poll_stream<P, O>(self: Pin<&mut Self>, poll: P) -> Option<O>
386 where
387 P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<Option<O>>,
388 {
389 poll::PollStream::new(ProjectFuse(self), poll).await
390 }
391
392 /// Access the interior mutable value. This is only available if it
393 /// implements [Unpin].
394 ///
395 /// # Examples
396 ///
397 /// ```rust
398 /// use async_fuse::Fuse;
399 ///
400 /// # fn main() {
401 /// let mut rx = Fuse::new(Box::pin(async { 42 }));
402 ///
403 /// assert!(rx.as_inner_mut().is_some());
404 /// # }
405 #[inline]
406 pub fn as_inner_mut(&mut self) -> Option<&mut T>
407 where
408 Self: Unpin,
409 {
410 self.value.as_mut()
411 }
412
413 /// Helper conversion to a pinned value.
414 ///
415 /// # Examples
416 ///
417 /// ```rust
418 /// use async_fuse::Fuse;
419 /// use tokio::sync::mpsc;
420 ///
421 /// # #[tokio::main]
422 /// # async fn main() {
423 /// let (tx, rx) = mpsc::unbounded_channel::<u32>();
424 /// let mut rx = Fuse::new(rx);
425 ///
426 /// tx.send(42);
427 ///
428 /// // Manually poll the sleep.
429 /// assert_eq!(rx.as_pin_mut().poll_stream(|mut i, cx| i.poll_recv(cx)).await, Some(42));
430 ///
431 /// rx = Fuse::empty();
432 /// assert!(rx.is_empty());
433 /// # }
434 /// ```
435 #[inline]
436 pub fn as_pin_mut(&mut self) -> Pin<&mut Self>
437 where
438 Self: Unpin,
439 {
440 Pin::new(self)
441 }
442
443 /// Poll the next value in the stream where the underlying value is unpin.
444 ///
445 /// Behaves the same as [`poll_stream`], except that it only works for
446 /// values which are [Unpin].
447 ///
448 /// # Examples
449 ///
450 /// ```rust
451 /// # extern crate futures_core03 as futures_core;
452 /// use std::future::Future;
453 /// use async_fuse::Fuse;
454 /// use futures_core::Stream;
455 /// use tokio::sync::mpsc;
456 ///
457 /// fn op(n: u32) -> impl Stream<Item = u32> {
458 /// async_stream::stream! {
459 /// yield n;
460 /// yield n + 1;
461 /// }
462 /// }
463 ///
464 /// # #[tokio::main]
465 /// # async fn main() {
466 /// let mut stream = Fuse::new(Box::pin(op(1)));
467 /// assert!(!stream.is_empty());
468 ///
469 /// assert_eq!(stream.next().await, Some(1));
470 /// assert_eq!(stream.next().await, Some(2));
471 /// assert_eq!(stream.next().await, None);
472 ///
473 /// assert!(stream.is_empty());
474 /// # }
475 /// ```
476 #[cfg(feature = "stream03")]
477 #[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
478 pub async fn next(&mut self) -> Option<T::Item>
479 where
480 Self: Unpin,
481 T: Stream03,
482 {
483 self.as_pin_mut().poll_stream(Stream03::poll_next).await
484 }
485
486 #[inline]
487 fn project(self: Pin<&mut Self>) -> Pin<&mut Option<T>> {
488 // Safety: We're projecting into the owned pinned value field, which we
489 // otherwise do not move before it's dropped.
490 unsafe { Pin::map_unchecked_mut(self, |this| &mut this.value) }
491 }
492}
493
494impl<T> Future for Fuse<T>
495where
496 T: Future,
497{
498 type Output = T::Output;
499
500 #[inline]
501 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502 Pin::new(&mut PollFuture::new(ProjectFuse(self), Future::poll)).poll(cx)
503 }
504}
505
506#[cfg(feature = "stream03")]
507#[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
508impl<T> Stream03 for Fuse<T>
509where
510 T: Stream03,
511{
512 type Item = T::Item;
513
514 #[inline]
515 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
516 Pin::new(&mut poll::PollStream::new(
517 ProjectFuse(self),
518 Stream03::poll_next,
519 ))
520 .poll(cx)
521 }
522}
523
524impl<T> From<Option<T>> for Fuse<T> {
525 #[inline]
526 fn from(value: Option<T>) -> Self {
527 Self { value }
528 }
529}
530
531#[cfg(feature = "alloc")]
532#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
533impl<T> From<Box<T>> for Fuse<Pin<Box<T>>> {
534 #[inline]
535 fn from(value: Box<T>) -> Self {
536 Self {
537 value: Some(value.into()),
538 }
539 }
540}
541
542#[cfg(feature = "alloc")]
543#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
544impl<T> From<Option<Box<T>>> for Fuse<Pin<Box<T>>> {
545 #[inline]
546 fn from(value: Option<Box<T>>) -> Self {
547 Self {
548 value: value.map(Into::into),
549 }
550 }
551}
552
553impl<T> Default for Fuse<T> {
554 #[inline]
555 fn default() -> Self {
556 Self { value: None }
557 }
558}
559
560struct ProjectFuse<'a, T>(Pin<&'a mut Fuse<T>>);
561
562impl<T> Project for ProjectFuse<'_, T> {
563 type Value = T;
564
565 #[inline]
566 fn clear(&mut self) {
567 self.0.as_mut().project().set(None);
568 }
569
570 #[inline]
571 fn project(&mut self) -> Poll<Pin<&mut Self::Value>> {
572 match self.0.as_mut().project().as_pin_mut() {
573 Some(value) => Poll::Ready(value),
574 None => Poll::Pending,
575 }
576 }
577}