async_fuse/fuse.rs
//! A fusing adapter to simplify optionally polling futures.
2
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7#[cfg(feature = "alloc")]
8use alloc::boxed::Box;
9
10#[cfg(feature = "stream03")]
11use futures_core03::Stream as Stream03;
12
13use crate::poll::{self, PollFuture, PollInner, Project};
14
/// A fusing adapter around a value.
///
/// A `Fuse<T>` is similar to `Option<T>`, with the exception that it provides
/// an API which is more suitable for interacting with asynchronous tasks and
/// pinned values.
///
/// For most polling operations (except [`Fuse::poll_inner`]), if the value
/// completes, the adapter will switch to an [empty state][Fuse::empty] and
/// return [`Poll::Pending`]. It can later be updated again with
/// [set][Fuse::set].
///
/// See [`Fuse::new`] for more details.
pub struct Fuse<T> {
    // `Some` while a value is held, `None` when the fuse is empty.
    value: Option<T>,
}
30
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
impl<T> Fuse<Pin<Box<T>>> {
    /// Box and pin `value`, then wrap it in a non-empty fuse.
    ///
    /// The value is moved onto the heap with [`Box::pin`], so the resulting
    /// fuse stores a `Pin<Box<T>>`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use async_fuse::Fuse;
    ///
    /// async fn foo() -> u32 { 1 }
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let mut fut = Fuse::pin(foo());
    /// assert!(!fut.is_empty());
    ///
    /// let value = (&mut fut).await;
    /// assert_eq!(value, 1);
    /// assert!(fut.is_empty());
    /// # }
    /// ```
    #[inline]
    pub fn pin(value: T) -> Self {
        let pinned = Box::pin(value);
        Self {
            value: Some(pinned),
        }
    }
}
61
62impl<T> Fuse<T> {
63 /// Construct a fusing adapter around a value.
64 ///
65 /// # Examples
66 ///
67 /// ```rust
68 /// use std::pin::pin;
69 /// use async_fuse::Fuse;
70 /// use std::time::Duration;
71 /// use tokio::time;
72 ///
73 /// # #[tokio::main]
74 /// # async fn main() {
75 /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
76 ///
77 /// tokio::select! {
78 /// _ = &mut sleep => {
79 /// assert!(sleep.is_empty());
80 /// sleep.set(Fuse::new(time::sleep(Duration::from_millis(200))));
81 /// }
82 /// }
83 ///
84 /// assert!(!sleep.is_empty());
85 /// # }
86 /// ```
87 ///
88 /// # Example using an unsized trait object
89 ///
90 /// ```rust
91 /// use async_fuse::Fuse;
92 /// use std::future::Future;
93 /// use std::pin::Pin;
94 /// use tokio::time;
95 ///
96 /// async fn foo() -> u32 { 1 }
97 /// async fn bar() -> u32 { 2 }
98 ///
99 /// # #[tokio::main]
100 /// # async fn main() {
101 /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::new(Box::pin(foo()));
102 /// let mut total = 0;
103 ///
104 /// while !fut.is_empty() {
105 /// let value = (&mut fut).await;
106 ///
107 /// if value == 1 {
108 /// fut.set(Box::pin(bar()));
109 /// }
110 ///
111 /// total += value;
112 /// }
113 ///
114 /// assert_eq!(total, 3);
115 /// # }
116 /// ```
117 #[inline]
118 pub fn new(value: T) -> Self {
119 Self { value: Some(value) }
120 }
121
122 /// Set the fused value.
123 ///
124 /// # Examples
125 ///
126 /// ```rust
127 /// use async_fuse::Fuse;
128 /// use std::time::Duration;
129 /// use tokio::time;
130 ///
131 /// # #[tokio::main]
132 /// # async fn main() {
133 /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
134 ///
135 /// assert!(!sleep.is_empty());
136 /// sleep.set(Box::pin(time::sleep(Duration::from_millis(200))));
137 /// assert!(!sleep.is_empty());
138 /// # }
139 /// ```
140 ///
141 /// # Example setting an unsized trait object
142 ///
143 /// ```rust
144 /// use async_fuse::Fuse;
145 /// use std::future::Future;
146 /// use std::pin::Pin;
147 /// use tokio::time;
148 ///
149 /// async fn foo() -> u32 { 1 }
150 /// async fn bar() -> u32 { 2 }
151 ///
152 /// # #[tokio::main]
153 /// # async fn main() {
154 /// let mut fut = Fuse::<Pin<Box<dyn Future<Output = u32>>>>::empty();
155 /// assert!(fut.is_empty());
156 ///
157 /// fut.set(Box::pin(foo()));
158 /// assert!(!fut.is_empty());
159 ///
160 /// fut.set(Box::pin(bar()));
161 /// assert!(!fut.is_empty());
162 /// # }
163 /// ```
164 #[inline]
165 pub fn set(&mut self, value: T)
166 where
167 Self: Unpin,
168 {
169 self.value = Some(value);
170 }
171
172 /// Clear the fused value.
173 ///
174 /// # Examples
175 ///
176 /// ```rust
177 /// use async_fuse::Fuse;
178 /// use std::time::Duration;
179 /// use tokio::time;
180 ///
181 /// # #[tokio::main]
182 /// # async fn main() {
183 /// let mut sleep = Fuse::new(Box::pin(time::sleep(Duration::from_millis(200))));
184 ///
185 /// assert!(!sleep.is_empty());
186 /// sleep.clear();
187 /// assert!(sleep.is_empty());
188 /// # }
189 /// ```
190 #[inline]
191 pub fn clear(&mut self)
192 where
193 Self: Unpin,
194 {
195 self.value = None;
196 }
197
198 /// Construct an empty fuse.
199 ///
200 /// # Examples
201 ///
202 /// ```rust
203 /// use std::pin::pin;
204 /// use async_fuse::Fuse;
205 /// use tokio::time;
206 ///
207 /// # #[tokio::main]
208 /// # async fn main() {
209 /// let mut sleep = pin!(Fuse::<time::Sleep>::empty());
210 ///
211 /// assert!(sleep.is_empty());
212 /// # }
213 /// ```
214 #[must_use]
215 #[inline]
216 pub fn empty() -> Self {
217 Fuse::default()
218 }
219
220 /// Test if the polled for value is empty.
221 ///
222 /// # Examples
223 ///
224 /// ```rust
225 /// use std::pin::pin;
226 /// use async_fuse::Fuse;
227 /// use std::time::Duration;
228 /// use tokio::time;
229 ///
230 /// # #[tokio::main]
231 /// # async fn main() {
232 /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
233 ///
234 /// assert!(!sleep.is_empty());
235 /// sleep.set(Fuse::empty());
236 /// assert!(sleep.is_empty());
237 /// # }
238 /// ```
239 #[inline]
240 pub fn is_empty(&self) -> bool {
241 self.value.is_none()
242 }
243
244 /// Access the interior value as a reference.
245 ///
246 /// # Examples
247 ///
248 /// ```rust
249 /// use std::pin::pin;
250 /// use std::time::Duration;
251 /// use async_fuse::Fuse;
252 /// use tokio::time;
253 ///
254 /// # #[tokio::main]
255 /// # async fn main() {
256 /// let mut sleep = pin!(Fuse::new(time::sleep(Duration::from_millis(200))));
257 ///
258 /// assert!(sleep.as_inner_ref().is_some());
259 /// sleep.set(Fuse::empty());
260 /// assert!(sleep.as_inner_ref().is_none());
261 /// # }
262 /// ```
263 #[inline]
264 pub fn as_inner_ref(&self) -> Option<&T> {
265 self.value.as_ref()
266 }
267
268 /// Poll the current value with the given polling implementation.
269 ///
270 /// This can be used for types which only provides a polling function.
271 ///
272 /// This will never empty the underlying value.
273 ///
274 /// # Examples
275 ///
276 /// ```rust
277 /// use std::pin::pin;
278 /// use std::future::Future;
279 /// use async_fuse::Fuse;
280 /// use tokio::sync::mpsc;
281 ///
282 /// async fn op(n: u32) -> u32 {
283 /// n
284 /// }
285 ///
286 /// # #[tokio::main]
287 /// # async fn main() {
288 /// let mut op1 = pin!(Fuse::new(op(1)));
289 ///
290 /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 1);
291 /// assert!(!op1.is_empty());
292 ///
293 /// op1.set(Fuse::new(op(2)));
294 /// assert_eq!(op1.as_mut().poll_inner(|mut i, cx| i.poll(cx)).await, 2);
295 /// assert!(!op1.is_empty());
296 /// # }
297 /// ```
298 #[inline]
299 pub async fn poll_inner<P, O>(self: Pin<&mut Self>, poll: P) -> O
300 where
301 P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
302 {
303 PollInner::new(ProjectFuse(self), poll).await
304 }
305
306 /// Poll the current value with the given polling implementation.
307 ///
308 /// This can be used for types which only provides a polling function.
309 ///
310 /// Once the underlying poll impl returns `Poll::Ready`, the underlying
311 /// value will be emptied.
312 ///
313 /// # Examples
314 ///
315 /// ```rust
316 /// use std::pin::pin;
317 /// use std::future::Future;
318 /// use async_fuse::Fuse;
319 /// use tokio::sync::mpsc;
320 ///
321 /// async fn op(n: u32) -> u32 {
322 /// n
323 /// }
324 ///
325 /// # #[tokio::main]
326 /// # async fn main() {
327 /// let mut op1 = pin!(Fuse::new(op(1)));
328 ///
329 /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 1);
330 /// assert!(op1.is_empty());
331 ///
332 /// op1.set(Fuse::new(op(2)));
333 /// assert!(!op1.is_empty());
334 /// assert_eq!(op1.as_mut().poll_future(|mut i, cx| i.poll(cx)).await, 2);
335 /// assert!(op1.is_empty());
336 /// # }
337 /// ```
338 #[inline]
339 pub async fn poll_future<P, O>(self: Pin<&mut Self>, poll: P) -> O
340 where
341 P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<O>,
342 {
343 PollFuture::new(ProjectFuse(self), poll).await
344 }
345
346 /// Poll the current value with the given polling implementation.
347 ///
348 /// This can be used for types which only provides a polling function, or
349 /// types which can be polled multiple streams. Like streams which do not
350 /// provide a Stream implementation.
351 ///
352 /// Will empty the fused value once the underlying poll returns
353 /// `Poll::Ready(None)`.
354 ///
355 /// # Examples
356 ///
357 /// ```rust
358 /// use std::pin::pin;
359 /// use std::future::Future;
360 /// use async_fuse::{Fuse, Stream};
361 /// use tokio::sync::mpsc;
362 ///
363 /// fn op(n: u32) -> impl Stream<Item = u32> {
364 /// async_stream::stream! {
365 /// yield n;
366 /// yield n + 1;
367 /// }
368 /// }
369 ///
370 /// # #[tokio::main]
371 /// # async fn main() {
372 /// let mut op1 = pin!(Fuse::new(op(1)));
373 ///
374 /// assert!(!op1.is_empty());
375 /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(1));
376 /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, Some(2));
377 /// assert!(!op1.is_empty());
378 /// assert_eq!(op1.as_mut().poll_stream(|mut i, cx| i.poll_next(cx)).await, None);
379 /// assert!(op1.is_empty());
380 /// # }
381 /// ```
382 #[inline]
383 pub async fn poll_stream<P, O>(self: Pin<&mut Self>, poll: P) -> Option<O>
384 where
385 P: FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<Option<O>>,
386 {
387 poll::PollStream::new(ProjectFuse(self), poll).await
388 }
389
390 /// Access the interior mutable value. This is only available if it
391 /// implements [Unpin].
392 ///
393 /// # Examples
394 ///
395 /// ```rust
396 /// use async_fuse::Fuse;
397 ///
398 /// # fn main() {
399 /// let mut rx = Fuse::new(Box::pin(async { 42 }));
400 ///
401 /// assert!(rx.as_inner_mut().is_some());
402 /// # }
403 #[inline]
404 pub fn as_inner_mut(&mut self) -> Option<&mut T>
405 where
406 Self: Unpin,
407 {
408 self.value.as_mut()
409 }
410
411 /// Helper conversion to a pinned value.
412 ///
413 /// # Examples
414 ///
415 /// ```rust
416 /// use async_fuse::Fuse;
417 /// use tokio::sync::mpsc;
418 ///
419 /// # #[tokio::main]
420 /// # async fn main() {
421 /// let (tx, rx) = mpsc::unbounded_channel::<u32>();
422 /// let mut rx = Fuse::new(rx);
423 ///
424 /// tx.send(42);
425 ///
426 /// // Manually poll the sleep.
427 /// assert_eq!(rx.as_pin_mut().poll_stream(|mut i, cx| i.poll_recv(cx)).await, Some(42));
428 ///
429 /// rx = Fuse::empty();
430 /// assert!(rx.is_empty());
431 /// # }
432 /// ```
433 #[inline]
434 pub fn as_pin_mut(&mut self) -> Pin<&mut Self>
435 where
436 Self: Unpin,
437 {
438 Pin::new(self)
439 }
440
441 /// Poll the next value in the stream where the underlying value is unpin.
442 ///
443 /// Behaves the same as [`poll_stream`], except that it only works for
444 /// values which are [Unpin].
445 ///
446 /// # Examples
447 ///
448 /// ```rust
449 /// use async_fuse::{Fuse, Stream};
450 /// use std::future::Future;
451 /// use tokio::sync::mpsc;
452 ///
453 /// fn op(n: u32) -> impl Stream<Item = u32> {
454 /// async_stream::stream! {
455 /// yield n;
456 /// yield n + 1;
457 /// }
458 /// }
459 ///
460 /// # #[tokio::main]
461 /// # async fn main() {
462 /// let mut stream = Fuse::new(Box::pin(op(1)));
463 /// assert!(!stream.is_empty());
464 ///
465 /// assert_eq!(stream.next().await, Some(1));
466 /// assert_eq!(stream.next().await, Some(2));
467 /// assert_eq!(stream.next().await, None);
468 ///
469 /// assert!(stream.is_empty());
470 /// # }
471 /// ```
472 #[cfg(feature = "stream03")]
473 #[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
474 pub async fn next(&mut self) -> Option<T::Item>
475 where
476 Self: Unpin,
477 T: Stream03,
478 {
479 self.as_pin_mut().poll_stream(Stream03::poll_next).await
480 }
481
482 #[inline]
483 fn project(self: Pin<&mut Self>) -> Pin<&mut Option<T>> {
484 // Safety: We're projecting into the owned pinned value field, which we
485 // otherwise do not move before it's dropped.
486 unsafe { Pin::map_unchecked_mut(self, |this| &mut this.value) }
487 }
488}
489
490impl<T> Future for Fuse<T>
491where
492 T: Future,
493{
494 type Output = T::Output;
495
496 #[inline]
497 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
498 Pin::new(&mut PollFuture::new(ProjectFuse(self), Future::poll)).poll(cx)
499 }
500}
501
/// Polls the held stream through `poll::PollStream`, using the stream's own
/// `poll_next` as the polling function.
#[cfg(feature = "stream03")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream03")))]
impl<T> Stream03 for Fuse<T>
where
    T: Stream03,
{
    type Item = T::Item;

    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Build a short-lived adapter on the stack and poll it once.
        let mut adapter = poll::PollStream::new(ProjectFuse(self), <T as Stream03>::poll_next);
        Pin::new(&mut adapter).poll(cx)
    }
}
519
520impl<T> From<Option<T>> for Fuse<T> {
521 #[inline]
522 fn from(value: Option<T>) -> Self {
523 Self { value }
524 }
525}
526
/// Pins a boxed value and wraps it in a non-empty fuse.
///
/// `T` may be unsized, so boxed trait objects such as
/// `Box<dyn Future<Output = u32>>` can be converted directly.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
impl<T> From<Box<T>> for Fuse<Pin<Box<T>>>
where
    T: ?Sized,
{
    #[inline]
    fn from(value: Box<T>) -> Self {
        Self {
            // `Pin<Box<T>>: From<Box<T>>` holds for `T: ?Sized`.
            value: Some(Pin::from(value)),
        }
    }
}
537
/// Pins an optional boxed value and wraps it in a fuse; `None` yields an
/// empty fuse.
///
/// `T` may be unsized, so optional boxed trait objects can be converted
/// directly.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
impl<T> From<Option<Box<T>>> for Fuse<Pin<Box<T>>>
where
    T: ?Sized,
{
    #[inline]
    fn from(value: Option<Box<T>>) -> Self {
        Self {
            // `Pin<Box<T>>: From<Box<T>>` holds for `T: ?Sized`.
            value: value.map(Pin::from),
        }
    }
}
548
549impl<T> Default for Fuse<T> {
550 #[inline]
551 fn default() -> Self {
552 Self { value: None }
553 }
554}
555
556struct ProjectFuse<'a, T>(Pin<&'a mut Fuse<T>>);
557
558impl<T> Project for ProjectFuse<'_, T> {
559 type Value = T;
560
561 #[inline]
562 fn clear(&mut self) {
563 self.0.as_mut().project().set(None);
564 }
565
566 #[inline]
567 fn project(&mut self) -> Poll<Pin<&mut Self::Value>> {
568 match self.0.as_mut().project().as_pin_mut() {
569 Some(value) => Poll::Ready(value),
570 None => Poll::Pending,
571 }
572 }
573}