n0_future/
maybe_future.rs

1//! Implements the [`MaybeFuture`] utility.
2
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use pin_project::pin_project;
10
11/// A future which may not be present.
12///
13/// This is a single type which may optionally contain a future.  If there is no inner
14/// future polling will always return [`Poll::Pending`].
15///
16/// When the inner future is set, then [`MaybeFuture`] is polled and the inner future
17/// completes, then the poll returns the value of the inner future and [`MaybeFuture`]'s
18/// state is set to None.
19///
20/// The [`Default`] impl will create a [`MaybeFuture`] without an inner.
21///
22/// # Example
23///
24/// One major use case for this is ergonomically disabling branches in a `tokio::select!`.
25///
26/// ```
27/// use std::time::Duration;
28///
29/// use n0_future::{task, time, MaybeFuture};
30///
31/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
32/// # async fn main() {
33/// let start = time::Instant::now();
34///
35/// let (send, mut recv) = tokio::sync::mpsc::channel(10);
36/// task::spawn(async move {
37///     // Send for the first time after 2s
38///     time::sleep(Duration::from_millis(2000)).await;
39///     let _ = send.send(()).await;
40///     println!("{:?}: Sent", start.elapsed());
41///     // Send after only 100ms
42///     time::sleep(Duration::from_millis(100)).await;
43///     let _ = send.send(()).await;
44///     println!("{:?}: Sent", start.elapsed());
45///     // Send again after only 100ms
46///     time::sleep(Duration::from_millis(100)).await;
47///     let _ = send.send(()).await;
48///     println!("{:?}: Sent", start.elapsed());
49///     // Finally send "too late" after 1100ms:
50///     time::sleep(Duration::from_millis(1100)).await;
51///     let _ = send.send(()).await;
52///     println!("{:?}: Sent", start.elapsed());
53/// });
54///
55/// let mut timeout_fut = std::pin::pin!(MaybeFuture::default());
56/// loop {
57///     tokio::select! {
58///         // If a timeout hasn't been set yet (a first msg hasn't been received)
59///         // then this won't trigger.
60///         _ = &mut timeout_fut => {
61///             println!("{:?}: Timed out!", start.elapsed());
62///             return;
63///         }
64///         _ = recv.recv() => {
65///             // Set (or reset) the timeout
66///             timeout_fut.as_mut().set_future(time::sleep(Duration::from_millis(1000)));
67///             println!("{:?}: Received!", start.elapsed());
68///         }
69///     }
70/// }
71/// # }
72/// ```
73///
74/// This example prints:
75/// ```plain
76/// 2s: Sent
77/// 2s: Received!
78/// 2.1s: Sent
79/// 2.1s: Received!
80/// 2.2s: Sent
81/// 2.2s: Received!
82/// 3.2s: Timed out!
83/// ```
84///
85/// The last send times out, but it doesn't time out before the first send.
86#[derive(Default, Debug)]
87#[pin_project(project = MaybeFutureProj, project_replace = MaybeFutureProjReplace)]
88pub enum MaybeFuture<T> {
89    /// The state in which it wraps a future to be polled.
90    Some(#[pin] T),
91    /// The state in which there's no future set, and polling will always return [`Poll::Pending`]
92    #[default]
93    None,
94}
95
96impl<T> MaybeFuture<T> {
97    /// Sets the future to None again.
98    pub fn set_none(mut self: Pin<&mut Self>) {
99        self.as_mut().project_replace(Self::None);
100    }
101
102    /// Sets a new future.
103    pub fn set_future(mut self: Pin<&mut Self>, fut: T) {
104        self.as_mut().project_replace(Self::Some(fut));
105    }
106
107    /// Returns `true` if the inner is empty.
108    pub fn is_none(&self) -> bool {
109        matches!(self, Self::None)
110    }
111
112    /// Returns `true` if the inner contains a future.
113    pub fn is_some(&self) -> bool {
114        matches!(self, Self::Some(_))
115    }
116}
117
118impl<T: Future> Future for MaybeFuture<T> {
119    type Output = T::Output;
120
121    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
122        let mut this = self.as_mut().project();
123        let poll_res = match this {
124            MaybeFutureProj::Some(ref mut t) => t.as_mut().poll(cx),
125            MaybeFutureProj::None => Poll::Pending,
126        };
127        match poll_res {
128            Poll::Ready(val) => {
129                self.as_mut().project_replace(Self::None);
130                Poll::Ready(val)
131            }
132            Poll::Pending => Poll::Pending,
133        }
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use std::pin::pin;
140
141    use super::*;
142    use crate::time::Duration;
143
144    #[tokio::test(start_paused = true)]
145    async fn test_maybefuture_poll_after_use() {
146        let fut = async move { "hello" };
147        let mut maybe_fut = pin!(MaybeFuture::Some(fut));
148        let res = (&mut maybe_fut).await;
149
150        assert_eq!(res, "hello");
151
152        // Now poll again
153        let res = tokio::time::timeout(Duration::from_millis(10), maybe_fut).await;
154        assert!(res.is_err());
155    }
156
157    #[tokio::test(start_paused = true)]
158    async fn test_maybefuture_mut_ref() {
159        let mut fut = Box::pin(async move { "hello" });
160        let mut maybe_fut = pin!(MaybeFuture::Some(&mut fut));
161        let res = (&mut maybe_fut).await;
162
163        assert_eq!(res, "hello");
164
165        // Now poll again
166        let res = tokio::time::timeout(Duration::from_millis(10), maybe_fut).await;
167        assert!(res.is_err());
168    }
169
170    #[tokio::test(start_paused = true)]
171    async fn example() {
172        use std::time::Duration;
173
174        use crate::{task, time};
175
176        let start = time::Instant::now();
177
178        let (send, mut recv) = tokio::sync::mpsc::channel(10);
179        task::spawn(async move {
180            // Send for the first time after 2s
181            time::sleep(Duration::from_millis(2000)).await;
182            let _ = send.send(()).await;
183            println!("{:?}: Sent", start.elapsed());
184            // Send after only 100ms
185            time::sleep(Duration::from_millis(100)).await;
186            let _ = send.send(()).await;
187            println!("{:?}: Sent", start.elapsed());
188            // Send again after only 100ms
189            time::sleep(Duration::from_millis(100)).await;
190            let _ = send.send(()).await;
191            println!("{:?}: Sent", start.elapsed());
192            // Finally send "too late" after 1100ms:
193            time::sleep(Duration::from_millis(1100)).await;
194            let _ = send.send(()).await;
195            println!("{:?}: Sent", start.elapsed());
196        });
197
198        let mut timeout_fut = std::pin::pin!(MaybeFuture::default());
199        loop {
200            tokio::select! {
201                // If a timeout hasn't been set yet (a first msg hasn't been received)
202                // then this won't trigger.
203                _ = &mut timeout_fut => {
204                    println!("{:?}: Timed out!", start.elapsed());
205                    return;
206                }
207                _ = recv.recv() => {
208                    // Set (or reset) the timeout
209                    timeout_fut.as_mut().set_future(time::sleep(Duration::from_millis(1000)));
210                    println!("{:?}: Received!", start.elapsed());
211                }
212            }
213        }
214    }
215}