server/taskpool.rs
1//! # Task Pool Module
2//!
3//! Provides a concurrent task execution pool with semaphore-based concurrency control
4//! and cancellation support. The TaskPool allows limiting the number of concurrent
5//! tasks while providing graceful shutdown capabilities.
6//!
7//! ## Features
8//!
9//! - **Concurrency Control** - Limits the number of simultaneously executing tasks
10//! - **Cancellation Support** - Can cancel all running tasks gracefully
11//! - **Resource Management** - Automatic cleanup and resource disposal
12//! - **Clone Support** - Multiple references to the same task pool
13//!
14//! ## Usage
15//!
16//! ```no_run
17//! use quetty_server::taskpool::TaskPool;
18//!
19//! async fn example() {
20//! let pool = TaskPool::new(10); // Allow up to 10 concurrent tasks
21//!
22//! // Execute multiple tasks
23//! for i in 0..20 {
24//! pool.execute(async move {
25//! println!("Task {} executing", i);
26//! // Simulate work
27//! tokio::time::sleep(std::time::Duration::from_millis(100)).await;
28//! });
29//! }
30//!
31//! // Later, cancel all tasks if needed
32//! pool.cancel_all();
33//! }
34//! ```
35
36use futures_util::Future;
37use std::sync::Arc;
38use tokio::sync::Semaphore;
39use tokio_util::sync::CancellationToken;
40
41/// A concurrent task execution pool with semaphore-based concurrency control.
42///
43/// TaskPool manages the execution of asynchronous tasks with a configurable limit
44/// on the number of concurrent tasks. It provides cancellation support and automatic
45/// resource cleanup.
46///
47/// # Thread Safety
48///
49/// TaskPool is thread-safe and can be cloned to share across multiple contexts.
50/// All clones share the same underlying semaphore and cancellation token.
51#[derive(Clone)]
52pub struct TaskPool {
53 semaphore: Arc<Semaphore>,
54 cancel_token: Arc<CancellationToken>,
55}
56
57impl TaskPool {
58 /// Creates a new TaskPool with the specified concurrency limit.
59 ///
60 /// # Arguments
61 ///
62 /// * `n_tasks` - Maximum number of tasks that can execute concurrently
63 ///
64 /// # Examples
65 ///
66 /// ```no_run
67 /// use quetty_server::taskpool::TaskPool;
68 ///
69 /// let pool = TaskPool::new(5); // Allow up to 5 concurrent tasks
70 /// ```
71 pub fn new(n_tasks: usize) -> TaskPool {
72 TaskPool {
73 semaphore: Arc::new(Semaphore::new(n_tasks)),
74 cancel_token: Arc::new(CancellationToken::new()),
75 }
76 }
77
78 /// Executes a future in the task pool with concurrency control.
79 ///
80 /// The task will wait for a semaphore permit before executing. If the pool
81 /// is cancelled while the task is running, it will be interrupted gracefully.
82 ///
83 /// # Type Parameters
84 ///
85 /// * `F` - Future type that implements Send and has a static lifetime
86 /// * `T` - Output type of the future that implements Send
87 ///
88 /// # Arguments
89 ///
90 /// * `func` - The async function/future to execute
91 ///
92 /// # Examples
93 ///
94 /// ```no_run
95 /// use quetty_server::taskpool::TaskPool;
96 ///
97 /// async fn example() {
98 /// let pool = TaskPool::new(3);
99 ///
100 /// pool.execute(async {
101 /// println!("Task is running");
102 /// // Do some work
103 /// });
104 /// }
105 /// ```
106 pub fn execute<F, T>(&self, func: F)
107 where
108 F: Future<Output = T> + Send + 'static,
109 T: Send,
110 {
111 let semaphore = self.semaphore.clone();
112 let token = self.cancel_token.clone();
113
114 tokio::spawn(async move {
115 let main = async {
116 if let Ok(_permit) = semaphore.acquire().await {
117 func.await;
118 } else {
119 log::error!("TaskPool: Failed to acquire semaphore permit");
120 }
121 };
122
123 tokio::select! {
124 () = main => {},
125 () = token.cancelled() => {
126 log::debug!("TaskPool: Task cancelled");
127 }
128 }
129 });
130 }
131
132 /// Cancels all currently running and queued tasks.
133 ///
134 /// This sends a cancellation signal to all tasks. Tasks that are currently
135 /// executing will be interrupted at their next cancellation check point.
136 /// Tasks waiting for permits will be cancelled before they start.
137 ///
138 /// # Examples
139 ///
140 /// ```no_run
141 /// use quetty_server::taskpool::TaskPool;
142 ///
143 /// async fn example() {
144 /// let pool = TaskPool::new(3);
145 ///
146 /// // Start some tasks
147 /// for i in 0..10 {
148 /// pool.execute(async move {
149 /// println!("Task {}", i);
150 /// });
151 /// }
152 ///
153 /// // Cancel all tasks
154 /// pool.cancel_all();
155 /// }
156 /// ```
157 pub fn cancel_all(&self) {
158 self.cancel_token.cancel();
159 }
160
161 /// Closes the task pool to prevent new tasks from starting.
162 ///
163 /// This closes the underlying semaphore, which prevents new tasks from
164 /// acquiring permits. Tasks that are already running will continue to completion.
165 ///
166 /// # Examples
167 ///
168 /// ```no_run
169 /// use quetty_server::taskpool::TaskPool;
170 ///
171 /// async fn example() {
172 /// let pool = TaskPool::new(3);
173 ///
174 /// // Use the pool...
175 ///
176 /// // Close it to prevent new tasks
177 /// pool.close();
178 /// }
179 /// ```
180 pub fn close(&self) {
181 self.semaphore.close();
182 }
183}
184
185impl Drop for TaskPool {
186 /// Automatically closes the semaphore when the last TaskPool reference is dropped.
187 ///
188 /// This ensures that resources are properly cleaned up when the task pool
189 /// is no longer needed. The semaphore is only closed when this is the last
190 /// remaining reference to prevent premature shutdown.
191 fn drop(&mut self) {
192 // Only close the semaphore when the last reference is dropped
193 if Arc::strong_count(&self.semaphore) == 1 {
194 self.semaphore.close();
195 }
196 }
197}