cs_utils/utils/futures/with_stop.rs
1use std::pin::Pin;
2
3use anyhow::{Result, anyhow};
4use futures::{future, Future};
5use tokio::sync::oneshot::{self, error::TryRecvError};
6
7use super::wait;
8
9/// Wrap a future into one that can be stopped at any given
10/// moment by invoking a returned `stop` function.
11///
12/// Wraps the original result of a future into an `Option` to
13/// indicate if the future completed (`Some(original result)`) or
14/// was stopped explicitelly (`None`).
15///
16/// ### Examples
17///
18/// ```
19/// use std::pin::Pin;
20///
21/// use futures::{future, Future};
22/// use cs_utils::futures::{wait, with_stop};
23///
24/// #[tokio::main]
25/// async fn main() {
26/// // as future that never completes
27/// let forever_future = async move {
28/// loop {
29/// wait(5).await;
30/// }
31/// };
32///
33/// // wrap the original future to get the `stop` function
34/// let (stop, forever_future) = with_stop(forever_future);
35///
36/// // create futures for testing purposes
37/// let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
38/// Box::pin(async move {
39/// // run the forever future
40/// let result = forever_future.await;
41///
42/// assert!(
43/// result.is_none(),
44/// "Must complete with `None` result since it was stopped prematurely.",
45/// );
46/// }),
47/// Box::pin(async move {
48/// wait(25).await;
49///
50/// // stop the forever future
51/// stop()
52/// .expect("Cannot stop the future.");
53///
54/// // wait for some more time so the assertion in
55/// // the boxed future above has a chance to run
56/// wait(25).await;
57/// }),
58/// ];
59///
60/// // race the futures to completion
61/// future::select_all(futures).await;
62/// }
63/// ```
64pub fn with_stop<
65 T: Send + 'static,
66 TFuture: Future<Output = T> + Send + 'static,
67>(
68 original_future: TFuture,
69) -> (Box<dyn FnOnce() -> Result<()> + Send>, Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>>) {
70 let (test_end_sender, mut test_end_receiver) = oneshot::channel::<()>();
71 let futures: Vec<Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>>> = vec![
72 Box::pin(async move {
73 while let Err(TryRecvError::Empty) = test_end_receiver.try_recv() {
74 // common thread time slice is `~100ms`, so
75 // `5ms` delay should be granular enough here
76 wait(5).await;
77 }
78
79 return None;
80 }),
81
82 Box::pin(async move {
83 return Some(
84 original_future.await,
85 );
86 }),
87 ];
88
89 let stop: Box<dyn FnOnce() -> Result<()> + Send> = Box::new(
90 move || {
91 match test_end_sender.send(()) {
92 Ok(_) => Ok(()),
93 Err(_) => Err(anyhow!("Cannot stop the future, might be already stopped.")),
94 }
95 },
96 );
97
98 let result_future: Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>> = Box::pin(async move {
99 let (result, _, _) = future::select_all(futures).await;
100
101 result
102 });
103
104 return (stop, result_future);
105}
106
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use futures::{future, Future};

    use cs_utils::{
        random_number,
        futures::{wait, with_stop},
    };

    /// A future that never completes on its own must resolve with `None`
    /// once the `stop` function is invoked.
    #[tokio::test]
    async fn can_stop_a_future() {
        // a future that never completes on its own
        let forever_future = async move {
            loop {
                wait(5).await;
            }
        };

        // wrap the original future to get the `stop` function
        let (stop, forever_future) = with_stop(forever_future);

        // create futures for testing purposes
        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
            Box::pin(async move {
                // run the forever future
                let result = forever_future.await;

                assert!(
                    result.is_none(),
                    "Stopped future must complete with `None` result.",
                );
            }),
            Box::pin(async move {
                wait(25).await;

                // stop the forever future
                stop()
                    .expect("Cannot stop the future.");

                // grace period: the stopped future above is expected
                // to resolve well before this delay elapses
                wait(25).await;
            }),
        ];

        // race the futures to completion
        let (_, finished_index, _) = future::select_all(futures).await;

        // If stopping had no effect, the helper future (index 1) would win the
        // race and the assertion inside the first future would never run, so
        // verify explicitly that the stopped future is the one that finished.
        assert_eq!(
            finished_index,
            0,
            "Stopped future must complete shortly after `stop` is called.",
        );
    }

    /// A future that runs to completion without being stopped must
    /// resolve with `Some(original result)`.
    #[tokio::test]
    async fn completed_future_returns_some_result() {
        let completing_future_result = random_number(0..=i128::MAX);

        // a future that completes on its own after a short delay
        let completing_future = async move {
            wait(5).await;

            completing_future_result
        };

        // wrap the original future; `_stop` is kept alive on purpose so the
        // wrapper is not ended early by the stop handle going away
        let (_stop, completing_future) = with_stop(completing_future);

        let result = completing_future.await
            .expect("Must complete with `Some(original result)`.");

        assert_eq!(
            result,
            completing_future_result,
            "Original and received future results must match.",
        );
    }
}