cs_utils/utils/futures/
with_timeout.rs1use std::pin::Pin;
2
3use futures::{future, Future};
4
5use super::wait;
6
7pub fn with_timeout<
62 T: Send + 'static,
63 TFuture: Future<Output = T> + Send + 'static,
64>(
65 original_future: TFuture,
66 timeout_ms: u64,
67) -> Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>> {
68 let futures: Vec<Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>>> = vec![
69 Box::pin(async move {
70 wait(timeout_ms).await;
71
72 return None;
73 }),
74
75 Box::pin(async move {
76 return Some(
77 original_future.await,
78 );
79 }),
80 ];
81
82 let result_future: Pin<Box<dyn Future<Output = Option<T>> + Send + 'static>> = Box::pin(async move {
83 let (result, _, _) = future::select_all(futures).await;
84
85 result
86 });
87
88 return result_future;
89}
90
91#[cfg(test)]
92mod tests {
93 use std::time::Instant;
94
95 use cs_utils::{
96 random_number,
97 futures::{wait, with_timeout},
98 };
99
100 #[tokio::test]
101 async fn can_stop_a_future_after_a_timeout() {
102 let timeout: u64 = 25;
103
104 let forever_future = async move {
106 loop {
107 wait(5).await;
108 }
109 };
110
111 let with_timeout_future = with_timeout(forever_future, timeout);
113
114 let start_time = Instant::now();
116
117 let result = with_timeout_future.await;
119
120 let time_delta_ms = (Instant::now() - start_time).as_millis();
122
123 assert!(
124 result.is_none(),
125 "Timed out future must complete with `None` result.",
126 );
127
128 assert!(
130 time_delta_ms >= (timeout - 2) as u128,
131 "Must have waited for at least duration of the timeout (\"{delta}\" vs \"{timeout}\").",
132 delta = time_delta_ms,
133 timeout = timeout,
134 );
135 assert!(
136 time_delta_ms <= (timeout + 2) as u128,
137 "Must have waited for at most duration of the timeout (\"{delta}\" vs \"{timeout}\").",
138 delta = time_delta_ms,
139 timeout = timeout,
140 );
141 }
142
143 #[tokio::test]
144 async fn completed_future_returns_some_result() {
145 let timeout: u64 = 25;
146 let completion_delay: u64 = 5;
147 let completing_future_result = random_number(0..=i128::MAX);
148
149 let completing_future = async move {
151 wait(completion_delay).await;
152
153 return completing_future_result;
154 };
155
156 let with_timeout_future = with_timeout(completing_future, timeout);
158
159 let start_time = Instant::now();
161
162 let result = with_timeout_future.await
164 .expect("Completed future must complete with `Some(original result)`.");
165
166 let time_delta_ms = (Instant::now() - start_time).as_millis();
168
169 assert_eq!(
170 result,
171 completing_future_result,
172 "Original and received future results must match.",
173 );
174
175 assert!(
177 time_delta_ms >= (completion_delay - 2) as u128,
178 "Must have waited for at least duration of the completion delay (\"{delta}\" vs \"{delay}\").",
179 delta = time_delta_ms,
180 delay = completion_delay,
181 );
182 assert!(
183 time_delta_ms <= (completion_delay + 2) as u128,
184 "Must have waited for at most duration of the completion delay (\"{delta}\" vs \"{delay}\").",
185 delta = time_delta_ms,
186 delay = completion_delay,
187 );
188 }
189}