hojicha_runtime/
resource_limits.rs

//! Resource limits and monitoring for async task execution
//!
//! This module provides configurable limits and monitoring for system resources
//! to prevent exhaustion attacks and runaway resource consumption.
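//!
//! A minimal usage sketch (marked `no_run`; assumes the crate exposes this
//! module as `hojicha_runtime::resource_limits` and that tokio's `macros`
//! feature is available):
//!
//! ```no_run
//! use hojicha_runtime::resource_limits::{ResourceLimits, ResourceMonitor};
//!
//! # #[tokio::main]
//! # async fn main() {
//! let limits = ResourceLimits::default()
//!     .with_max_tasks(100)
//!     .with_max_recursion(32);
//! let monitor = ResourceMonitor::with_limits(limits);
//!
//! let permit = monitor.try_acquire_task_permit().await.expect("under the limit");
//! println!("{}", monitor.stats().display());
//! drop(permit); // returning the permit frees the slot for the next task
//! # }
//! ```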

use log::{error, warn};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;

/// Configuration for resource limits
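///
/// Individual fields can be overridden with struct-update syntax; a sketch
/// (`no_run`, since the crate path `hojicha_runtime` is assumed):
///
/// ```no_run
/// use hojicha_runtime::resource_limits::ResourceLimits;
///
/// let limits = ResourceLimits {
///     reject_when_full: false, // wait at the limit instead of erroring
///     ..ResourceLimits::default()
/// };
/// assert_eq!(limits.max_concurrent_tasks, 1000);
/// ```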
#[derive(Debug, Clone)]
pub struct ResourceLimits {
    /// Maximum number of concurrent async tasks (default: 1000)
    pub max_concurrent_tasks: usize,
    /// Maximum recursion depth for commands (default: 100)
    pub max_recursion_depth: usize,
    /// Warning threshold for concurrent tasks (default: 80% of max)
    pub task_warning_threshold: usize,
    /// Whether to log resource warnings (default: true)
    pub log_warnings: bool,
    /// Whether to reject new tasks when at limit (default: true)
    pub reject_when_full: bool,
}

impl Default for ResourceLimits {
    fn default() -> Self {
        let max_tasks = 1000;
        Self {
            max_concurrent_tasks: max_tasks,
            max_recursion_depth: 100,
            task_warning_threshold: (max_tasks as f64 * 0.8) as usize,
            log_warnings: true,
            reject_when_full: true,
        }
    }
}

impl ResourceLimits {
    /// Create limits with a specific max concurrent tasks
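    ///
    /// The warning threshold is recomputed to 80% of the new maximum.
    /// A sketch (`no_run`; crate path assumed):
    ///
    /// ```no_run
    /// use hojicha_runtime::resource_limits::ResourceLimits;
    ///
    /// let limits = ResourceLimits::default().with_max_tasks(500);
    /// assert_eq!(limits.max_concurrent_tasks, 500);
    /// assert_eq!(limits.task_warning_threshold, 400); // 80% of 500
    /// ```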
    pub fn with_max_tasks(mut self, max: usize) -> Self {
        self.max_concurrent_tasks = max;
        self.task_warning_threshold = (max as f64 * 0.8) as usize;
        self
    }

    /// Set the recursion depth limit
    pub fn with_max_recursion(mut self, depth: usize) -> Self {
        self.max_recursion_depth = depth;
        self
    }

    /// Disable all limits (use with caution!)
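    ///
    /// Note: `ResourceMonitor::with_limits` clamps the semaphore permits to
    /// `tokio::sync::Semaphore::MAX_PERMITS`, so "unlimited" is still bounded
    /// by tokio's ceiling, far beyond any practical task count.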
    pub fn unlimited() -> Self {
        Self {
            max_concurrent_tasks: usize::MAX,
            max_recursion_depth: usize::MAX,
            task_warning_threshold: usize::MAX,
            log_warnings: false,
            reject_when_full: false,
        }
    }
}

/// Monitors and enforces resource limits
pub struct ResourceMonitor {
    limits: ResourceLimits,
    active_tasks: Arc<AtomicUsize>,
    total_spawned: Arc<AtomicUsize>,
    total_rejected: Arc<AtomicUsize>,
    peak_concurrent: Arc<AtomicUsize>,
    task_semaphore: Arc<Semaphore>,
    start_time: Instant,
}

impl Default for ResourceMonitor {
    fn default() -> Self {
        Self::new()
    }
}

impl ResourceMonitor {
    /// Create a new resource monitor with default limits
    pub fn new() -> Self {
        Self::with_limits(ResourceLimits::default())
    }

    /// Create a new resource monitor with custom limits
    pub fn with_limits(limits: ResourceLimits) -> Self {
        // Clamp to tokio's permit ceiling: `Semaphore::new` panics above
        // `Semaphore::MAX_PERMITS`, which `ResourceLimits::unlimited()` would hit.
        let permits = limits.max_concurrent_tasks.min(Semaphore::MAX_PERMITS);
        let semaphore = Arc::new(Semaphore::new(permits));

        Self {
            limits,
            active_tasks: Arc::new(AtomicUsize::new(0)),
            total_spawned: Arc::new(AtomicUsize::new(0)),
            total_rejected: Arc::new(AtomicUsize::new(0)),
            peak_concurrent: Arc::new(AtomicUsize::new(0)),
            task_semaphore: semaphore,
            start_time: Instant::now(),
        }
    }

    /// Try to acquire a permit to spawn a new task
    ///
    /// Returns `Ok(permit)` while under the limit. At capacity, returns
    /// `Err(ResourceExhausted::TaskLimit)` when `reject_when_full` is set;
    /// otherwise waits until a permit is released.
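    ///
    /// A sketch of the spawn-with-permit pattern (`no_run`; assumes the crate
    /// exposes this module as `hojicha_runtime::resource_limits`):
    ///
    /// ```no_run
    /// use hojicha_runtime::resource_limits::ResourceMonitor;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let monitor = ResourceMonitor::new();
    /// match monitor.try_acquire_task_permit().await {
    ///     Ok(permit) => {
    ///         tokio::spawn(async move {
    ///             let _permit = permit; // slot is released when this drops
    ///             // ... task body ...
    ///         });
    ///     }
    ///     Err(e) => eprintln!("task rejected: {e}"),
    /// }
    /// # }
    /// ```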
    pub async fn try_acquire_task_permit(&self) -> Result<TaskPermit, ResourceExhausted> {
        // Fast path: try to take a semaphore permit without waiting
        match self.task_semaphore.clone().try_acquire_owned() {
            Ok(permit) => {
                let active = self.record_spawn();

                // Log warning if approaching limit
                if self.limits.log_warnings && active >= self.limits.task_warning_threshold {
                    warn!(
                        "High async task count: {}/{} ({}% of limit)",
                        active,
                        self.limits.max_concurrent_tasks,
                        (active as f64 / self.limits.max_concurrent_tasks as f64 * 100.0) as u32
                    );
                }

                Ok(TaskPermit {
                    _permit: permit,
                    active_tasks: self.active_tasks.clone(),
                })
            }
            Err(_) => {
                self.total_rejected.fetch_add(1, Ordering::SeqCst);

                if self.limits.log_warnings {
                    error!(
                        "Task limit exceeded: {} active tasks (limit: {})",
                        self.active_tasks.load(Ordering::SeqCst),
                        self.limits.max_concurrent_tasks
                    );
                }

                if self.limits.reject_when_full {
                    Err(ResourceExhausted::TaskLimit(
                        self.limits.max_concurrent_tasks,
                    ))
                } else {
                    // If not rejecting, wait for a permit
                    let permit =
                        self.task_semaphore
                            .clone()
                            .acquire_owned()
                            .await
                            .map_err(|_| {
                                ResourceExhausted::TaskLimit(self.limits.max_concurrent_tasks)
                            })?;

                    self.record_spawn();

                    Ok(TaskPermit {
                        _permit: permit,
                        active_tasks: self.active_tasks.clone(),
                    })
                }
            }
        }
    }

    /// Bump the spawn counters and raise the peak-concurrency watermark;
    /// returns the new active-task count. Shared by both acquisition paths
    /// so the peak is also tracked when a task waited for its permit.
    fn record_spawn(&self) -> usize {
        let active = self.active_tasks.fetch_add(1, Ordering::SeqCst) + 1;
        self.total_spawned.fetch_add(1, Ordering::SeqCst);

        // Lock-free max update of the peak counter
        let mut peak = self.peak_concurrent.load(Ordering::SeqCst);
        while active > peak {
            match self.peak_concurrent.compare_exchange_weak(
                peak,
                active,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => break,
                Err(x) => peak = x,
            }
        }

        active
    }

    /// Get current resource statistics
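    ///
    /// A sketch (`no_run`; crate path assumed):
    ///
    /// ```no_run
    /// use hojicha_runtime::resource_limits::ResourceMonitor;
    ///
    /// let monitor = ResourceMonitor::new();
    /// let stats = monitor.stats();
    /// assert_eq!(stats.active_tasks, 0);
    /// println!("{}", stats.display());
    /// ```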
    pub fn stats(&self) -> ResourceStats {
        ResourceStats {
            active_tasks: self.active_tasks.load(Ordering::SeqCst),
            total_spawned: self.total_spawned.load(Ordering::SeqCst),
            total_rejected: self.total_rejected.load(Ordering::SeqCst),
            peak_concurrent: self.peak_concurrent.load(Ordering::SeqCst),
            max_concurrent_tasks: self.limits.max_concurrent_tasks,
            uptime: self.start_time.elapsed(),
        }
    }

    /// Reset statistics (keeps limits)
    pub fn reset_stats(&self) {
        self.total_spawned.store(0, Ordering::SeqCst);
        self.total_rejected.store(0, Ordering::SeqCst);
        self.peak_concurrent
            .store(self.active_tasks.load(Ordering::SeqCst), Ordering::SeqCst);
    }

    /// Check if recursion depth exceeds limit
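    ///
    /// Depth tracking is the caller's job; the check is inclusive at the limit
    /// (`depth > max` fails). A sketch (`no_run`; crate path assumed):
    ///
    /// ```no_run
    /// use hojicha_runtime::resource_limits::{ResourceLimits, ResourceMonitor};
    ///
    /// let monitor =
    ///     ResourceMonitor::with_limits(ResourceLimits::default().with_max_recursion(8));
    /// assert!(monitor.check_recursion_depth(8).is_ok());  // at the limit: allowed
    /// assert!(monitor.check_recursion_depth(9).is_err()); // over the limit: rejected
    /// ```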
    pub fn check_recursion_depth(&self, depth: usize) -> Result<(), ResourceExhausted> {
        if depth > self.limits.max_recursion_depth {
            if self.limits.log_warnings {
                error!(
                    "Recursion depth limit exceeded: {} (limit: {})",
                    depth, self.limits.max_recursion_depth
                );
            }
            Err(ResourceExhausted::RecursionDepth(
                self.limits.max_recursion_depth,
            ))
        } else {
            Ok(())
        }
    }
}

/// A permit to spawn an async task
///
/// Dropping the permit returns its slot to the semaphore and decrements
/// the active-task count.
pub struct TaskPermit {
    _permit: tokio::sync::OwnedSemaphorePermit,
    active_tasks: Arc<AtomicUsize>,
}

impl Drop for TaskPermit {
    fn drop(&mut self) {
        self.active_tasks.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Resource exhaustion error
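///
/// Variants can be matched to tailor recovery; a sketch (`no_run`; the
/// `describe` helper is hypothetical and the crate path is assumed):
///
/// ```no_run
/// use hojicha_runtime::resource_limits::ResourceExhausted;
///
/// fn describe(err: &ResourceExhausted) -> String {
///     match err {
///         ResourceExhausted::TaskLimit(max) => format!("busy: {max} tasks already running"),
///         ResourceExhausted::RecursionDepth(max) => format!("too deep: limit is {max}"),
///         ResourceExhausted::MemoryLimit => "memory budget exhausted".to_string(),
///     }
/// }
/// ```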
#[derive(Debug, thiserror::Error)]
pub enum ResourceExhausted {
    /// Async task limit was exceeded
    #[error("Async task limit exceeded (limit: {0})")]
    TaskLimit(usize),

    /// Recursion depth limit was exceeded
    #[error("Recursion depth limit exceeded (limit: {0})")]
    RecursionDepth(usize),

    /// Memory usage limit was exceeded
    #[error("Memory limit exceeded")]
    MemoryLimit,
}

/// Resource usage statistics
#[derive(Debug, Clone)]
pub struct ResourceStats {
    /// Currently active async tasks
    pub active_tasks: usize,
    /// Total tasks spawned since start
    pub total_spawned: usize,
    /// Total tasks rejected due to limits
    pub total_rejected: usize,
    /// Peak concurrent tasks seen
    pub peak_concurrent: usize,
    /// Configured maximum concurrent tasks
    pub max_concurrent_tasks: usize,
    /// Time since monitor started
    pub uptime: Duration,
}

impl ResourceStats {
    /// Display statistics as a formatted string
    pub fn display(&self) -> String {
        format!(
            "Resource Stats:\n\
             - Active tasks: {}/{} ({}%)\n\
             - Peak concurrent: {}\n\
             - Total spawned: {}\n\
             - Total rejected: {}\n\
             - Uptime: {:?}",
            self.active_tasks,
            self.max_concurrent_tasks,
            (self.active_tasks as f64 / self.max_concurrent_tasks as f64 * 100.0) as u32,
            self.peak_concurrent,
            self.total_spawned,
            self.total_rejected,
            self.uptime
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_task_limits() {
        let monitor = ResourceMonitor::with_limits(ResourceLimits::default().with_max_tasks(2));

        // Should succeed for first two tasks
        let permit1 = monitor.try_acquire_task_permit().await;
        assert!(permit1.is_ok());

        let permit2 = monitor.try_acquire_task_permit().await;
        assert!(permit2.is_ok());

        // Third should fail
        let permit3 = monitor.try_acquire_task_permit().await;
        assert!(permit3.is_err());

        // Drop one permit
        drop(permit1);

        // Now should succeed again
        tokio::time::sleep(Duration::from_millis(10)).await;
        let permit4 = monitor.try_acquire_task_permit().await;
        assert!(permit4.is_ok());
    }

    #[test]
    fn test_recursion_limits() {
        let monitor = ResourceMonitor::with_limits(ResourceLimits::default().with_max_recursion(5));

        assert!(monitor.check_recursion_depth(3).is_ok());
        assert!(monitor.check_recursion_depth(5).is_ok());
        assert!(monitor.check_recursion_depth(6).is_err());
    }
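
    // A sketch of the non-rejecting mode: with `reject_when_full` disabled,
    // acquisition waits for a freed permit instead of returning an error.
    #[tokio::test]
    async fn test_waits_when_not_rejecting() {
        let limits = ResourceLimits {
            reject_when_full: false,
            ..ResourceLimits::default().with_max_tasks(1)
        };
        let monitor = Arc::new(ResourceMonitor::with_limits(limits));

        let permit1 = monitor.try_acquire_task_permit().await.unwrap();

        // The second acquisition should block until permit1 is dropped
        let waiter = {
            let monitor = monitor.clone();
            tokio::spawn(async move { monitor.try_acquire_task_permit().await })
        };

        tokio::time::sleep(Duration::from_millis(10)).await;
        drop(permit1);

        assert!(waiter.await.unwrap().is_ok());
    }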
}