termusiclib/
taskpool.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::sync::Arc;

use futures::Future;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

/// Manages a pool of tasks and limits how many of them execute at once.
///
/// Also cancels all tasks spawned by this pool on [`Drop`]
#[derive(Debug)]
pub struct TaskPool {
    /// Semaphore limiting how many tasks are active at a time
    semaphore: Arc<Semaphore>,
    /// Cancellation token, triggered on drop to stop the spawned tasks
    cancel_token: CancellationToken,
}

impl TaskPool {
    /// Builds a [`TaskPool`] whose semaphore starts out with `n_tasks` permits,
    /// i.e. at most `n_tasks` tasks hold a permit (and run) at the same time
    pub fn new(n_tasks: usize) -> TaskPool {
        Self {
            semaphore: Arc::new(Semaphore::new(n_tasks)),
            cancel_token: CancellationToken::new(),
        }
    }

    /// Queues `func` on this [`TaskPool`]
    ///
    /// A wrapper task is handed to [`tokio::spawn`] immediately; `func` itself is only awaited
    /// after a permit of the pool's semaphore has been acquired. The output of `func` is discarded.
    ///
    /// The wrapper races against the pool's cancel token, which is cancelled on [`TaskPool`] [`Drop`]
    pub fn execute<F, T>(&self, func: F)
    where
        F: Future<Output = T> + Send + 'static,
        T: Send,
    {
        let permits = Arc::clone(&self.semaphore);
        let cancelled = self.cancel_token.clone();

        // acquiring a permit and running "func" are two separate "await" points,
        // bundle them into one future so that "select!" can race them as a whole
        let job = async move {
            // acquiring fails once the semaphore is closed, in which case "func" is never awaited
            if let Ok(_permit) = permits.acquire().await {
                // "_permit" is held for as long as "func" is running
                func.await;
            }
        };

        tokio::spawn(async move {
            tokio::select! {
                () = job => {},
                () = cancelled.cancelled() => {}
            }
        });
    }
}

impl Drop for TaskPool {
    /// Shuts the pool down: closes the semaphore first, then cancels the shared token
    fn drop(&mut self) {
        let Self {
            semaphore,
            cancel_token,
        } = self;

        // tasks that have not acquired a permit yet must not start executing anymore
        semaphore.close();
        // stop every task that was spawned with a clone of this token
        cancel_token.cancel();
    }
}