// adaptive_pipeline/infrastructure/runtime/resource_manager.rs
// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Global Resource Manager
//!
//! This module provides centralized resource governance across the entire
//! application, preventing resource oversubscription when processing multiple
//! files concurrently.
//!
//! ## Architecture Pattern: Two-Level Resource Governance
//!
//! **Problem:** Without global limits, multiple concurrent files can overwhelm
//! the system:
//! - 10 files × 8 workers/file = 80 concurrent tasks on an 8-core machine
//! - Result: CPU oversubscription, cache thrashing, poor throughput
//!
//! **Solution:** Two-level coordination:
//! 1. **Global limits** (this module) - Cap total system resources
//! 2. **Local limits** (per-file semaphores) - Cap per-file concurrency
//!
//! ## Educational Example
//!
//! ```rust,ignore
//! use adaptive_pipeline::infrastructure::runtime::RESOURCE_MANAGER;
//!
//! async fn process_file() -> Result<()> {
//!     // 1. Acquire global CPU token (waits if system is saturated)
//!     let _cpu_permit = RESOURCE_MANAGER.acquire_cpu().await?;
//!
//!     // 2. Acquire local per-file token
//!     let _local_permit = file_semaphore.acquire().await?;
//!
//!     // 3. Do CPU-intensive work
//!     compress_data().await?;
//!
//!     // 4. Both permits released automatically (RAII)
//!     Ok(())
//! }
//! ```
//!
//! ## Resource Types
//!
//! ### CPU Tokens
//! - **Purpose:** Limit total CPU-bound work across all files
//! - **Default:** `available_cores - 1` (leave one for OS/I/O)
//! - **Use:** Acquire before Rayon work or CPU-intensive operations
//!
//! ### I/O Tokens
//! - **Purpose:** Prevent I/O queue overrun
//! - **Default:** Device-specific (NVMe: 24, SSD: 12, HDD: 4)
//! - **Use:** Acquire before file reads/writes
//!
//! ### Memory Tracking
//! - **Purpose:** Monitor memory usage (gauge only, no enforcement yet)
//! - **Default:** No limit (soft monitoring)
//! - **Future:** Can add hard cap in Phase 3

use adaptive_pipeline_domain::PipelineError;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{Semaphore, SemaphorePermit};

/// Storage device type for I/O queue depth optimization.
///
/// Selects the default I/O token count in `GlobalResourceManager`: each
/// variant maps to a queue depth suited to that device class (see
/// `detect_optimal_io_tokens`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageType {
    /// NVMe SSD - High queue depth (24-32)
    NVMe,
    /// SATA SSD - Medium queue depth (8-16)
    Ssd,
    /// Hard Disk Drive - Low queue depth (2-4)
    Hdd,
    /// Auto-detect based on system
    /// (currently falls back to the SSD default; see `detect_optimal_io_tokens`)
    Auto,
    /// Custom queue depth (caller-chosen number of I/O tokens)
    Custom(usize),
}

/// Configuration for global resource manager.
///
/// All fields are optional overrides; `None` means "auto-detect a sensible
/// default" (see `GlobalResourceManager::new` for the resolution rules).
#[derive(Debug, Clone)]
pub struct ResourceConfig {
    /// Number of CPU worker tokens (default: cores - 1, minimum 1)
    pub cpu_tokens: Option<usize>,

    /// Number of I/O tokens (default: device-specific, per `storage_type`)
    pub io_tokens: Option<usize>,

    /// Storage device type for I/O optimization
    pub storage_type: StorageType,

    /// Soft memory limit in bytes (gauge only, no enforcement)
    pub memory_limit: Option<usize>,
}

