termusiclib/
taskpool.rs

1use std::sync::Arc;
2
3use futures::Future;
4use tokio::sync::Semaphore;
5use tokio_util::sync::CancellationToken;
6
7/// Manages a taskpool of a given size of how many task to execute at once.
8///
9/// Also cancels all tasks spawned by this pool on [`Drop`]
10#[must_use]
11pub struct TaskPool {
12    /// Semaphore to manage how many active tasks there at a time
13    semaphore: Arc<Semaphore>,
14    /// Cancel Token to stop a task on drop
15    cancel_token: CancellationToken,
16}
17
18impl TaskPool {
19    /// Creates a new [`TaskPool`] with a given amount of active tasks
20    pub fn new(n_tasks: usize) -> TaskPool {
21        let semaphore = Arc::new(Semaphore::new(n_tasks));
22        let cancel_token = CancellationToken::new();
23
24        TaskPool {
25            semaphore,
26            cancel_token,
27        }
28    }
29
30    /// Adds a new task to the [`TaskPool`]
31    ///
32    /// see [`tokio::spawn`]
33    ///
34    /// Provided task will be cancelled on [`TaskPool`] [`Drop`]
35    pub fn execute<F, T>(&self, func: F)
36    where
37        F: Future<Output = T> + Send + 'static,
38        T: Send,
39    {
40        let semaphore = self.semaphore.clone();
41        let token = self.cancel_token.clone();
42        tokio::spawn(async move {
43            // multiple "await" points, so combine them to a single future for the select
44            let main = async {
45                let Ok(_permit) = semaphore.acquire().await else {
46                    // ignore / cancel task if semaphore is closed
47                    // just for clarity, this "return" cancels the whole spawned task and does not execute "func.await"
48                    return;
49                };
50                func.await;
51            };
52
53            tokio::select! {
54                () = main => {},
55                () = token.cancelled() => {}
56            }
57        });
58    }
59}
60
61impl Drop for TaskPool {
62    fn drop(&mut self) {
63        // prevent new tasks from being added / executed
64        self.semaphore.close();
65        // cancel all tasks that were spawned with this token
66        self.cancel_token.cancel();
67    }
68}