1use log::{error, warn};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::Semaphore;
11
/// Configurable caps that a [`ResourceMonitor`] enforces over async tasks
/// and recursion depth.
#[derive(Debug, Clone)]
pub struct ResourceLimits {
    // Hard cap on concurrently active async tasks (backs the semaphore
    // permit count in `ResourceMonitor::with_limits`).
    pub max_concurrent_tasks: usize,
    // Maximum depth accepted by `ResourceMonitor::check_recursion_depth`.
    pub max_recursion_depth: usize,
    // Active-task count at which a warning is logged (when `log_warnings`).
    pub task_warning_threshold: usize,
    // Whether to emit `warn!`/`error!` logs when nearing or hitting limits.
    pub log_warnings: bool,
    // When the task limit is hit: `true` rejects the acquisition
    // immediately, `false` waits for a permit to free up.
    pub reject_when_full: bool,
}
26
27impl Default for ResourceLimits {
28 fn default() -> Self {
29 let max_tasks = 1000;
30 Self {
31 max_concurrent_tasks: max_tasks,
32 max_recursion_depth: 100,
33 task_warning_threshold: (max_tasks as f64 * 0.8) as usize,
34 log_warnings: true,
35 reject_when_full: true,
36 }
37 }
38}
39
40impl ResourceLimits {
41 pub fn with_max_tasks(mut self, max: usize) -> Self {
43 self.max_concurrent_tasks = max;
44 self.task_warning_threshold = (max as f64 * 0.8) as usize;
45 self
46 }
47
48 pub fn with_max_recursion(mut self, depth: usize) -> Self {
50 self.max_recursion_depth = depth;
51 self
52 }
53
54 pub fn unlimited() -> Self {
56 Self {
57 max_concurrent_tasks: usize::MAX,
58 max_recursion_depth: usize::MAX,
59 task_warning_threshold: usize::MAX,
60 log_warnings: false,
61 reject_when_full: false,
62 }
63 }
64}
65
/// Tracks and limits async task concurrency plus recursion depth, and
/// accumulates usage statistics for [`ResourceStats`].
pub struct ResourceMonitor {
    // Limits being enforced.
    limits: ResourceLimits,
    // Tasks currently holding a `TaskPermit` (decremented on permit drop).
    active_tasks: Arc<AtomicUsize>,
    // Cumulative count of permits granted since creation/reset.
    total_spawned: Arc<AtomicUsize>,
    // Cumulative count of acquisition attempts that hit the limit.
    total_rejected: Arc<AtomicUsize>,
    // High-water mark of `active_tasks`.
    peak_concurrent: Arc<AtomicUsize>,
    // Permit source sized from `limits.max_concurrent_tasks`.
    task_semaphore: Arc<Semaphore>,
    // Creation time; used for the uptime figure in `stats()`.
    start_time: Instant,
}
76
77impl Default for ResourceMonitor {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl ResourceMonitor {
84 pub fn new() -> Self {
86 Self::with_limits(ResourceLimits::default())
87 }
88
89 pub fn with_limits(limits: ResourceLimits) -> Self {
91 let semaphore = Arc::new(Semaphore::new(limits.max_concurrent_tasks));
92
93 Self {
94 limits,
95 active_tasks: Arc::new(AtomicUsize::new(0)),
96 total_spawned: Arc::new(AtomicUsize::new(0)),
97 total_rejected: Arc::new(AtomicUsize::new(0)),
98 peak_concurrent: Arc::new(AtomicUsize::new(0)),
99 task_semaphore: semaphore,
100 start_time: Instant::now(),
101 }
102 }
103
104 pub async fn try_acquire_task_permit(&self) -> Result<TaskPermit, ResourceExhausted> {
108 match self.task_semaphore.clone().try_acquire_owned() {
110 Ok(permit) => {
111 let active = self.active_tasks.fetch_add(1, Ordering::SeqCst) + 1;
113 self.total_spawned.fetch_add(1, Ordering::SeqCst);
114
115 let mut peak = self.peak_concurrent.load(Ordering::SeqCst);
117 while active > peak {
118 match self.peak_concurrent.compare_exchange_weak(
119 peak,
120 active,
121 Ordering::SeqCst,
122 Ordering::SeqCst,
123 ) {
124 Ok(_) => break,
125 Err(x) => peak = x,
126 }
127 }
128
129 if self.limits.log_warnings && active >= self.limits.task_warning_threshold {
131 warn!(
132 "High async task count: {}/{} ({}% of limit)",
133 active,
134 self.limits.max_concurrent_tasks,
135 (active as f64 / self.limits.max_concurrent_tasks as f64 * 100.0) as u32
136 );
137 }
138
139 Ok(TaskPermit {
140 _permit: permit,
141 active_tasks: self.active_tasks.clone(),
142 })
143 }
144 Err(_) => {
145 self.total_rejected.fetch_add(1, Ordering::SeqCst);
146
147 if self.limits.log_warnings {
148 error!(
149 "Task limit exceeded: {} active tasks (limit: {})",
150 self.active_tasks.load(Ordering::SeqCst),
151 self.limits.max_concurrent_tasks
152 );
153 }
154
155 if self.limits.reject_when_full {
156 Err(ResourceExhausted::TaskLimit(
157 self.limits.max_concurrent_tasks,
158 ))
159 } else {
160 let permit =
162 self.task_semaphore
163 .clone()
164 .acquire_owned()
165 .await
166 .map_err(|_| {
167 ResourceExhausted::TaskLimit(self.limits.max_concurrent_tasks)
168 })?;
169
170 let _active = self.active_tasks.fetch_add(1, Ordering::SeqCst) + 1;
171 self.total_spawned.fetch_add(1, Ordering::SeqCst);
172
173 Ok(TaskPermit {
174 _permit: permit,
175 active_tasks: self.active_tasks.clone(),
176 })
177 }
178 }
179 }
180 }
181
182 pub fn stats(&self) -> ResourceStats {
184 ResourceStats {
185 active_tasks: self.active_tasks.load(Ordering::SeqCst),
186 total_spawned: self.total_spawned.load(Ordering::SeqCst),
187 total_rejected: self.total_rejected.load(Ordering::SeqCst),
188 peak_concurrent: self.peak_concurrent.load(Ordering::SeqCst),
189 max_concurrent_tasks: self.limits.max_concurrent_tasks,
190 uptime: self.start_time.elapsed(),
191 }
192 }
193
194 pub fn reset_stats(&self) {
196 self.total_spawned.store(0, Ordering::SeqCst);
197 self.total_rejected.store(0, Ordering::SeqCst);
198 self.peak_concurrent
199 .store(self.active_tasks.load(Ordering::SeqCst), Ordering::SeqCst);
200 }
201
202 pub fn check_recursion_depth(&self, depth: usize) -> Result<(), ResourceExhausted> {
204 if depth > self.limits.max_recursion_depth {
205 if self.limits.log_warnings {
206 error!(
207 "Recursion depth limit exceeded: {} (limit: {})",
208 depth, self.limits.max_recursion_depth
209 );
210 }
211 Err(ResourceExhausted::RecursionDepth(
212 self.limits.max_recursion_depth,
213 ))
214 } else {
215 Ok(())
216 }
217 }
218}
219
/// RAII guard for one running task: holds a semaphore permit and
/// decrements the shared active-task counter when dropped.
pub struct TaskPermit {
    // Owned permit; returned to the semaphore automatically on drop.
    _permit: tokio::sync::OwnedSemaphorePermit,
    // Shared counter from the issuing `ResourceMonitor`.
    active_tasks: Arc<AtomicUsize>,
}
225
impl Drop for TaskPermit {
    // Task finished: decrement the active count. The semaphore permit held
    // in `_permit` is released by its own Drop impl.
    fn drop(&mut self) {
        self.active_tasks.fetch_sub(1, Ordering::SeqCst);
    }
}
231
/// Error returned when a configured resource limit is exceeded.
#[derive(Debug, thiserror::Error)]
pub enum ResourceExhausted {
    /// The concurrent-task cap was hit; payload is the configured limit.
    #[error("Async task limit exceeded (limit: {0})")]
    TaskLimit(usize),

    /// The recursion-depth cap was hit; payload is the configured limit.
    #[error("Recursion depth limit exceeded (limit: {0})")]
    RecursionDepth(usize),

    /// Memory budget exceeded.
    // NOTE(review): not constructed anywhere in this file — presumably used
    // by other modules; confirm before removing.
    #[error("Memory limit exceeded")]
    MemoryLimit,
}
247
/// Point-in-time snapshot of a [`ResourceMonitor`]'s counters.
#[derive(Debug, Clone)]
pub struct ResourceStats {
    // Tasks currently holding a permit.
    pub active_tasks: usize,
    // Cumulative permits granted since creation/reset.
    pub total_spawned: usize,
    // Cumulative acquisitions that failed at the limit.
    pub total_rejected: usize,
    // High-water mark of concurrent tasks.
    pub peak_concurrent: usize,
    // Configured concurrency cap (for percentage displays).
    pub max_concurrent_tasks: usize,
    // Time elapsed since the monitor was created.
    pub uptime: Duration,
}
264
265impl ResourceStats {
266 pub fn display(&self) -> String {
268 format!(
269 "Resource Stats:\n\
270 - Active tasks: {}/{} ({}%)\n\
271 - Peak concurrent: {}\n\
272 - Total spawned: {}\n\
273 - Total rejected: {}\n\
274 - Uptime: {:?}",
275 self.active_tasks,
276 self.max_concurrent_tasks,
277 (self.active_tasks as f64 / self.max_concurrent_tasks as f64 * 100.0) as u32,
278 self.peak_concurrent,
279 self.total_spawned,
280 self.total_rejected,
281 self.uptime
282 )
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    /// Permits are capped at the configured limit, and capacity returns
    /// once an earlier permit is dropped.
    #[tokio::test]
    async fn test_task_limits() {
        let limits = ResourceLimits::default().with_max_tasks(2);
        let monitor = ResourceMonitor::with_limits(limits);

        let first = monitor.try_acquire_task_permit().await;
        let second = monitor.try_acquire_task_permit().await;
        assert!(first.is_ok());
        assert!(second.is_ok());

        // Both permits are held, so a third acquisition must fail.
        assert!(monitor.try_acquire_task_permit().await.is_err());

        // Releasing one permit frees capacity for a new acquisition.
        drop(first);
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(monitor.try_acquire_task_permit().await.is_ok());
    }

    /// Depths up to and including the limit pass; anything deeper fails.
    #[test]
    fn test_recursion_limits() {
        let limits = ResourceLimits::default().with_max_recursion(5);
        let monitor = ResourceMonitor::with_limits(limits);

        assert!(monitor.check_recursion_depth(3).is_ok());
        assert!(monitor.check_recursion_depth(5).is_ok());
        assert!(monitor.check_recursion_depth(6).is_err());
    }
}