98impl Default for ResourceConfig {
99 fn default() -> Self {
100 Self {
101 cpu_tokens: None, // Will use cores - 1
102 io_tokens: None, // Will use device-specific
103 storage_type: StorageType::Auto,
104 memory_limit: None, // No limit by default
105 }
106 }
107}
108
/// Global resource manager for system-wide resource coordination
///
/// ## Design Pattern: Centralized Resource Governance
///
/// This manager prevents resource oversubscription by providing a global
/// pool of CPU and I/O tokens that must be acquired before work begins.
///
/// ## Educational Notes
///
/// **Why semaphores?**
/// - Semaphores provide backpressure: work waits when resources are saturated
/// - RAII permits auto-release resources on drop
/// - Async-aware: integrates with Tokio runtime
///
/// **Why separate CPU and I/O tokens?**
/// - CPU work and I/O work have different characteristics
/// - CPU: Limited by cores, benefits from parallelism = cores
/// - I/O: Limited by device queue depth, different optimal values
///
/// **Why memory as gauge only?**
/// - Memory is harder to predict and control
/// - Start with monitoring, add enforcement later if needed
/// - Avoids complexity in Phase 1
pub struct GlobalResourceManager {
    /// CPU worker tokens (semaphore permits)
    ///
    /// **Purpose:** Prevent CPU oversubscription
    /// **Typical value:** cores - 1
    /// **Educational:** This is a "counting semaphore" that allows N concurrent
    /// operations
    cpu_tokens: Arc<Semaphore>,

    /// I/O operation tokens (semaphore permits)
    ///
    /// **Purpose:** Prevent I/O queue overrun
    /// **Typical value:** Device-specific (NVMe: 24, SSD: 12, HDD: 4)
    /// **Educational:** Different devices have different optimal queue depths
    io_tokens: Arc<Semaphore>,

    /// Memory usage gauge (bytes)
    ///
    /// **Purpose:** Monitor memory pressure (no enforcement yet)
    /// **Educational:** Start simple (gauge), add limits later (Phase 3)
    memory_used: Arc<AtomicUsize>,

    /// Total memory capacity in bytes, for reporting only (never enforced)
    memory_capacity: usize,

    /// Number of CPU tokens configured (cached so totals survive permit churn)
    cpu_token_count: usize,

    /// Number of I/O tokens configured (cached so totals survive permit churn)
    io_token_count: usize,
}

