cs_utils/utils/futures/
with_stop.rs

1use std::pin::Pin;
2
3use anyhow::{Result, anyhow};
4use futures::{future, Future};
5use tokio::sync::oneshot::{self, error::TryRecvError};
6
7use super::wait;
8
9/// Wrap a future into one that can be stopped at any given
10/// moment by invoking a returned `stop` function.
11/// 
12/// Wraps the original result of a future into an `Option` to
13/// indicate if the future completed (`Some(original result)`) or
14/// was stopped explicitelly (`None`).
15///
16/// ### Examples
17/// 
18/// ```
19/// use std::pin::Pin;
20///
21/// use futures::{future, Future};
22/// use cs_utils::futures::{wait, with_stop};
23/// 
24/// #[tokio::main]
25/// async fn main() {
26///     // as future that never completes
27///     let forever_future = async move {
28///         loop {
29///             wait(5).await;
30///         }
31///     };
32///     
33///     // wrap the original future to get the `stop` function
34///     let (stop, forever_future) = with_stop(forever_future);
35///     
36///     // create futures for testing purposes
37///     let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
38///         Box::pin(async move {
39///             // run the forever future
40///             let result = forever_future.await;
41/// 
42///             assert!(
43///                 result.is_none(),
44///                 "Must complete with `None` result since it was stopped prematurely.",
45///             );
46///         }),
47///         Box::pin(async move {
48///             wait(25).await;
49///
50///             // stop the forever future
51///             stop()
52///                 .expect("Cannot stop the future.");
53///             
54///             // wait for some more time so the assertion in
55///             // the boxed future above has a chance to run
56///             wait(25).await;
57///         }),
58///     ];
59///     
60///     // race the futures to completion
61///     future::select_all(futures).await;
62/// }
63/// ```
64pub fn with_stop<
65    T: Send + 'static,
66    TFuture: Future<Output = T> + Send + 'static,
67>(
68    original_future: TFuture,
69) -> (Box<dyn FnOnce() -> Result<()> + Send>, Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>>) {
70    let (test_end_sender, mut test_end_receiver) = oneshot::channel::<()>();
71    let futures: Vec<Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>>> = vec![
72        Box::pin(async move {
73            while let Err(TryRecvError::Empty) = test_end_receiver.try_recv() {
74                // common thread time slice is `~100ms`, so
75                // `5ms` delay should be granular enough here 
76                wait(5).await;
77            }
78
79            return None;
80        }),
81        
82        Box::pin(async move {
83            return Some(
84                original_future.await,
85            );
86        }),
87    ];
88
89    let stop: Box<dyn FnOnce() -> Result<()> + Send> = Box::new(
90        move || {
91            match test_end_sender.send(()) {
92                Ok(_) => Ok(()),
93                Err(_) => Err(anyhow!("Cannot stop the future, might be already stopped.")),
94            }
95        },
96    );
97
98    let result_future: Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>> = Box::pin(async move {
99        let (result, _, _) = future::select_all(futures).await;
100
101        result
102    });
103
104    return (stop, result_future);
105}
106
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use futures::{future, Future};

    use cs_utils::{
        random_number,
        futures::{wait, with_stop},
    };

    /// A never-ending future must resolve to `None` once `stop` is invoked.
    #[tokio::test]
    async fn can_stop_a_future() {
        // a future that never completes on its own
        let forever_future = async move {
            loop {
                wait(5).await;
            }
        };

        // wrap the original future to get the `stop` function
        let (stop, forever_future) = with_stop(forever_future);

        // create futures for testing purposes
        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
            // index 0: drives the wrapped future and checks its result
            Box::pin(async move {
                let result = forever_future.await;

                assert!(
                    result.is_none(),
                    "Stopped future must complete with `None` result.",
                );
            }),
            // index 1: requests the stop, then lingers as a deadline
            Box::pin(async move {
                wait(25).await;

                // stop the forever future
                stop()
                    .expect("Cannot stop the future.");

                // wait for some more time so the assertion in
                // the boxed future above has a chance to run
                wait(25).await;
            }),
        ];

        // race the futures to completion
        let (_, finished_index, _) = future::select_all(futures).await;

        // Without this check the test would pass vacuously when `stop` has
        // no effect: the helper future would simply win the race and the
        // assertion inside the first future would never be evaluated.
        assert_eq!(
            finished_index,
            0,
            "Stopped future must complete before the helper future does.",
        );
    }

    /// A future that finishes on its own must yield `Some(original result)`.
    #[tokio::test]
    async fn completed_future_returns_some_result() {
        let completing_future_result = random_number(0..=i128::MAX);

        // a future that completes with a known result after a short wait
        let completing_future = async move {
            wait(5).await;

            completing_future_result
        };

        // wrap the original future; the `stop` function is kept alive
        // for the duration of the test but is never called
        let (_stop, completing_future) = with_stop(completing_future);

        let result = completing_future.await
            .expect("Must complete with `Some(original result)`.");

        assert_eq!(
            result,
            completing_future_result,
            "Original and received future results must match.",
        );
    }
}