fuel_core_services/
async_processor.rs

1use fuel_core_metrics::futures::{
2    metered_future::MeteredFuture,
3    FuturesMetrics,
4};
5use std::{
6    future::Future,
7    sync::Arc,
8};
9use tokio::{
10    runtime,
11    sync::{
12        OwnedSemaphorePermit,
13        Semaphore,
14    },
15    task::JoinHandle,
16};
17
18/// A processor that can execute async tasks with a limit on the number of tasks that can be
19/// executed concurrently.
20pub struct AsyncProcessor {
21    metric: FuturesMetrics,
22    semaphore: Arc<Semaphore>,
23    thread_pool: Option<runtime::Runtime>,
24}
25
26impl Drop for AsyncProcessor {
27    fn drop(&mut self) {
28        if let Some(runtime) = self.thread_pool.take() {
29            runtime.shutdown_background();
30        }
31    }
32}
33
34/// A reservation for a task to be executed by the `AsyncProcessor`.
35pub struct AsyncReservation(OwnedSemaphorePermit);
36
/// Out of capacity error.
///
/// Returned when every slot of the processor is already taken by a reserved
/// or running task, so no new task can be accepted right now.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so that it can
/// be propagated with `?` into generic error types and shown to users.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OutOfCapacity;

impl std::fmt::Display for OutOfCapacity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("out of capacity: no free slot for a new task")
    }
}