164impl GlobalResourceManager {
165 /// Creates a new global resource manager with the given configuration
166 ///
167 /// ## Educational: Resource Detection and Configuration
168 ///
169 /// This method demonstrates:
170 /// - Auto-detection of system resources (CPU cores)
171 /// - Device-specific I/O optimization
172 /// - Sensible defaults with override capability
173 ///
174 /// ## Examples
175 ///
176 /// ```rust,ignore
177 /// // Use defaults (auto-detected)
178 /// let manager = GlobalResourceManager::new(Default::default())?;
179 ///
180 /// // Custom configuration
181 /// let manager = GlobalResourceManager::new(ResourceConfig {
182 /// cpu_tokens: Some(6), // Override: use 6 CPU workers
183 /// storage_type: StorageType::NVMe,
184 /// ..Default::default()
185 /// })?;
186 /// ```
187 pub fn new(config: ResourceConfig) -> Result<Self, PipelineError> {
188 // Detect available CPU cores
189 let available_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4); // Conservative fallback
190
191 // Educational: Why cores - 1?
192 // Leave one core for OS, I/O threads, and system tasks
193 // Prevents complete CPU saturation which hurts overall system responsiveness
194 let cpu_token_count = config.cpu_tokens.unwrap_or_else(|| (available_cores - 1).max(1));
195
196 // Educational: Device-specific I/O queue depths
197 // Different storage devices have different optimal concurrency levels
198 let io_token_count = config
199 .io_tokens
200 .unwrap_or_else(|| Self::detect_optimal_io_tokens(config.storage_type));
201
202 // Educational: Memory capacity detection
203 // On most systems, we can query available RAM
204 // For now, use a conservative default if not specified
205 let memory_capacity = config.memory_limit.unwrap_or(40 * 1024 * 1024 * 1024); // 40GB default
206
207 Ok(Self {
208 cpu_tokens: Arc::new(Semaphore::new(cpu_token_count)),
209 io_tokens: Arc::new(Semaphore::new(io_token_count)),
210 memory_used: Arc::new(AtomicUsize::new(0)),
211 memory_capacity,
212 cpu_token_count,
213 io_token_count,
214 })
215 }
216
217 /// Detect optimal I/O token count based on storage type
218 ///
219 /// ## Educational: Device Characteristics
220 ///
221 /// **NVMe (24-32 tokens):**
222 /// - Multiple parallel channels
223 /// - Low latency, high throughput
224 /// - Benefits from high queue depth
225 ///
226 /// **SSD (8-16 tokens):**
227 /// - Medium parallelism
228 /// - Good random access
229 /// - Moderate queue depth optimal
230 ///
231 /// **HDD (2-4 tokens):**
232 /// - Sequential access preferred
233 /// - High seek latency
234 /// - Low queue depth prevents thrashing
235 fn detect_optimal_io_tokens(storage_type: StorageType) -> usize {
236 match storage_type {
237 StorageType::NVMe => 24,
238 StorageType::Ssd => 12,
239 StorageType::Hdd => 4,
240 StorageType::Auto => {
241 // Educational: Simple heuristic
242 // In production, would query device capabilities
243 // For now, assume SSD as reasonable default
244 12
245 }
246 StorageType::Custom(n) => n,
247 }
248 }
249
250 /// Acquire a CPU token (explicit style - pedagogical)
251 ///
252 /// ## Educational Pattern: Explicit Acquisition
253 ///
254 /// This method shows the explicit pattern where you:
255 /// 1. Call acquire
256 /// 2. Get back a permit
257 /// 3. Permit is held as long as the guard lives
258 /// 4. Permit is auto-released when dropped (RAII)
259 ///
260 /// ## Usage
261 ///
262 /// ```rust,ignore
263 /// let _cpu_permit = RESOURCE_MANAGER.acquire_cpu().await?;
264 /// // Do CPU work
265 /// // Permit auto-released here when _cpu_permit goes out of scope
266 /// ```
267 ///
268 /// ## Backpressure
269 ///
270 /// If all CPU tokens are in use, this method **waits** until one becomes
271 /// available. This creates natural backpressure and prevents
272 /// oversubscription.
273 pub async fn acquire_cpu(&self) -> Result<SemaphorePermit<'_>, PipelineError> {
274 self.cpu_tokens
275 .acquire()
276 .await
277 .map_err(|_| PipelineError::InternalError("CPU semaphore closed".to_string()))
278 }
279
280 /// Acquire an I/O token
281 ///
282 /// ## Educational: Same pattern as CPU tokens
283 ///
284 /// Uses the same semaphore pattern but for I/O operations.
285 /// Prevents too many concurrent I/O operations from overwhelming
286 /// the storage device.
287 ///
288 /// ## Usage
289 ///
290 /// ```rust,ignore
291 /// let _io_permit = RESOURCE_MANAGER.acquire_io().await?;
292 /// // Do I/O operation (read/write)
293 /// // Permit auto-released
294 /// ```
295 pub async fn acquire_io(&self) -> Result<SemaphorePermit<'_>, PipelineError> {
296 self.io_tokens
297 .acquire()
298 .await
299 .map_err(|_| PipelineError::InternalError("I/O semaphore closed".to_string()))
300 }
301
302 /// Track memory allocation (gauge only, no enforcement)
303 ///
304 /// ## Educational: Simple Atomic Counter
305 ///
306 /// Uses `Ordering::Relaxed` because:
307 /// - We only need atomicity (no torn reads/writes)
308 /// - No coordination with other atomic variables needed
309 /// - This is just a gauge for monitoring
310 ///
311 /// See atomic_ordering.rs for more on ordering choices.
312 pub fn allocate_memory(&self, bytes: usize) {
313 self.memory_used.fetch_add(bytes, Ordering::Relaxed);
314 }
315
316 /// Track memory deallocation
317 pub fn deallocate_memory(&self, bytes: usize) {
318 self.memory_used.fetch_sub(bytes, Ordering::Relaxed);
319 }
320
321 /// Get current memory usage
322 pub fn memory_used(&self) -> usize {
323 self.memory_used.load(Ordering::Relaxed)
324 }
325
326 /// Get memory capacity
327 pub fn memory_capacity(&self) -> usize {
328 self.memory_capacity
329 }
330
331 /// Get number of available CPU tokens
332 ///
333 /// ## Educational: Observability
334 ///
335 /// This method provides visibility into resource saturation.
336 /// If available_permits() is consistently 0, you're CPU-saturated.
337 pub fn cpu_tokens_available(&self) -> usize {
338 self.cpu_tokens.available_permits()
339 }
340
341 /// Get total number of CPU tokens
342 pub fn cpu_tokens_total(&self) -> usize {
343 self.cpu_token_count
344 }
345
346 /// Get number of available I/O tokens
347 pub fn io_tokens_available(&self) -> usize {
348 self.io_tokens.available_permits()
349 }
350
351 /// Get total number of I/O tokens
352 pub fn io_tokens_total(&self) -> usize {
353 self.io_token_count
354 }
355}
356
/// Global singleton instance of the resource manager
///
/// ## Educational: Singleton Pattern with Configuration
///
/// Uses `OnceLock` for:
/// - Thread-safe one-time initialization
/// - Allows custom configuration from CLI
/// - Initialized exactly once via `init_resource_manager()`
/// - Static lifetime for global access
///
/// ## Usage
///
/// ```rust,ignore
/// use adaptive_pipeline::infrastructure::runtime::{init_resource_manager, ResourceConfig};
///
/// // In main(), before any operations:
/// init_resource_manager(ResourceConfig::default())?;
///
/// // Later, anywhere in code:
/// async fn my_function() {
///     let _permit = RESOURCE_MANAGER.acquire_cpu().await?;
///     // ...
/// }
/// ```
static RESOURCE_MANAGER_CELL: std::sync::OnceLock<GlobalResourceManager> = std::sync::OnceLock::new();

