async_fuse/
fuse.rs

1//! Extension trait to simplify optionally polling futures.
2
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7#[cfg(feature = "alloc")]
8use alloc::boxed::Box;
9
10#[cfg(feature = "stream03")]
11use futures_core03::Stream as Stream03;
12
13use crate::poll::{self, PollFuture, PollInner, Project};
14
/// A fusing adapter around a value.
///
/// A `Fuse<T>` is similar to `Option<T>`, with the exception that it provides
/// an API which is more suitable for interacting with asynchronous tasks and
/// pinned values.
///
/// For most polling operations (except [`Fuse::poll_inner`]), if the value
/// completes, the adapter will switch to an [empty state][Fuse::empty] and
/// return [`Poll::Pending`]. It can later be updated again with
/// [set][Fuse::set].
///
/// See [`Fuse::new`] for more details.
#[derive(Debug)]
pub struct Fuse<T> {
    /// The wrapped value; `None` while the fuse is empty.
    value: Option<T>,
}
30
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
impl<T> Fuse<Pin<Box<T>>> {
    /// Construct a fusing adapter by moving `value` into a pinned heap
    /// allocation (see [`Box::pin`]).
    ///
    /// # Examples
    ///
    /// ```rust
    /// use async_fuse::Fuse;
    /// use std::future::Future;
    /// use tokio::time;
    ///
    /// async fn foo() -> u32 { 1 }
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut fut = Fuse::pin(foo());
    /// assert!(!fut.is_empty());
    ///
    /// let value = (&mut fut).await;
    /// assert!(fut.is_empty());
    /// # }
    /// ```
    #[inline]
    pub fn pin(value: T) -> Self {
        let boxed = Box::pin(value);
        Self { value: Some(boxed) }
    }
}
61
62impl<T> Fuse<T> {
63    /// Construct a fusing adapter around a value.
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    /// use std::pin::pin;
69    /// use async_fuse::Fuse;
70    /// use std::time::Duration;
71    /// use tokio::time;
72    ///
73    /// # #[tokio::main]
74    /// # async fn main() {
75    /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
76    ///
77    /// tokio::select! {
78    ///     _ = &mut sleep => {
79    ///         assert!(sleep.is_empty());
80    ///         sleep.set(Fuse::new(time::sleep(Duration::from_millis(200))));
81    ///     }
82    /// }
83    ///
84    /// assert!(!sleep.is_empty());
85    /// # }
86    /// ```
87    ///
88    /// # Example using an unsized trait object
89    ///
90    /// ```rust
91    /// use async_fuse::Fuse;
92    /// use std::future::Future;
93    /// use std::pin::Pin;
94    /// use tokio::time;
95    ///
96    /// async fn foo() -> u32 { 1 }
97    /// async fn bar() -> u32 { 2 }
98    ///
99    /// # #[tokio::main]
100    /// # async fn main() {
101    /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::new(Box::pin(foo()));
102    /// let mut total = 0;
103    ///
104    /// while !fut.is_empty() {
105    ///     let value = (&mut fut).await;
106    ///
107    ///     if value == 1 {
108    ///         fut.set(Box::pin(bar()));
109    ///     }
110    ///
111    ///     total += value;
112    /// }
113    ///
114    /// assert_eq!(total, 3);
115    /// # }
116    /// ```
117    #[inline]
118    pub fn new(value: T) -> Self {
119        Self { value: Some(value) }
120    }
121
122    /// Set the fused value.
123    ///
124    /// # Examples
125    ///
126    /// ```rust
127    /// use async_fuse::Fuse;
128    /// use std::time::Duration;
129    /// use tokio::time;
130    ///
131    /// # #[tokio::main]
132    /// # async fn main() {
133    /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
134    ///
135    /// assert!(!sleep.is_empty());
136    /// sleep.set(Box::pin(time::sleep(Duration::from_millis(200))));
137    /// assert!(!sleep.is_empty());
138    /// # }
139    /// ```
140    ///
141    /// # Example setting an unsized trait object
142    ///
143    /// ```rust
144    /// use async_fuse::Fuse;
145    /// use std::future::Future;
146    /// use std::pin::Pin;
147    /// use tokio::time;
148    ///
149    /// async fn foo() -> u32 { 1 }
150    /// async fn bar() -> u32 { 2 }
151    ///
152    /// # #[tokio::main]
153    /// # async fn main() {
154    /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::empty();
155    /// assert!(fut.is_empty());
156    ///
157    /// fut.set(Box::pin(foo()));
158    /// assert!(!fut.is_empty());
159    ///
160    /// fut.set(Box::pin(bar()));
161    /// assert!(!fut.is_empty());
162    /// # }
163    /// ```
164    #[inline]
165    pub fn set(&mut self, value: T)
166    where
167        Self: Unpin,
168    {
169        self.value = Some(value);
170    }
171
172    /// Clear the fused value.
173    ///
174    /// # Examples
175    ///
176    /// ```rust
177    /// use async_fuse::Fuse;
178    /// use std::time::Duration;
179    /// use tokio::time;
180    ///
181    /// # #[tokio::main]
182    /// # async fn main() {
183    /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
184    ///
185    /// assert!(!sleep.is_empty());
186    /// sleep.clear();
187    /// assert!(sleep.is_empty());
188    /// # }
189    /// ```
190    #[inline]
191    pub fn clear(&mut self)
192    where
193        Self: Unpin,
194    {
195        self.value = None;
196    }
197
198    /// Construct an empty fuse.
199    ///
200    /// # Examples
201    ///
202    /// ```rust
203    /// use std::pin::pin;
204    /// use async_fuse::Fuse;
205    /// use tokio::time;
206    ///
207    /// # #[tokio::main]
208    /// # async fn main() {
209    /// let mut sleep = pin!(Fuse::<time::Sleep>::empty());
210    ///
211    /// assert!(sleep.is_empty());
212    /// # }
213    /// ```
214    #[must_use]
215    #[inline]
216    pub fn empty() -> Self {
217        Fuse::default()
218    }
219
220    /// Test if the polled for value is empty.
221    ///
222    /// # Examples
223    ///
224    /// ```rust
225    /// use std::pin::pin;
226    /// use async_fuse::Fuse;
227    /// use std::time::Duration;
228    /// use tokio::time;
229    ///
230    /// # #[tokio::main]
231    /// # async fn main() {
232    /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
233    ///
234    /// assert!(!sleep.is_empty());
235    /// sleep.set(Fuse::empty());
236    /// assert!(sleep.is_empty());
237    /// # }
238    /// ```
239    #[inline]
240    pub fn is_empty(&self) -> bool {
241        self.value.is_none()
242    }
243
244    /// Access the interior value as a reference.
245    ///
246    /// # Examples
247    ///
248    /// ```rust
249    /// use std::pin::pin;
250    /// use std::time::Duration;
251    /// use async_fuse::Fuse;
252    /// use tokio::time;
253    ///
254    /// # #[tokio::main]
255    /// # async fn main() {
256    /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
257    ///
258    /// assert!(sleep.as_inner_ref().is_some());
259    /// sleep.set(Fuse::empty());
260    /// assert!(sleep.as_inner_ref().is_none());
261    /// # }
262    /// ```
263    #[inline]
264    pub fn as_inner_ref(&self) -> Option<&T> {
265        self.value.as_ref()
266    }
267
268    /// Poll the current value with the given polling implementation.
269    ///
270    /// This can be used for types which only provides a polling function.
271    ///
272    /// This will never empty the underlying value.
273    ///
274    /// # Examples
275    ///
276    /// ```rust
277    /// use std::pin::pin;
278    /// use std::future::Future;
279    /// use async_fuse::Fuse;
280    /// use tokio::sync::mpsc;
281    ///
282    /// async fn op(n: u32) -> u32 {
283    ///     n
284    /// }
285    ///
286    /// # #[tokio::main]
287    /// # async fn main() {
288    /// let mut op1 = pin!(Fuse::new(op(1)));
289    ///
290    /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 1);
291    /// assert!(!op1.is_empty());
292    ///
293    /// op1.set(Fuse::new(op(2)));
294    /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 2);
295    /// assert!(!op1.is_empty());
296    /// # }
297    /// ```
298    #[inline]
299    pub async fn poll_inner<P, O>(self: Pin<&mut Self>, poll: P) -> O
300    where
301        P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
302    {
303        PollInner::new(ProjectFuse(self), poll).await
304    }
305
306    /// Poll the current value with the given polling implementation.
307    ///
308    /// This can be used for types which only provides a polling function.
309    ///
310    /// Once the underlying poll impl returns `Poll::Ready`, the underlying
311    /// value will be emptied.
312    ///
313    /// # Examples
314    ///
315    /// ```rust
316    /// use std::pin::pin;
317    /// use std::future::Future;
318    /// use async_fuse::Fuse;
319    /// use tokio::sync::mpsc;
320    ///
321    /// async fn op(n: u32) -> u32 {
322    ///     n
323    /// }
324    ///
325    /// # #[tokio::main]
326    /// # async fn main() {
327    /// let mut op1 = pin!(Fuse::new(op(1)));
328    ///
329    /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 1);
330    /// assert!(op1.is_empty());
331    ///
332    /// op1.set(Fuse::new(op(2)));
333    /// assert!(!op1.is_empty());
334    /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 2);
335    /// assert!(op1.is_empty());
336    /// # }
337    /// ```
338    #[inline]
339    pub async fn poll_future<P, O>(self: Pin<&mut Self>, poll: P) -> O
340    where
341        P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
342    {
343        PollFuture::new(ProjectFuse(self), poll).await
344    }
345
346    /// Poll the current value with the given polling implementation.
347    ///
348    /// This can be used for types which only provides a polling function, or
349    /// types which can be polled multiple streams. Like streams which do not
350    /// provide a Stream implementation.
351    ///
352    /// Will empty the fused value once the underlying poll returns
353    /// `Poll::Ready(None)`.
354    ///
355    /// # Examples
356    ///
357    /// ```rust
358    /// # extern crate futures_core03 as futures_core;
359    /// use std::pin::pin;
360    /// use std::future::Future;
361    /// use async_fuse::Fuse;
362    /// use futures_core::Stream;
363    /// use tokio::sync::mpsc;
364    ///
365    /// fn op(n: u32) -> impl Stream<Item = u32> {
366    ///     async_stream::stream! {
367    ///         yield n;
368    ///         yield n + 1;
369    ///     }
370    /// }
371    ///
372    /// # #[tokio::main]
373    /// # async fn main() {
374    /// let mut op1 = pin!(Fuse::new(op(1)));
375    ///
376    /// assert!(!op1.is_empty());
377    /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(1));
378    /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(2));
379    /// assert!(!op1.is_empty());
380    /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, None);
381    /// assert!(op1.is_empty());
382    /// # }
383    /// ```
384    #[inline]
385    pub async fn poll_stream<P, O>(self: Pin<&mut Self>, poll: P) -> Option<O>
386    where
387        P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<Option<O>>,
388    {
389        poll::PollStream::new(ProjectFuse(self), poll).await
390    }
391
392    /// Access the interior mutable value. This is only available if it
393    /// implements [Unpin].
394    ///
395    /// # Examples
396    ///
397    /// ```rust
398    /// use async_fuse::Fuse;
399    ///
400    /// # fn main() {
401    /// let mut rx = Fuse::new(Box::pin(async { 42 }));
402    ///
403    /// assert!(rx.as_inner_mut().is_some());
404    /// # }
405    #[inline]
406    pub fn as_inner_mut(&mut self) -> Option<&mut T>
407    where
408        Self: Unpin,
409    {
410        self.value.as_mut()
411    }
412
413    /// Helper conversion to a pinned value.
414    ///
415    /// # Examples
416    ///
417    /// ```rust
418    /// use async_fuse::Fuse;
419    /// use tokio::sync::mpsc;
420    ///
421    /// # #[tokio::main]
422    /// # async fn main() {
423    /// let (tx, rx) = mpsc::unbounded_channel::<u32>();
424    /// let mut rx = Fuse::new(rx);
425    ///
426    /// tx.send(42);
427    ///
428    /// // Manually poll the sleep.
429    /// assert_eq!(rx.as_pin_mut().poll_stream(|mut i, cx| i.poll_recv(cx)).await, Some(42));
430    ///
431    /// rx = Fuse::empty();
432    /// assert!(rx.is_empty());
433    /// # }
434    /// ```
435    #[inline]
436    pub fn as_pin_mut(&mut self) -> Pin<&mut Self>
437    where
438        Self: Unpin,
439    {
440        Pin::new(self)
441    }
442
443    /// Poll the next value in the stream where the underlying value is unpin.
444    ///
445    /// Behaves the same as [`poll_stream`], except that it only works for
446    /// values which are [Unpin].
447    ///
448    /// # Examples
449    ///
450    /// ```rust
451    /// # extern crate futures_core03 as futures_core;
452    /// use std::future::Future;
453    /// use async_fuse::Fuse;
454    /// use futures_core::Stream;
455    /// use tokio::sync::mpsc;
456    ///
457    /// fn op(n: u32) -> impl Stream<Item = u32> {
458    ///     async_stream::stream! {
459    ///         yield n;
460    ///         yield n + 1;
461    ///     }
462    /// }
463    ///
464    /// # #[tokio::main]
465    /// # async fn main() {
466    /// let mut stream = Fuse::new(Box::pin(op(1)));
467    /// assert!(!stream.is_empty());
468    ///
469    /// assert_eq!(stream.next().await, Some(1));
470    /// assert_eq!(stream.next().await, Some(2));
471    /// assert_eq!(stream.next().await, None);
472    ///
473    /// assert!(stream.is_empty());
474    /// # }
475    /// ```
476    #[cfg(feature = "stream03")]
477    #[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
478    pub async fn next(&mut self) -> Option<T::Item>
479    where
480        Self: Unpin,
481        T: Stream03,
482    {
483        self.as_pin_mut().poll_stream(Stream03::poll_next).await
484    }
485
486    #[inline]
487    fn project(self: Pin<&mut Self>) -> Pin<&mut Option<T>> {
488        // Safety: We're projecting into the owned pinned value field, which we
489        // otherwise do not move before it's dropped.
490        unsafe { Pin::map_unchecked_mut(self, |this| &mut this.value) }
491    }
492}
493
494impl<T> Future for Fuse<T>
495where
496    T: Future,
497{
498    type Output = T::Output;
499
500    #[inline]
501    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502        Pin::new(&mut PollFuture::new(ProjectFuse(self), Future::poll)).poll(cx)
503    }
504}
505
#[cfg(feature = "stream03")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
impl<T> Stream03 for Fuse<T>
where
    T: Stream03,
{
    type Item = T::Item;

    /// Poll the fused stream through a temporary [`poll::PollStream`] adapter.
    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut adapter = poll::PollStream::new(ProjectFuse(self), Stream03::poll_next);
        Future::poll(Pin::new(&mut adapter), cx)
    }
}
523
524impl<T> From<Option<T>> for Fuse<T> {
525    #[inline]
526    fn from(value: Option<T>) -> Self {
527        Self { value }
528    }
529}
530
/// A boxed value is pinned in place and wrapped in a populated fuse.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
impl<T> From<Box<T>> for Fuse<Pin<Box<T>>> {
    #[inline]
    fn from(value: Box<T>) -> Self {
        let pinned = Pin::from(value);
        Self {
            value: Some(pinned),
        }
    }
}
541
/// An optional boxed value is pinned when present; `None` yields an empty
/// fuse.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
impl<T> From<Option<Box<T>>> for Fuse<Pin<Box<T>>> {
    #[inline]
    fn from(value: Option<Box<T>>) -> Self {
        let value = value.map(Pin::from);
        Self { value }
    }
}
552
553impl<T> Default for Fuse<T> {
554    #[inline]
555    fn default() -> Self {
556        Self { value: None }
557    }
558}
559
560struct ProjectFuse<'a, T>(Pin<&'a mut Fuse<T>>);
561
562impl<T> Project for ProjectFuse<'_, T> {
563    type Value = T;
564
565    #[inline]
566    fn clear(&mut self) {
567        self.0.as_mut().project().set(None);
568    }
569
570    #[inline]
571    fn project(&mut self) -> Poll<Pin<&mut Self::Value>> {
572        match self.0.as_mut().project().as_pin_mut() {
573            Some(value) => Poll::Ready(value),
574            None => Poll::Pending,
575        }
576    }
577}