1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//! An extension trait for Futures that provides a variety of convenient adapters.
mod with_cancellation_token;
use with_cancellation_token::{WithCancellationTokenFuture, WithCancellationTokenFutureOwned};
use std::future::Future;
use crate::sync::CancellationToken;
/// A trait which contains a variety of convenient adapters and utilities for `Future`s.
pub trait FutureExt: Future {
cfg_time! {
/// A wrapper around [`tokio::time::timeout`], with the advantage that it is easier to write
/// fluent call chains.
///
/// # Examples
///
/// ```rust
/// use tokio::{sync::oneshot, time::Duration};
/// use tokio_util::future::FutureExt;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
///
/// let res = rx.timeout(Duration::from_millis(10)).await;
/// assert!(res.is_err());
/// # }
/// ```
#[track_caller]
fn timeout(self, timeout: std::time::Duration) -> tokio::time::Timeout<Self>
where
Self: Sized,
{
tokio::time::timeout(timeout, self)
}
/// A wrapper around [`tokio::time::timeout_at`], with the advantage that it is easier to write
/// fluent call chains.
///
/// # Examples
///
/// ```rust
/// use tokio::{sync::oneshot, time::{Duration, Instant}};
/// use tokio_util::future::FutureExt;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
/// let deadline = Instant::now() + Duration::from_millis(10);
///
/// let res = rx.timeout_at(deadline).await;
/// assert!(res.is_err());
/// # }
/// ```
fn timeout_at(self, deadline: tokio::time::Instant) -> tokio::time::Timeout<Self>
where
Self: Sized,
{
tokio::time::timeout_at(deadline, self)
}
}
/// Similar to [`CancellationToken::run_until_cancelled`],
/// but with the advantage that it is easier to write fluent call chains.
///
/// # Fairness
///
/// Calling this on an already-cancelled token directly returns `None`.
/// For all subsequent polls, in case of concurrent completion and
/// cancellation, this is biased towards the `self` future completion.
///
/// # Examples
///
/// ```rust
/// use tokio::sync::oneshot;
/// use tokio_util::future::FutureExt;
/// use tokio_util::sync::CancellationToken;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
/// let token = CancellationToken::new();
/// let token_clone = token.clone();
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
/// assert!(rx.with_cancellation_token(&token_clone).await.is_none())
/// # }
/// ```
fn with_cancellation_token(
self,
cancellation_token: &CancellationToken,
) -> WithCancellationTokenFuture<'_, Self>
where
Self: Sized,
{
WithCancellationTokenFuture::new(cancellation_token, self)
}
/// Similar to [`CancellationToken::run_until_cancelled_owned`],
/// but with the advantage that it is easier to write fluent call chains.
///
/// # Fairness
///
/// Calling this on an already-cancelled token directly returns `None`.
/// For all subsequent polls, in case of concurrent completion and
/// cancellation, this is biased towards the `self` future completion.
///
/// # Examples
///
/// ```rust
/// use tokio::sync::oneshot;
/// use tokio_util::future::FutureExt;
/// use tokio_util::sync::CancellationToken;
///
/// # async fn dox() {
/// let (_tx, rx) = oneshot::channel::<()>();
/// let token = CancellationToken::new();
/// let token_clone = token.clone();
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
/// assert!(rx.with_cancellation_token_owned(token_clone).await.is_none())
/// # }
/// ```
fn with_cancellation_token_owned(
self,
cancellation_token: CancellationToken,
) -> WithCancellationTokenFutureOwned<Self>
where
Self: Sized,
{
WithCancellationTokenFutureOwned::new(cancellation_token, self)
}
}
impl<T: Future + ?Sized> FutureExt for T {}