383/// Initialize the global resource manager with custom configuration
384///
385/// ## Educational: Explicit Initialization Pattern
386///
387/// This must be called exactly once, early in main(), before any code
388/// accesses RESOURCE_MANAGER. Subsequent calls will return an error.
389///
390/// ## Why This Pattern?
391///
392/// - Allows CLI flags to configure resource limits
393/// - Makes initialization explicit and debuggable
394/// - Avoids "lazy initialization with hidden defaults"
395/// - Better testability (each test can configure differently)
396///
397/// ## Errors
398///
399/// Returns error if:
400/// - Already initialized (called twice)
401/// - Configuration is invalid (e.g., 0 CPU threads)
402pub fn init_resource_manager(config: ResourceConfig) -> Result<(), String> {
403 let manager =
404 GlobalResourceManager::new(config).map_err(|e| format!("Failed to create resource manager: {}", e))?;
405
406 RESOURCE_MANAGER_CELL
407 .set(manager)
408 .map_err(|_| "Resource manager already initialized".to_string())
409}
410
411/// Access the global resource manager
412///
413/// ## Panics
414///
415/// Panics if called before `init_resource_manager()`. This is intentional -
416/// using the resource manager before initialization is a programming error.
417#[allow(clippy::expect_used)]
418pub fn resource_manager() -> &'static GlobalResourceManager {
419 RESOURCE_MANAGER_CELL
420 .get()
421 .expect("Resource manager not initialized! Call init_resource_manager() in main().")
422}
423
424/// Legacy alias for backward compatibility
425///
426/// **Pattern**: Both `RESOURCE_MANAGER` (static) and `resource_manager()`
427/// (function) are supported. New code should prefer the function style for
428/// consistency.
429#[allow(non_upper_case_globals)]
430pub static RESOURCE_MANAGER: std::sync::LazyLock<&'static GlobalResourceManager> =
431 std::sync::LazyLock::new(resource_manager);
432
#[cfg(test)]
mod tests {
    use super::*;