impl std::error::Error for OutOfCapacity {}
40
41impl AsyncProcessor {
42    /// Create a new `AsyncProcessor` with the given number of threads and the number of pending
43    /// tasks.
44    pub fn new(
45        metric_name: &str,
46        number_of_threads: usize,
47        number_of_pending_tasks: usize,
48    ) -> anyhow::Result<Self> {
49        let thread_pool = if number_of_threads != 0 {
50            let runtime = runtime::Builder::new_multi_thread()
51                .worker_threads(number_of_threads)
52                .enable_all()
53                .build()
54                .map_err(|e| anyhow::anyhow!("Failed to create a tokio pool: {}", e))?;
55
56            Some(runtime)
57        } else {
58            None
59        };
60        let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks));
61        let metric = FuturesMetrics::obtain_futures_metrics(metric_name);
62        Ok(Self {
63            metric,
64            thread_pool,
65            semaphore,
66        })
67    }
68
69    /// Reserve a slot for a task to be executed.
70    pub fn reserve(&self) -> Result<AsyncReservation, OutOfCapacity> {
71        let permit = self.semaphore.clone().try_acquire_owned();
72        if let Ok(permit) = permit {
73            Ok(AsyncReservation(permit))
74        } else {
75            Err(OutOfCapacity)
76        }
77    }
78
79    /// Spawn a task with a reservation.
80    pub fn spawn_reserved<F>(
81        &self,
82        reservation: AsyncReservation,
83        future: F,
84    ) -> JoinHandle<F::Output>
85    where
86        F: Future + Send + 'static,
87        F::Output: Send,
88    {
89        let permit = reservation.0;
90        let future = async move {
91            let permit = permit;
92            let result = future.await;
93            drop(permit);
94            result
95        };
96        let metered_future = MeteredFuture::new(future, self.metric.clone());
97        if let Some(runtime) = &self.thread_pool {
98            runtime.spawn(metered_future)
99        } else {
100            tokio::spawn(metered_future)
101        }
102    }
103
104    /// Tries to spawn a task. If the task cannot be spawned, returns an error.
105    pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
106    where
107        F: Future + Send + 'static,
108        F::Output: Send,
109    {
110        let reservation = self.reserve()?;
111        Ok(self.spawn_reserved(reservation, future))
112    }
113}
114
#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
// Test names separate the scenario from the expectation with a double underscore.
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use futures::future::join_all;
    use std::{
        collections::HashSet,
        iter,
        thread::sleep,
        time::Duration,
    };
    use tokio::time::Instant;

    #[tokio::test]
    async fn one_spawn_single_tasks_works() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let processor = AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        // When
        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
        let spawned = processor.try_spawn(async move {
            done_tx.send(()).unwrap();
        });

        // Then
        spawned.expect("Expected Ok result");
        // The task must signal completion well within the timeout.
        tokio::time::timeout(Duration::from_secs(5), done_rx)
            .await
            .unwrap()
            .unwrap();
    }

    #[tokio::test]
    async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
        // Given
        const MAX_NUMBER_OF_THREADS: usize = 10;
        const NUMBER_OF_PENDING_TASKS: usize = 10000;
        let processor =
            AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();
        // Thread that drives tasks spawned on the runtime of the test itself.
        let main_id = tokio::spawn(async move { std::thread::current().id() })
            .await
            .unwrap();

        // When
        let handles: Vec<_> = iter::repeat_with(|| {
            processor
                .try_spawn(async move { std::thread::current().id() })
                .unwrap()
        })
        .take(NUMBER_OF_PENDING_TASKS)
        .collect();

        // Then
        let worker_ids: HashSet<_> = join_all(handles)
            .await
            .into_iter()
            .map(Result::unwrap)
            .collect();

        // Main thread was not used.
        assert!(!worker_ids.contains(&main_id));
        // There's been at least one worker thread used.
        assert!(!worker_ids.is_empty());
        // There were no more worker threads above the threshold.
        assert!(worker_ids.len() <= MAX_NUMBER_OF_THREADS);
    }

    #[test]
    fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let processor = AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
        // The first task occupies the only slot for a whole second.
        let first = processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });
        first.expect("Expected Ok result");

        // When
        let second = processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });

        // Then
        let err = second.expect_err("Should error");
        assert_eq!(err, OutOfCapacity);
    }

    #[tokio::test]
    async fn second_spawn_works_when_first_is_finished() {
        const NUMBER_OF_PENDING_TASKS: usize = 1;
        let processor = AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        // Given
        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
        let first = processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
            done_tx.send(()).unwrap();
        });
        // Wait until the first task is completely done, so its slot is free again.
        first.expect("Expected Ok result").await.unwrap();
        done_rx.await.unwrap();

        // When
        let second = processor.try_spawn(async move {
            sleep(Duration::from_secs(1));
        });

        // Then
        second.expect("Expected Ok result");
    }

    #[test]
    fn can_spawn_10_tasks_when_limit_is_10() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        let processor = AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

        for _ in 0..NUMBER_OF_PENDING_TASKS {
            // When
            let spawned = processor.try_spawn(async move {
                tokio::time::sleep(Duration::from_secs(1)).await;
            });

            // Then
            spawned.expect("Expected Ok result");
        }
    }

    #[tokio::test]
    async fn executes_5_tasks_for_5_seconds_with_one_thread() {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 5;
        const NUMBER_OF_THREADS: usize = 1;
        let processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        // When
        let (finished_tx, mut finished_rx) = tokio::sync::broadcast::channel(1024);
        let started = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let finished_tx = finished_tx.clone();
            let spawned = processor.try_spawn(async move {
                // Blocking sleep on purpose: it keeps the worker thread busy.
                sleep(Duration::from_secs(1));
                finished_tx.send(()).unwrap();
            });
            spawned.expect("Expected Ok result");
        }
        // Only the tasks hold senders now, so the channel closes when they are done.
        drop(finished_tx);

        // Then
        while finished_rx.recv().await.is_ok() {}
        // 5 tasks running on 1 thread, each task taking 1 second,
        // should complete in approximately 5 seconds overall.
        // Allowing some LEEWAY to account for runtime overhead.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(started.elapsed() < Duration::from_secs(5) + LEEWAY);
        // Make sure that the tasks were not executed in parallel.
        assert!(started.elapsed() >= Duration::from_secs(5));
        // Wait for the metrics to be updated.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let busy = Duration::from_nanos(processor.metric.busy.get());
        assert_eq!(busy.as_secs(), 5);
        let idle = Duration::from_nanos(processor.metric.idle.get());
        assert_eq!(idle.as_secs(), 0);
    }

    #[tokio::test]
    async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
    {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        const NUMBER_OF_THREADS: usize = 10;
        let processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        // When
        let (finished_tx, mut finished_rx) = tokio::sync::broadcast::channel(1024);
        let started = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let finished_tx = finished_tx.clone();
            let spawned = processor.try_spawn(async move {
                // Blocking sleep on purpose: it is accounted as busy time.
                sleep(Duration::from_secs(1));
                finished_tx.send(()).unwrap();
            });
            spawned.expect("Expected Ok result");
        }
        // Only the tasks hold senders now, so the channel closes when they are done.
        drop(finished_tx);

        // Then
        while finished_rx.recv().await.is_ok() {}
        // 10 blocking tasks running on 10 threads, each task taking 1 second,
        // should complete in approximately 1 second overall.
        // Allowing some LEEWAY to account for runtime overhead.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(started.elapsed() <= Duration::from_secs(1) + LEEWAY);
        // Wait for the metrics to be updated.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let busy = Duration::from_nanos(processor.metric.busy.get());
        assert_eq!(busy.as_secs(), 10);
        let idle = Duration::from_nanos(processor.metric.idle.get());
        assert_eq!(idle.as_secs(), 0);
    }

    #[tokio::test]
    async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time(
    ) {
        // Given
        const NUMBER_OF_PENDING_TASKS: usize = 10;
        const NUMBER_OF_THREADS: usize = 10;
        let processor =
            AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
                .unwrap();

        // When
        let (finished_tx, mut finished_rx) = tokio::sync::broadcast::channel(1024);
        let started = Instant::now();
        for _ in 0..NUMBER_OF_PENDING_TASKS {
            let finished_tx = finished_tx.clone();
            let spawned = processor.try_spawn(async move {
                // Async sleep on purpose: the task yields, which is accounted as idle time.
                tokio::time::sleep(Duration::from_secs(1)).await;
                finished_tx.send(()).unwrap();
            });
            spawned.expect("Expected Ok result");
        }
        // Only the tasks hold senders now, so the channel closes when they are done.
        drop(finished_tx);

        // Then
        while finished_rx.recv().await.is_ok() {}
        // 10 non-blocking tasks running on 10 threads, each task taking 1 second,
        // should complete in approximately 1 second overall.
        // Allowing some LEEWAY to account for runtime overhead.
        const LEEWAY: Duration = Duration::from_millis(300);
        assert!(started.elapsed() <= Duration::from_secs(1) + LEEWAY);
        // Wait for the metrics to be updated.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let busy = Duration::from_nanos(processor.metric.busy.get());
        assert_eq!(busy.as_secs(), 0);
        let idle = Duration::from_nanos(processor.metric.idle.get());
        assert_eq!(idle.as_secs(), 10);
    }
}