fuel_core_services/
async_processor.rs1use fuel_core_metrics::futures::{
2 FuturesMetrics,
3 metered_future::MeteredFuture,
4};
5use std::{
6 future::Future,
7 sync::Arc,
8};
9use tokio::{
10 runtime,
11 sync::{
12 OwnedSemaphorePermit,
13 Semaphore,
14 },
15 task::JoinHandle,
16};
17
18pub struct AsyncProcessor {
21 metric: FuturesMetrics,
22 semaphore: Arc<Semaphore>,
23 thread_pool: Option<runtime::Runtime>,
24}
25
26impl Drop for AsyncProcessor {
27 fn drop(&mut self) {
28 if let Some(runtime) = self.thread_pool.take() {
29 runtime.shutdown_background();
30 }
31 }
32}
33
34pub struct AsyncReservation(OwnedSemaphorePermit);
36
37#[derive(Debug, PartialEq, Eq)]
39pub struct OutOfCapacity;
40
41impl AsyncProcessor {
42 pub fn new(
45 metric_name: &str,
46 number_of_threads: usize,
47 number_of_pending_tasks: usize,
48 ) -> anyhow::Result<Self> {
49 let thread_pool = if number_of_threads != 0 {
50 let runtime = runtime::Builder::new_multi_thread()
51 .worker_threads(number_of_threads)
52 .enable_all()
53 .build()
54 .map_err(|e| anyhow::anyhow!("Failed to create a tokio pool: {}", e))?;
55
56 Some(runtime)
57 } else {
58 None
59 };
60 let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks));
61 let metric = FuturesMetrics::obtain_futures_metrics(metric_name);
62 Ok(Self {
63 metric,
64 thread_pool,
65 semaphore,
66 })
67 }
68
69 pub fn reserve(&self) -> Result<AsyncReservation, OutOfCapacity> {
71 let permit = self.semaphore.clone().try_acquire_owned();
72 match permit {
73 Ok(permit) => Ok(AsyncReservation(permit)),
74 _ => Err(OutOfCapacity),
75 }
76 }
77
78 pub fn spawn_reserved<F>(
80 &self,
81 reservation: AsyncReservation,
82 future: F,
83 ) -> JoinHandle<F::Output>
84 where
85 F: Future + Send + 'static,
86 F::Output: Send,
87 {
88 let permit = reservation.0;
89 let future = async move {
90 let permit = permit;
91 let result = future.await;
92 drop(permit);
93 result
94 };
95 let metered_future = MeteredFuture::new(future, self.metric.clone());
96 match &self.thread_pool {
97 Some(runtime) => runtime.spawn(metered_future),
98 _ => tokio::spawn(metered_future),
99 }
100 }
101
102 pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
104 where
105 F: Future + Send + 'static,
106 F::Output: Send,
107 {
108 let reservation = self.reserve()?;
109 Ok(self.spawn_reserved(reservation, future))
110 }
111}
112
113#[cfg(test)]
114#[allow(clippy::bool_assert_comparison)]
115#[allow(non_snake_case)]
116mod tests {
117 use super::*;
118 use futures::future::join_all;
119 use std::{
120 collections::HashSet,
121 iter,
122 thread::sleep,
123 time::Duration,
124 };
125 use tokio::time::Instant;
126
127 #[tokio::test]
128 async fn one_spawn_single_tasks_works() {
129 const NUMBER_OF_PENDING_TASKS: usize = 1;
131 let heavy_task_processor =
132 AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
133
134 let (sender, receiver) = tokio::sync::oneshot::channel();
136 let result = heavy_task_processor.try_spawn(async move {
137 sender.send(()).unwrap();
138 });
139
140 result.expect("Expected Ok result");
142 tokio::time::timeout(Duration::from_secs(5), receiver)
143 .await
144 .unwrap()
145 .unwrap();
146 }
147
148 #[tokio::test]
149 async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
150 const MAX_NUMBER_OF_THREADS: usize = 10;
152 const NUMBER_OF_PENDING_TASKS: usize = 10000;
153 let heavy_task_processor =
154 AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
155 .unwrap();
156 let main_handler = tokio::spawn(async move { std::thread::current().id() });
157 let main_id = main_handler.await.unwrap();
158
159 let futures = iter::repeat_with(|| {
161 heavy_task_processor
162 .try_spawn(async move { std::thread::current().id() })
163 .unwrap()
164 })
165 .take(NUMBER_OF_PENDING_TASKS)
166 .collect::<Vec<_>>();
167
168 let thread_ids = join_all(futures).await;
170 let unique_thread_ids = thread_ids
171 .into_iter()
172 .map(|r| r.unwrap())
173 .collect::<HashSet<_>>();
174
175 assert!(!unique_thread_ids.contains(&main_id));
177 assert!(!unique_thread_ids.is_empty());
179 assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS);
181 }
182
183 #[test]
184 fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
185 const NUMBER_OF_PENDING_TASKS: usize = 1;
187 let heavy_task_processor =
188 AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
189 let first_spawn_result = heavy_task_processor.try_spawn(async move {
190 sleep(Duration::from_secs(1));
191 });
192 first_spawn_result.expect("Expected Ok result");
193
194 let second_spawn_result = heavy_task_processor.try_spawn(async move {
196 sleep(Duration::from_secs(1));
197 });
198
199 let err = second_spawn_result.expect_err("Should error");
201 assert_eq!(err, OutOfCapacity);
202 }
203
204 #[tokio::test]
205 async fn second_spawn_works_when_first_is_finished() {
206 const NUMBER_OF_PENDING_TASKS: usize = 1;
207 let heavy_task_processor =
208 AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
209
210 let (sender, receiver) = tokio::sync::oneshot::channel();
212 let first_spawn = heavy_task_processor.try_spawn(async move {
213 sleep(Duration::from_secs(1));
214 sender.send(()).unwrap();
215 });
216 first_spawn.expect("Expected Ok result").await.unwrap();
217 receiver.await.unwrap();
218
219 let second_spawn = heavy_task_processor.try_spawn(async move {
221 sleep(Duration::from_secs(1));
222 });
223
224 second_spawn.expect("Expected Ok result");
226 }
227
228 #[test]
229 fn can_spawn_10_tasks_when_limit_is_10() {
230 const NUMBER_OF_PENDING_TASKS: usize = 10;
232 let heavy_task_processor =
233 AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
234
235 for _ in 0..NUMBER_OF_PENDING_TASKS {
236 let result = heavy_task_processor.try_spawn(async move {
238 tokio::time::sleep(Duration::from_secs(1)).await;
239 });
240
241 result.expect("Expected Ok result");
243 }
244 }
245
246 #[tokio::test]
247 async fn executes_5_tasks_for_5_seconds_with_one_thread() {
248 const NUMBER_OF_PENDING_TASKS: usize = 5;
250 const NUMBER_OF_THREADS: usize = 1;
251 let heavy_task_processor =
252 AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
253 .unwrap();
254
255 let (broadcast_sender, mut broadcast_receiver) =
257 tokio::sync::broadcast::channel(1024);
258 let instant = Instant::now();
259 for _ in 0..NUMBER_OF_PENDING_TASKS {
260 let broadcast_sender = broadcast_sender.clone();
261 let result = heavy_task_processor.try_spawn(async move {
262 sleep(Duration::from_secs(1));
263 broadcast_sender.send(()).unwrap();
264 });
265 result.expect("Expected Ok result");
266 }
267 drop(broadcast_sender);
268
269 while broadcast_receiver.recv().await.is_ok() {}
271 const LEEWAY: Duration = Duration::from_millis(300);
275 assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY);
276 assert!(instant.elapsed() >= Duration::from_secs(5));
278 tokio::time::sleep(Duration::from_secs(1)).await;
280 let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
281 assert_eq!(duration.as_secs(), 5);
282 let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
283 assert_eq!(duration.as_secs(), 0);
284 }
285
286 #[tokio::test]
287 async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
288 {
289 const NUMBER_OF_PENDING_TASKS: usize = 10;
291 const NUMBER_OF_THREADS: usize = 10;
292 let heavy_task_processor =
293 AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
294 .unwrap();
295
296 let (broadcast_sender, mut broadcast_receiver) =
298 tokio::sync::broadcast::channel(1024);
299 let instant = Instant::now();
300 for _ in 0..NUMBER_OF_PENDING_TASKS {
301 let broadcast_sender = broadcast_sender.clone();
302 let result = heavy_task_processor.try_spawn(async move {
303 sleep(Duration::from_secs(1));
304 broadcast_sender.send(()).unwrap();
305 });
306 result.expect("Expected Ok result");
307 }
308 drop(broadcast_sender);
309
310 while broadcast_receiver.recv().await.is_ok() {}
312 const LEEWAY: Duration = Duration::from_millis(300);
316 assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
317 tokio::time::sleep(Duration::from_secs(1)).await;
319 let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
320 assert_eq!(duration.as_secs(), 10);
321 let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
322 assert_eq!(duration.as_secs(), 0);
323 }
324
325 #[tokio::test]
326 async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time()
327 {
328 const NUMBER_OF_PENDING_TASKS: usize = 10;
330 const NUMBER_OF_THREADS: usize = 10;
331 let heavy_task_processor =
332 AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
333 .unwrap();
334
335 let (broadcast_sender, mut broadcast_receiver) =
337 tokio::sync::broadcast::channel(1024);
338 let instant = Instant::now();
339 for _ in 0..NUMBER_OF_PENDING_TASKS {
340 let broadcast_sender = broadcast_sender.clone();
341 let result = heavy_task_processor.try_spawn(async move {
342 tokio::time::sleep(Duration::from_secs(1)).await;
343 broadcast_sender.send(()).unwrap();
344 });
345 result.expect("Expected Ok result");
346 }
347 drop(broadcast_sender);
348
349 while broadcast_receiver.recv().await.is_ok() {}
351 const LEEWAY: Duration = Duration::from_millis(300);
355 assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
356 tokio::time::sleep(Duration::from_secs(1)).await;
358 let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
359 assert_eq!(duration.as_secs(), 0);
360 let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
361 assert_eq!(duration.as_secs(), 10);
362 }
363}