    // Construction with defaults: auto-detected CPU/I/O token counts must be
    // positive and all tokens start available.
    #[test]
    fn test_resource_manager_creation() {
        let manager = GlobalResourceManager::new(ResourceConfig::default()).unwrap();

        // Should have at least 1 CPU token
        assert!(manager.cpu_tokens_total() >= 1);

        // Should have I/O tokens
        assert!(manager.io_tokens_total() > 0);

        // Initially all tokens available
        assert_eq!(manager.cpu_tokens_available(), manager.cpu_tokens_total());
        assert_eq!(manager.io_tokens_available(), manager.io_tokens_total());
    }

    // Pins the device-specific queue-depth table and its ordering
    // (NVMe > SSD > HDD).
    #[test]
    fn test_device_type_queue_depths() {
        let nvme_qd = GlobalResourceManager::detect_optimal_io_tokens(StorageType::NVMe);
        let ssd_qd = GlobalResourceManager::detect_optimal_io_tokens(StorageType::Ssd);
        let hdd_qd = GlobalResourceManager::detect_optimal_io_tokens(StorageType::Hdd);

        // NVMe should have highest queue depth
        assert!(nvme_qd > ssd_qd);
        assert!(ssd_qd > hdd_qd);

        // Specific values
        assert_eq!(nvme_qd, 24);
        assert_eq!(ssd_qd, 12);
        assert_eq!(hdd_qd, 4);
    }

    // Exercises the RAII permit lifecycle: acquiring decrements the available
    // count, dropping a permit restores it.
    #[tokio::test]
    async fn test_cpu_token_acquisition() {
        let manager = GlobalResourceManager::new(ResourceConfig {
            cpu_tokens: Some(2),
            ..Default::default()
        })
        .unwrap();

        // Initially 2 available
        assert_eq!(manager.cpu_tokens_available(), 2);

        // Acquire one
        let _permit1 = manager.acquire_cpu().await.unwrap();
        assert_eq!(manager.cpu_tokens_available(), 1);

        // Acquire another
        let _permit2 = manager.acquire_cpu().await.unwrap();
        assert_eq!(manager.cpu_tokens_available(), 0);

        // Drop first permit - semaphore permit is returned on drop (RAII)
        drop(_permit1);
        assert_eq!(manager.cpu_tokens_available(), 1);
    }

    // Same lifecycle check for the I/O token pool.
    #[tokio::test]
    async fn test_io_token_acquisition() {
        let manager = GlobalResourceManager::new(ResourceConfig {
            io_tokens: Some(4),
            ..Default::default()
        })
        .unwrap();

        assert_eq!(manager.io_tokens_available(), 4);

        let _permit = manager.acquire_io().await.unwrap();
        assert_eq!(manager.io_tokens_available(), 3);
    }

    // The memory gauge is a plain counter: allocations add, deallocations
    // subtract, and reads reflect the running total.
    #[test]
    fn test_memory_tracking() {
        let manager = GlobalResourceManager::new(ResourceConfig::default()).unwrap();

        assert_eq!(manager.memory_used(), 0);

        manager.allocate_memory(1000);
        assert_eq!(manager.memory_used(), 1000);

        manager.allocate_memory(500);
        assert_eq!(manager.memory_used(), 1500);

        manager.deallocate_memory(700);
        assert_eq!(manager.memory_used(), 800);
    }

    #[test]
    fn test_global_singleton_access() {
        // Initialize the global resource manager for this test.
        // NOTE: Rust tests in one binary run as threads of a single process,
        // so the OnceLock is shared across tests. `let _ =` deliberately
        // ignores the "already initialized" error if another test (or a prior
        // run of this one) initialized it first.
        let _ = init_resource_manager(ResourceConfig::default());

        // Should be able to access the global instance after initialization
        let rm = resource_manager();
        let available = rm.cpu_tokens_available();
        assert!(available > 0);

        // Also test backward-compatible RESOURCE_MANAGER accessor
        let available2 = RESOURCE_MANAGER.cpu_tokens_available();
        assert_eq!(available, available2);
    }
}
537}