async_fuse/
fuse.rs

//! A fusing adapter to simplify optionally polling futures and streams.
2
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7#[cfg(feature = "alloc")]
8use alloc::boxed::Box;
9
10#[cfg(feature = "stream03")]
11use futures_core03::Stream as Stream03;
12
13use crate::poll::{self, PollFuture, PollInner, Project};
14
/// A fusing adapter around a value.
///
/// A `Fuse<T>` is similar to `Option<T>`, with the exception that it provides
/// an API which is more suitable for interacting with asynchronous tasks and
/// pinned values.
///
/// For most polling operations (except [`Fuse::poll_inner`]), if the value
/// completes, the adapter will switch to an [empty state][Fuse::empty] and
/// return [`Poll::Pending`]. It can later be updated again with
/// [set][Fuse::set].
///
/// A `Debug` representation is available whenever `T: Debug`.
///
/// See [`Fuse::new`] for more details.
#[derive(Debug)]
pub struct Fuse<T> {
    /// The wrapped value; `None` while the fuse is empty.
    value: Option<T>,
}
30
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
impl<T> Fuse<Pin<Box<T>>> {
    /// Box and pin `value`, then wrap it in a non-empty fuse.
    ///
    /// This is a shorthand for `Fuse::new(Box::pin(value))`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use async_fuse::Fuse;
    ///
    /// async fn foo() -> u32 { 1 }
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut fut = Fuse::pin(foo());
    /// assert!(!fut.is_empty());
    ///
    /// let value = (&mut fut).await;
    /// assert_eq!(value, 1);
    /// assert!(fut.is_empty());
    /// # }
    /// ```
    #[inline]
    pub fn pin(value: T) -> Self {
        let pinned = Box::pin(value);
        Self {
            value: Some(pinned),
        }
    }
}
61
62impl<T> Fuse<T> {
63    /// Construct a fusing adapter around a value.
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    /// use std::pin::pin;
69    /// use async_fuse::Fuse;
70    /// use std::time::Duration;
71    /// use tokio::time;
72    ///
73    /// # #[tokio::main]
74    /// # async fn main() {
75    /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
76    ///
77    /// tokio::select! {
78    ///     _ = &mut sleep => {
79    ///         assert!(sleep.is_empty());
80    ///         sleep.set(Fuse::new(time::sleep(Duration::from_millis(200))));
81    ///     }
82    /// }
83    ///
84    /// assert!(!sleep.is_empty());
85    /// # }
86    /// ```
87    ///
88    /// # Example using an unsized trait object
89    ///
90    /// ```rust
91    /// use async_fuse::Fuse;
92    /// use std::future::Future;
93    /// use std::pin::Pin;
94    /// use tokio::time;
95    ///
96    /// async fn foo() -> u32 { 1 }
97    /// async fn bar() -> u32 { 2 }
98    ///
99    /// # #[tokio::main]
100    /// # async fn main() {
101    /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::new(Box::pin(foo()));
102    /// let mut total = 0;
103    ///
104    /// while !fut.is_empty() {
105    ///     let value = (&mut fut).await;
106    ///
107    ///     if value == 1 {
108    ///         fut.set(Box::pin(bar()));
109    ///     }
110    ///
111    ///     total += value;
112    /// }
113    ///
114    /// assert_eq!(total, 3);
115    /// # }
116    /// ```
117    #[inline]
118    pub fn new(value: T) -> Self {
119        Self { value: Some(value) }
120    }
121
122    /// Set the fused value.
123    ///
124    /// # Examples
125    ///
126    /// ```rust
127    /// use async_fuse::Fuse;
128    /// use std::time::Duration;
129    /// use tokio::time;
130    ///
131    /// # #[tokio::main]
132    /// # async fn main() {
133    /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
134    ///
135    /// assert!(!sleep.is_empty());
136    /// sleep.set(Box::pin(time::sleep(Duration::from_millis(200))));
137    /// assert!(!sleep.is_empty());
138    /// # }
139    /// ```
140    ///
141    /// # Example setting an unsized trait object
142    ///
143    /// ```rust
144    /// use async_fuse::Fuse;
145    /// use std::future::Future;
146    /// use std::pin::Pin;
147    /// use tokio::time;
148    ///
149    /// async fn foo() -> u32 { 1 }
150    /// async fn bar() -> u32 { 2 }
151    ///
152    /// # #[tokio::main]
153    /// # async fn main() {
154    /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::empty();
155    /// assert!(fut.is_empty());
156    ///
157    /// fut.set(Box::pin(foo()));
158    /// assert!(!fut.is_empty());
159    ///
160    /// fut.set(Box::pin(bar()));
161    /// assert!(!fut.is_empty());
162    /// # }
163    /// ```
164    #[inline]
165    pub fn set(&mut self, value: T)
166    where
167        Self: Unpin,
168    {
169        self.value = Some(value);
170    }
171
172    /// Clear the fused value.
173    ///
174    /// # Examples
175    ///
176    /// ```rust
177    /// use async_fuse::Fuse;
178    /// use std::time::Duration;
179    /// use tokio::time;
180    ///
181    /// # #[tokio::main]
182    /// # async fn main() {
183    /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
184    ///
185    /// assert!(!sleep.is_empty());
186    /// sleep.clear();
187    /// assert!(sleep.is_empty());
188    /// # }
189    /// ```
190    #[inline]
191    pub fn clear(&mut self)
192    where
193        Self: Unpin,
194    {
195        self.value = None;
196    }
197
198    /// Construct an empty fuse.
199    ///
200    /// # Examples
201    ///
202    /// ```rust
203    /// use std::pin::pin;
204    /// use async_fuse::Fuse;
205    /// use tokio::time;
206    ///
207    /// # #[tokio::main]
208    /// # async fn main() {
209    /// let mut sleep = pin!(Fuse::<time::Sleep>::empty());
210    ///
211    /// assert!(sleep.is_empty());
212    /// # }
213    /// ```
214    #[must_use]
215    #[inline]
216    pub fn empty() -> Self {
217        Fuse::default()
218    }
219
220    /// Test if the polled for value is empty.
221    ///
222    /// # Examples
223    ///
224    /// ```rust
225    /// use std::pin::pin;
226    /// use async_fuse::Fuse;
227    /// use std::time::Duration;
228    /// use tokio::time;
229    ///
230    /// # #[tokio::main]
231    /// # async fn main() {
232    /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
233    ///
234    /// assert!(!sleep.is_empty());
235    /// sleep.set(Fuse::empty());
236    /// assert!(sleep.is_empty());
237    /// # }
238    /// ```
239    #[inline]
240    pub fn is_empty(&self) -> bool {
241        self.value.is_none()
242    }
243
244    /// Access the interior value as a reference.
245    ///
246    /// # Examples
247    ///
248    /// ```rust
249    /// use std::pin::pin;
250    /// use std::time::Duration;
251    /// use async_fuse::Fuse;
252    /// use tokio::time;
253    ///
254    /// # #[tokio::main]
255    /// # async fn main() {
256    /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
257    ///
258    /// assert!(sleep.as_inner_ref().is_some());
259    /// sleep.set(Fuse::empty());
260    /// assert!(sleep.as_inner_ref().is_none());
261    /// # }
262    /// ```
263    #[inline]
264    pub fn as_inner_ref(&self) -> Option<&T> {
265        self.value.as_ref()
266    }
267
268    /// Poll the current value with the given polling implementation.
269    ///
270    /// This can be used for types which only provides a polling function.
271    ///
272    /// This will never empty the underlying value.
273    ///
274    /// # Examples
275    ///
276    /// ```rust
277    /// use std::pin::pin;
278    /// use std::future::Future;
279    /// use async_fuse::Fuse;
280    /// use tokio::sync::mpsc;
281    ///
282    /// async fn op(n: u32) -> u32 {
283    ///     n
284    /// }
285    ///
286    /// # #[tokio::main]
287    /// # async fn main() {
288    /// let mut op1 = pin!(Fuse::new(op(1)));
289    ///
290    /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 1);
291    /// assert!(!op1.is_empty());
292    ///
293    /// op1.set(Fuse::new(op(2)));
294    /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 2);
295    /// assert!(!op1.is_empty());
296    /// # }
297    /// ```
298    #[inline]
299    pub async fn poll_inner<P, O>(self: Pin<&mut Self>, poll: P) -> O
300    where
301        P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
302    {
303        PollInner::new(ProjectFuse(self), poll).await
304    }
305
306    /// Poll the current value with the given polling implementation.
307    ///
308    /// This can be used for types which only provides a polling function.
309    ///
310    /// Once the underlying poll impl returns `Poll::Ready`, the underlying
311    /// value will be emptied.
312    ///
313    /// # Examples
314    ///
315    /// ```rust
316    /// use std::pin::pin;
317    /// use std::future::Future;
318    /// use async_fuse::Fuse;
319    /// use tokio::sync::mpsc;
320    ///
321    /// async fn op(n: u32) -> u32 {
322    ///     n
323    /// }
324    ///
325    /// # #[tokio::main]
326    /// # async fn main() {
327    /// let mut op1 = pin!(Fuse::new(op(1)));
328    ///
329    /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 1);
330    /// assert!(op1.is_empty());
331    ///
332    /// op1.set(Fuse::new(op(2)));
333    /// assert!(!op1.is_empty());
334    /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 2);
335    /// assert!(op1.is_empty());
336    /// # }
337    /// ```
338    #[inline]
339    pub async fn poll_future<P, O>(self: Pin<&mut Self>, poll: P) -> O
340    where
341        P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
342    {
343        PollFuture::new(ProjectFuse(self), poll).await
344    }
345
346    /// Poll the current value with the given polling implementation.
347    ///
348    /// This can be used for types which only provides a polling function, or
349    /// types which can be polled multiple streams. Like streams which do not
350    /// provide a Stream implementation.
351    ///
352    /// Will empty the fused value once the underlying poll returns
353    /// `Poll::Ready(None)`.
354    ///
355    /// # Examples
356    ///
357    /// ```rust
358    /// use std::pin::pin;
359    /// use std::future::Future;
360    /// use async_fuse::{Fuse, Stream};
361    /// use tokio::sync::mpsc;
362    ///
363    /// fn op(n: u32) -> impl Stream<Item = u32> {
364    ///     async_stream::stream! {
365    ///         yield n;
366    ///         yield n + 1;
367    ///     }
368    /// }
369    ///
370    /// # #[tokio::main]
371    /// # async fn main() {
372    /// let mut op1 = pin!(Fuse::new(op(1)));
373    ///
374    /// assert!(!op1.is_empty());
375    /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(1));
376    /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(2));
377    /// assert!(!op1.is_empty());
378    /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, None);
379    /// assert!(op1.is_empty());
380    /// # }
381    /// ```
382    #[inline]
383    pub async fn poll_stream<P, O>(self: Pin<&mut Self>, poll: P) -> Option<O>
384    where
385        P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<Option<O>>,
386    {
387        poll::PollStream::new(ProjectFuse(self), poll).await
388    }
389
390    /// Access the interior mutable value. This is only available if it
391    /// implements [Unpin].
392    ///
393    /// # Examples
394    ///
395    /// ```rust
396    /// use async_fuse::Fuse;
397    ///
398    /// # fn main() {
399    /// let mut rx = Fuse::new(Box::pin(async { 42 }));
400    ///
401    /// assert!(rx.as_inner_mut().is_some());
402    /// # }
403    #[inline]
404    pub fn as_inner_mut(&mut self) -> Option<&mut T>
405    where
406        Self: Unpin,
407    {
408        self.value.as_mut()
409    }
410
411    /// Helper conversion to a pinned value.
412    ///
413    /// # Examples
414    ///
415    /// ```rust
416    /// use async_fuse::Fuse;
417    /// use tokio::sync::mpsc;
418    ///
419    /// # #[tokio::main]
420    /// # async fn main() {
421    /// let (tx, rx) = mpsc::unbounded_channel::<u32>();
422    /// let mut rx = Fuse::new(rx);
423    ///
424    /// tx.send(42);
425    ///
426    /// // Manually poll the sleep.
427    /// assert_eq!(rx.as_pin_mut().poll_stream(|mut i, cx| i.poll_recv(cx)).await, Some(42));
428    ///
429    /// rx = Fuse::empty();
430    /// assert!(rx.is_empty());
431    /// # }
432    /// ```
433    #[inline]
434    pub fn as_pin_mut(&mut self) -> Pin<&mut Self>
435    where
436        Self: Unpin,
437    {
438        Pin::new(self)
439    }
440
441    /// Poll the next value in the stream where the underlying value is unpin.
442    ///
443    /// Behaves the same as [`poll_stream`], except that it only works for
444    /// values which are [Unpin].
445    ///
446    /// # Examples
447    ///
448    /// ```rust
449    /// use async_fuse::{Fuse, Stream};
450    /// use std::future::Future;
451    /// use tokio::sync::mpsc;
452    ///
453    /// fn op(n: u32) -> impl Stream<Item = u32> {
454    ///     async_stream::stream! {
455    ///         yield n;
456    ///         yield n + 1;
457    ///     }
458    /// }
459    ///
460    /// # #[tokio::main]
461    /// # async fn main() {
462    /// let mut stream = Fuse::new(Box::pin(op(1)));
463    /// assert!(!stream.is_empty());
464    ///
465    /// assert_eq!(stream.next().await, Some(1));
466    /// assert_eq!(stream.next().await, Some(2));
467    /// assert_eq!(stream.next().await, None);
468    ///
469    /// assert!(stream.is_empty());
470    /// # }
471    /// ```
472    #[cfg(feature = "stream03")]
473    #[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
474    pub async fn next(&mut self) -> Option<T::Item>
475    where
476        Self: Unpin,
477        T: Stream03,
478    {
479        self.as_pin_mut().poll_stream(Stream03::poll_next).await
480    }
481
482    #[inline]
483    fn project(self: Pin<&mut Self>) -> Pin<&mut Option<T>> {
484        // Safety: We're projecting into the owned pinned value field, which we
485        // otherwise do not move before it's dropped.
486        unsafe { Pin::map_unchecked_mut(self, |this| &mut this.value) }
487    }
488}
489
490impl<T> Future for Fuse<T>
491where
492    T: Future,
493{
494    type Output = T::Output;
495
496    #[inline]
497    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
498        Pin::new(&mut PollFuture::new(ProjectFuse(self), Future::poll)).poll(cx)
499    }
500}
501
#[cfg(feature = "stream03")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
/// Forwards to the held stream through the same helper that backs
/// [`Fuse::poll_stream`].
impl<T> Stream03 for Fuse<T>
where
    T: Stream03,
{
    type Item = T::Item;

    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut polling = poll::PollStream::new(ProjectFuse(self), Stream03::poll_next);
        Pin::new(&mut polling).poll(cx)
    }
}
519
520impl<T> From<Option<T>> for Fuse<T> {
521    #[inline]
522    fn from(value: Option<T>) -> Self {
523        Self { value }
524    }
525}
526
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
/// Pins an already boxed value and wraps it in a non-empty fuse.
impl<T> From<Box<T>> for Fuse<Pin<Box<T>>> {
    #[inline]
    fn from(boxed: Box<T>) -> Self {
        let pinned = Pin::from(boxed);
        Self {
            value: Some(pinned),
        }
    }
}
537
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
/// Pins an optional boxed value: `Some` becomes a non-empty fuse around the
/// pinned box, `None` an empty fuse.
impl<T> From<Option<Box<T>>> for Fuse<Pin<Box<T>>> {
    #[inline]
    fn from(boxed: Option<Box<T>>) -> Self {
        let value = boxed.map(Pin::from);
        Self { value }
    }
}
548
549impl<T> Default for Fuse<T> {
550    #[inline]
551    fn default() -> Self {
552        Self { value: None }
553    }
554}
555
/// Adapter that exposes a pinned [`Fuse`] through the [`Project`] trait, so
/// the polling helpers imported from `crate::poll` can reach its value and
/// clear it.
struct ProjectFuse<'a, T>(Pin<&'a mut Fuse<T>>);
557
558impl<T> Project for ProjectFuse<'_, T> {
559    type Value = T;
560
561    #[inline]
562    fn clear(&mut self) {
563        self.0.as_mut().project().set(None);
564    }
565
566    #[inline]
567    fn project(&mut self) -> Poll<Pin<&mut Self::Value>> {
568        match self.0.as_mut().project().as_pin_mut() {
569            Some(value) => Poll::Ready(value),
570            None => Poll::Pending,
571        }
572    }
573}