//! `fuel_core_services/async_processor.rs`
//!
//! A processor that executes async tasks on an optional dedicated tokio
//! runtime, with a semaphore-enforced cap on concurrent in-flight tasks.

1use fuel_core_metrics::futures::{
2    FuturesMetrics,
3    metered_future::MeteredFuture,
4};
5use std::{
6    future::Future,
7    sync::Arc,
8};
9use tokio::{
10    runtime,
11    sync::{
12        OwnedSemaphorePermit,
13        Semaphore,
14    },
15    task::JoinHandle,
16};
17
/// A processor that can execute async tasks with a limit on the number of tasks that can be
/// executed concurrently.
pub struct AsyncProcessor {
    // Busy/idle futures metrics recorded for every task spawned through this processor.
    metric: FuturesMetrics,
    // Caps the number of in-flight tasks; one owned permit is held per spawned task
    // and released when that task's future completes.
    semaphore: Arc<Semaphore>,
    // Dedicated runtime used when a non-zero thread count was requested at construction;
    // `None` means tasks are spawned onto the ambient tokio runtime instead.
    thread_pool: Option<runtime::Runtime>,
}
25
impl Drop for AsyncProcessor {
    fn drop(&mut self) {
        // Shut the dedicated runtime down without blocking: `shutdown_background`
        // returns immediately rather than waiting for outstanding tasks, and —
        // unlike simply dropping the `Runtime` — it is safe even if this drop
        // happens from within another tokio runtime's async context.
        if let Some(runtime) = self.thread_pool.take() {
            runtime.shutdown_background();
        }
    }
}
33
/// A reservation for a task to be executed by the `AsyncProcessor`.
/// Wraps an owned semaphore permit; dropping the reservation (or completing
/// the task spawned with it) releases the capacity slot back to the processor.
pub struct AsyncReservation(OwnedSemaphorePermit);
36
/// Out of capacity error.
/// Returned by [`AsyncProcessor::reserve`] and [`AsyncProcessor::try_spawn`]
/// when all pending-task slots are currently taken.
#[derive(Debug, PartialEq, Eq)]
pub struct OutOfCapacity;
40
41impl AsyncProcessor {
42    /// Create a new `AsyncProcessor` with the given number of threads and the number of pending
43    /// tasks.
44    pub fn new(
45        metric_name: &str,
46        number_of_threads: usize,
47        number_of_pending_tasks: usize,
48    ) -> anyhow::Result<Self> {
49        let thread_pool = if number_of_threads != 0 {
50            let runtime = runtime::Builder::new_multi_thread()
51                .worker_threads(number_of_threads)
52                .enable_all()
53                .build()
54                .map_err(|e| anyhow::anyhow!("Failed to create a tokio pool: {}", e))?;
55
56            Some(runtime)
57        } else {
58            None
59        };
60        let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks));
61        let metric = FuturesMetrics::obtain_futures_metrics(metric_name);
62        Ok(Self {
63            metric,
64            thread_pool,
65            semaphore,
66        })
67    }
68
69    /// Reserve a slot for a task to be executed.
70    pub fn reserve(&self) -> Result<AsyncReservation, OutOfCapacity> {
71        let permit = self.semaphore.clone().try_acquire_owned();
72        match permit {
73            Ok(permit) => Ok(AsyncReservation(permit)),
74            _ => Err(OutOfCapacity),
75        }
76    }
77
78    /// Spawn a task with a reservation.
79    pub fn spawn_reserved<F>(
80        &self,
81        reservation: AsyncReservation,
82        future: F,
83    ) -> JoinHandle<F::Output>
84    where
85        F: Future + Send + 'static,
86        F::Output: Send,
87    {
88        let permit = reservation.0;
89        let future = async move {
90            let permit = permit;
91            let result = future.await;
92            drop(permit);
93            result
94        };
95        let metered_future = MeteredFuture::new(future, self.metric.clone());
96        match &self.thread_pool {
97            Some(runtime) => runtime.spawn(metered_future),
98            _ => tokio::spawn(metered_future),
99        }
100    }
101
102    /// Tries to spawn a task. If the task cannot be spawned, returns an error.
103    pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
104    where
105        F: Future + Send + 'static,
106        F::Output: Send,
107    {
108        let reservation = self.reserve()?;
109        Ok(self.spawn_reserved(reservation, future))
110    }
111}
112
#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use futures::future::join_all;
    use std::{
        collections::HashSet,
        iter,
        // NOTE: this is the *blocking* `std::thread::sleep`. Several tests use
        // it deliberately to occupy a worker thread, in contrast to the
        // non-blocking `tokio::time::sleep` used elsewhere.
        thread::sleep,
        time::Duration,
    };
    use tokio::time::Instant;

    #[tokio::test]
    async fn one_spawn_single_tasks_works() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        // When
        let (sender, receiver) = tokio::sync::oneshot::channel();
        let result = heavy_task_processor.try_spawn(async move {
            sender.send(()).unwrap();
        });

        // Then
        result.expect("Expected Ok result");
        // The oneshot message proves the spawned future actually ran to
        // completion; bound the wait so a broken spawn can't hang the test.
        tokio::time::timeout(Duration::from_secs(5), receiver)
            .await
            .unwrap()
            .unwrap();
    }

    #[tokio::test]
    async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
        // Given
        const MAX_NUMBER_OF_THREADS: usize = 10;
        const NUMBER_OF_PENDING_TASKS: usize = 10000;
        let heavy_task_processor =
            AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();
        // Capture a thread id belonging to the *ambient* test runtime so we can
        // check the processor's dedicated pool never reuses it.
        let main_handler = tokio::spawn(async move { std::thread::current().id() });
        let main_id = main_handler.await.unwrap();

        // When
        let futures = iter::repeat_with(|| {
            heavy_task_processor
                .try_spawn(async move { std::thread::current().id() })
                .unwrap()
        })
        .take(NUMBER_OF_PENDING_TASKS)
        .collect::<Vec<_>>();

        // Then
        let thread_ids = join_all(futures).await;
        let unique_thread_ids = thread_ids
            .into_iter()
            .map(|r| r.unwrap())
            .collect::<HashSet<_>>();

        // Main thread was not used.
        assert!(!unique_thread_ids.contains(&main_id));
        // There's been at least one worker thread used.
        assert!(!unique_thread_ids.is_empty());
        // There were no more worker threads above the threshold.
        assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS);
    }

    #[test]
    fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
        // Blocking sleep keeps the first task (and its permit) alive on the
        // processor's own runtime while we attempt the second spawn below.
        let first_spawn_result = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });
        first_spawn_result.expect("Expected Ok result");

        // When
        let second_spawn_result = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });

        // Then
        let err = second_spawn_result.expect_err("Should error");
        assert_eq!(err, OutOfCapacity);
    }

    #[tokio::test]
    async fn second_spawn_works_when_first_is_finished() {
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        // Given
        let (sender, receiver) = tokio::sync::oneshot::channel();
        let first_spawn = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
            sender.send(()).unwrap();
        });
        // Await both the join handle and the oneshot so the permit is
        // guaranteed released before the second spawn attempt.
        first_spawn.expect("Expected Ok result").await.unwrap();
        receiver.await.unwrap();

        // When
        let second_spawn = heavy_task_processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });

        // Then
        second_spawn.expect("Expected Ok result");
    }

    #[test]
    fn can_spawn_10_tasks_when_limit_is_10() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        let heavy_task_processor =
            AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        for _ in 0..NUMBER_OF_PENDING_TASKS {
            // When
            let result = heavy_task_processor.try_spawn(async move {
                tokio::time::sleep(Duration::from_secs(1)).await;
            });

            // Then
            result.expect("Expected Ok result");
        }
    }

    #[tokio::test]
    async fn executes_5_tasks_for_5_seconds_with_one_thread() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 5;
        const NUMBER_OF_THREADS: usize = 1;
        let heavy_task_processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        // When
        let (broadcast_sender, mut broadcast_receiver) =
            tokio::sync::broadcast::channel(1024);
        let instant = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let broadcast_sender = broadcast_sender.clone();
            let result = heavy_task_processor.try_spawn(async move {
                // Blocking sleep monopolizes the single worker thread, forcing
                // the 5 tasks to serialize (and count as "busy" time).
                sleep(Duration::from_secs(1));
                broadcast_sender.send(()).unwrap();
            });
            result.expect("Expected Ok result");
        }
        drop(broadcast_sender);

        // Then
        // `recv` errors once every sender clone has been dropped, i.e. all
        // tasks are done.
        while broadcast_receiver.recv().await.is_ok() {}
        // 5 tasks running on 1 thread, each task taking 1 second,
        // should complete in approximately 5 seconds overall.
        // Allowing some LEEWAY to account for runtime overhead.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY);
        // Make sure that the tasks were not executed in parallel.
        assert!(instant.elapsed() >= Duration::from_secs(5));
        // Wait for the metrics to be updated.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
        assert_eq!(duration.as_secs(), 5);
        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
        assert_eq!(duration.as_secs(), 0);
    }

    #[tokio::test]
    async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
    {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        const NUMBER_OF_THREADS: usize = 10;
        let heavy_task_processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        // When
        let (broadcast_sender, mut broadcast_receiver) =
            tokio::sync::broadcast::channel(1024);
        let instant = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let broadcast_sender = broadcast_sender.clone();
            let result = heavy_task_processor.try_spawn(async move {
                // Blocking sleep: each task pins one of the 10 worker threads,
                // so all 10 run in parallel and the time is recorded as busy.
                sleep(Duration::from_secs(1));
                broadcast_sender.send(()).unwrap();
            });
            result.expect("Expected Ok result");
        }
        drop(broadcast_sender);

        // Then
        while broadcast_receiver.recv().await.is_ok() {}
        // 10 blocking tasks running on 10 threads, each task taking 1 second,
        // should complete in approximately 1 second overall.
        // Allowing some LEEWAY to account for runtime overhead.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
        // Wait for the metrics to be updated.
        tokio::time::sleep(Duration::from_secs(1)).await;
        // Busy time accumulates per task, so 10 tasks x 1s = ~10s total.
        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
        assert_eq!(duration.as_secs(), 10);
        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
        assert_eq!(duration.as_secs(), 0);
    }

    #[tokio::test]
    async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time()
     {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        const NUMBER_OF_THREADS: usize = 10;
        let heavy_task_processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        // When
        let (broadcast_sender, mut broadcast_receiver) =
            tokio::sync::broadcast::channel(1024);
        let instant = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let broadcast_sender = broadcast_sender.clone();
            let result = heavy_task_processor.try_spawn(async move {
                // Non-blocking sleep: the future is suspended while waiting,
                // so the time is recorded as idle rather than busy.
                tokio::time::sleep(Duration::from_secs(1)).await;
                broadcast_sender.send(()).unwrap();
            });
            result.expect("Expected Ok result");
        }
        drop(broadcast_sender);

        // Then
        while broadcast_receiver.recv().await.is_ok() {}
        // 10 non-blocking tasks running on 10 threads, each task taking 1 second,
        // should complete in approximately 1 second overall.
        // Allowing some LEEWAY to account for runtime overhead.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
        // Wait for the metrics to be updated.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
        assert_eq!(duration.as_secs(), 0);
        // Idle time accumulates per task as well: 10 tasks x 1s = ~10s total.
        let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
        assert_eq!(duration.as_secs(), 10);
    }
}
363}