server/
taskpool.rs

1//! # Task Pool Module
2//!
3//! Provides a concurrent task execution pool with semaphore-based concurrency control
4//! and cancellation support. The TaskPool allows limiting the number of concurrent
5//! tasks while providing graceful shutdown capabilities.
6//!
7//! ## Features
8//!
9//! - **Concurrency Control** - Limits the number of simultaneously executing tasks
10//! - **Cancellation Support** - Can cancel all running tasks gracefully
11//! - **Resource Management** - Automatic cleanup and resource disposal
12//! - **Clone Support** - Multiple references to the same task pool
13//!
14//! ## Usage
15//!
16//! ```no_run
17//! use quetty_server::taskpool::TaskPool;
18//!
19//! async fn example() {
20//!     let pool = TaskPool::new(10); // Allow up to 10 concurrent tasks
21//!
22//!     // Execute multiple tasks
23//!     for i in 0..20 {
24//!         pool.execute(async move {
25//!             println!("Task {} executing", i);
26//!             // Simulate work
27//!             tokio::time::sleep(std::time::Duration::from_millis(100)).await;
28//!         });
29//!     }
30//!
31//!     // Later, cancel all tasks if needed
32//!     pool.cancel_all();
33//! }
34//! ```
35
36use futures_util::Future;
37use std::sync::Arc;
38use tokio::sync::Semaphore;
39use tokio_util::sync::CancellationToken;
40
41/// A concurrent task execution pool with semaphore-based concurrency control.
42///
43/// TaskPool manages the execution of asynchronous tasks with a configurable limit
44/// on the number of concurrent tasks. It provides cancellation support and automatic
45/// resource cleanup.
46///
47/// # Thread Safety
48///
49/// TaskPool is thread-safe and can be cloned to share across multiple contexts.
50/// All clones share the same underlying semaphore and cancellation token.
51#[derive(Clone)]
52pub struct TaskPool {
53    semaphore: Arc<Semaphore>,
54    cancel_token: Arc<CancellationToken>,
55}
56
57impl TaskPool {
58    /// Creates a new TaskPool with the specified concurrency limit.
59    ///
60    /// # Arguments
61    ///
62    /// * `n_tasks` - Maximum number of tasks that can execute concurrently
63    ///
64    /// # Examples
65    ///
66    /// ```no_run
67    /// use quetty_server::taskpool::TaskPool;
68    ///
69    /// let pool = TaskPool::new(5); // Allow up to 5 concurrent tasks
70    /// ```
71    pub fn new(n_tasks: usize) -> TaskPool {
72        TaskPool {
73            semaphore: Arc::new(Semaphore::new(n_tasks)),
74            cancel_token: Arc::new(CancellationToken::new()),
75        }
76    }
77
78    /// Executes a future in the task pool with concurrency control.
79    ///
80    /// The task will wait for a semaphore permit before executing. If the pool
81    /// is cancelled while the task is running, it will be interrupted gracefully.
82    ///
83    /// # Type Parameters
84    ///
85    /// * `F` - Future type that implements Send and has a static lifetime
86    /// * `T` - Output type of the future that implements Send
87    ///
88    /// # Arguments
89    ///
90    /// * `func` - The async function/future to execute
91    ///
92    /// # Examples
93    ///
94    /// ```no_run
95    /// use quetty_server::taskpool::TaskPool;
96    ///
97    /// async fn example() {
98    ///     let pool = TaskPool::new(3);
99    ///
100    ///     pool.execute(async {
101    ///         println!("Task is running");
102    ///         // Do some work
103    ///     });
104    /// }
105    /// ```
106    pub fn execute<F, T>(&self, func: F)
107    where
108        F: Future<Output = T> + Send + 'static,
109        T: Send,
110    {
111        let semaphore = self.semaphore.clone();
112        let token = self.cancel_token.clone();
113
114        tokio::spawn(async move {
115            let main = async {
116                if let Ok(_permit) = semaphore.acquire().await {
117                    func.await;
118                } else {
119                    log::error!("TaskPool: Failed to acquire semaphore permit");
120                }
121            };
122
123            tokio::select! {
124                () = main => {},
125                () = token.cancelled() => {
126                    log::debug!("TaskPool: Task cancelled");
127                }
128            }
129        });
130    }
131
132    /// Cancels all currently running and queued tasks.
133    ///
134    /// This sends a cancellation signal to all tasks. Tasks that are currently
135    /// executing will be interrupted at their next cancellation check point.
136    /// Tasks waiting for permits will be cancelled before they start.
137    ///
138    /// # Examples
139    ///
140    /// ```no_run
141    /// use quetty_server::taskpool::TaskPool;
142    ///
143    /// async fn example() {
144    ///     let pool = TaskPool::new(3);
145    ///
146    ///     // Start some tasks
147    ///     for i in 0..10 {
148    ///         pool.execute(async move {
149    ///             println!("Task {}", i);
150    ///         });
151    ///     }
152    ///
153    ///     // Cancel all tasks
154    ///     pool.cancel_all();
155    /// }
156    /// ```
157    pub fn cancel_all(&self) {
158        self.cancel_token.cancel();
159    }
160
161    /// Closes the task pool to prevent new tasks from starting.
162    ///
163    /// This closes the underlying semaphore, which prevents new tasks from
164    /// acquiring permits. Tasks that are already running will continue to completion.
165    ///
166    /// # Examples
167    ///
168    /// ```no_run
169    /// use quetty_server::taskpool::TaskPool;
170    ///
171    /// async fn example() {
172    ///     let pool = TaskPool::new(3);
173    ///
174    ///     // Use the pool...
175    ///
176    ///     // Close it to prevent new tasks
177    ///     pool.close();
178    /// }
179    /// ```
180    pub fn close(&self) {
181        self.semaphore.close();
182    }
183}
184
185impl Drop for TaskPool {
186    /// Automatically closes the semaphore when the last TaskPool reference is dropped.
187    ///
188    /// This ensures that resources are properly cleaned up when the task pool
189    /// is no longer needed. The semaphore is only closed when this is the last
190    /// remaining reference to prevent premature shutdown.
191    fn drop(&mut self) {
192        // Only close the semaphore when the last reference is dropped
193        if Arc::strong_count(&self.semaphore) == 1 {
194            self.semaphore.close();
195        }
196    }
197}