n0_future/maybe_future.rs
1//! Implements the [`MaybeFuture`] utility.
2
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use pin_project::pin_project;
10
11/// A future which may not be present.
12///
13/// This is a single type which may optionally contain a future. If there is no inner
14/// future polling will always return [`Poll::Pending`].
15///
16/// When the inner future is set, then [`MaybeFuture`] is polled and the inner future
17/// completes, then the poll returns the value of the inner future and [`MaybeFuture`]'s
18/// state is set to None.
19///
20/// The [`Default`] impl will create a [`MaybeFuture`] without an inner.
21///
22/// # Example
23///
24/// One major use case for this is ergonomically disabling branches in a `tokio::select!`.
25///
26/// ```
27/// use std::time::Duration;
28///
29/// use n0_future::{task, time, MaybeFuture};
30///
31/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
32/// # async fn main() {
33/// let start = time::Instant::now();
34///
35/// let (send, mut recv) = tokio::sync::mpsc::channel(10);
36/// task::spawn(async move {
37/// // Send for the first time after 2s
38/// time::sleep(Duration::from_millis(2000)).await;
39/// let _ = send.send(()).await;
40/// println!("{:?}: Sent", start.elapsed());
41/// // Send after only 100ms
42/// time::sleep(Duration::from_millis(100)).await;
43/// let _ = send.send(()).await;
44/// println!("{:?}: Sent", start.elapsed());
45/// // Send again after only 100ms
46/// time::sleep(Duration::from_millis(100)).await;
47/// let _ = send.send(()).await;
48/// println!("{:?}: Sent", start.elapsed());
49/// // Finally send "too late" after 1100ms:
50/// time::sleep(Duration::from_millis(1100)).await;
51/// let _ = send.send(()).await;
52/// println!("{:?}: Sent", start.elapsed());
53/// });
54///
55/// let mut timeout_fut = std::pin::pin!(MaybeFuture::default());
56/// loop {
57/// tokio::select! {
58/// // If a timeout hasn't been set yet (a first msg hasn't been received)
59/// // then this won't trigger.
60/// _ = &mut timeout_fut => {
61/// println!("{:?}: Timed out!", start.elapsed());
62/// return;
63/// }
64/// _ = recv.recv() => {
65/// // Set (or reset) the timeout
66/// timeout_fut.as_mut().set_future(time::sleep(Duration::from_millis(1000)));
67/// println!("{:?}: Received!", start.elapsed());
68/// }
69/// }
70/// }
71/// # }
72/// ```
73///
74/// This example prints:
75/// ```plain
76/// 2s: Sent
77/// 2s: Received!
78/// 2.1s: Sent
79/// 2.1s: Received!
80/// 2.2s: Sent
81/// 2.2s: Received!
82/// 3.2s: Timed out!
83/// ```
84///
85/// The last send times out, but it doesn't time out before the first send.
86#[derive(Default, Debug)]
87#[pin_project(project = MaybeFutureProj, project_replace = MaybeFutureProjReplace)]
88pub enum MaybeFuture<T> {
89 /// The state in which it wraps a future to be polled.
90 Some(#[pin] T),
91 /// The state in which there's no future set, and polling will always return [`Poll::Pending`]
92 #[default]
93 None,
94}
95
96impl<T> MaybeFuture<T> {
97 /// Sets the future to None again.
98 pub fn set_none(mut self: Pin<&mut Self>) {
99 self.as_mut().project_replace(Self::None);
100 }
101
102 /// Sets a new future.
103 pub fn set_future(mut self: Pin<&mut Self>, fut: T) {
104 self.as_mut().project_replace(Self::Some(fut));
105 }
106
107 /// Returns `true` if the inner is empty.
108 pub fn is_none(&self) -> bool {
109 matches!(self, Self::None)
110 }
111
112 /// Returns `true` if the inner contains a future.
113 pub fn is_some(&self) -> bool {
114 matches!(self, Self::Some(_))
115 }
116}
117
118impl<T: Future> Future for MaybeFuture<T> {
119 type Output = T::Output;
120
121 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
122 let mut this = self.as_mut().project();
123 let poll_res = match this {
124 MaybeFutureProj::Some(ref mut t) => t.as_mut().poll(cx),
125 MaybeFutureProj::None => Poll::Pending,
126 };
127 match poll_res {
128 Poll::Ready(val) => {
129 self.as_mut().project_replace(Self::None);
130 Poll::Ready(val)
131 }
132 Poll::Pending => Poll::Pending,
133 }
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use std::pin::pin;
140
141 use super::*;
142 use crate::time::Duration;
143
144 #[tokio::test(start_paused = true)]
145 async fn test_maybefuture_poll_after_use() {
146 let fut = async move { "hello" };
147 let mut maybe_fut = pin!(MaybeFuture::Some(fut));
148 let res = (&mut maybe_fut).await;
149
150 assert_eq!(res, "hello");
151
152 // Now poll again
153 let res = tokio::time::timeout(Duration::from_millis(10), maybe_fut).await;
154 assert!(res.is_err());
155 }
156
157 #[tokio::test(start_paused = true)]
158 async fn test_maybefuture_mut_ref() {
159 let mut fut = Box::pin(async move { "hello" });
160 let mut maybe_fut = pin!(MaybeFuture::Some(&mut fut));
161 let res = (&mut maybe_fut).await;
162
163 assert_eq!(res, "hello");
164
165 // Now poll again
166 let res = tokio::time::timeout(Duration::from_millis(10), maybe_fut).await;
167 assert!(res.is_err());
168 }
169
170 #[tokio::test(start_paused = true)]
171 async fn example() {
172 use std::time::Duration;
173
174 use crate::{task, time};
175
176 let start = time::Instant::now();
177
178 let (send, mut recv) = tokio::sync::mpsc::channel(10);
179 task::spawn(async move {
180 // Send for the first time after 2s
181 time::sleep(Duration::from_millis(2000)).await;
182 let _ = send.send(()).await;
183 println!("{:?}: Sent", start.elapsed());
184 // Send after only 100ms
185 time::sleep(Duration::from_millis(100)).await;
186 let _ = send.send(()).await;
187 println!("{:?}: Sent", start.elapsed());
188 // Send again after only 100ms
189 time::sleep(Duration::from_millis(100)).await;
190 let _ = send.send(()).await;
191 println!("{:?}: Sent", start.elapsed());
192 // Finally send "too late" after 1100ms:
193 time::sleep(Duration::from_millis(1100)).await;
194 let _ = send.send(()).await;
195 println!("{:?}: Sent", start.elapsed());
196 });
197
198 let mut timeout_fut = std::pin::pin!(MaybeFuture::default());
199 loop {
200 tokio::select! {
201 // If a timeout hasn't been set yet (a first msg hasn't been received)
202 // then this won't trigger.
203 _ = &mut timeout_fut => {
204 println!("{:?}: Timed out!", start.elapsed());
205 return;
206 }
207 _ = recv.recv() => {
208 // Set (or reset) the timeout
209 timeout_fut.as_mut().set_future(time::sleep(Duration::from_millis(1000)));
210 println!("{:?}: Received!", start.elapsed());
211 }
212 }
213 }
214 }
215}