termusiclib/taskpool.rs
1use std::sync::Arc;
2
3use futures_util::Future;
4use tokio::sync::Semaphore;
5use tokio_util::sync::CancellationToken;
6
7/// Manages a taskpool of a given size of how many task to execute at once.
8///
9/// Also cancels all tasks spawned by this pool on [`Drop`]
10#[must_use]
11pub struct TaskPool {
12 /// Semaphore to manage how many active tasks there at a time
13 semaphore: Arc<Semaphore>,
14 /// Cancel Token to stop a task on drop
15 cancel_token: CancellationToken,
16}
17
18impl TaskPool {
19 /// Creates a new [`TaskPool`] with a given amount of active tasks
20 pub fn new(n_tasks: usize) -> TaskPool {
21 let semaphore = Arc::new(Semaphore::new(n_tasks));
22 let cancel_token = CancellationToken::new();
23
24 TaskPool {
25 semaphore,
26 cancel_token,
27 }
28 }
29
30 /// Adds a new task to the [`TaskPool`]
31 ///
32 /// see [`tokio::spawn`]
33 ///
34 /// Provided task will be cancelled on [`TaskPool`] [`Drop`]
35 pub fn execute<F, T>(&self, func: F)
36 where
37 F: Future<Output = T> + Send + 'static,
38 T: Send,
39 {
40 let semaphore = self.semaphore.clone();
41 let token = self.cancel_token.clone();
42 tokio::spawn(async move {
43 // multiple "await" points, so combine them to a single future for the select
44 let main = async {
45 let Ok(_permit) = semaphore.acquire().await else {
46 // ignore / cancel task if semaphore is closed
47 // just for clarity, this "return" cancels the whole spawned task and does not execute "func.await"
48 return;
49 };
50 func.await;
51 };
52
53 tokio::select! {
54 () = main => {},
55 () = token.cancelled() => {}
56 }
57 });
58 }
59}
60
61impl Drop for TaskPool {
62 fn drop(&mut self) {
63 // prevent new tasks from being added / executed
64 self.semaphore.close();
65 // cancel all tasks that were spawned with this token
66 self.cancel_token.cancel();
67 }
68}