adaptive_pipeline/infrastructure/runtime/
resource_manager.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Global Resource Manager
//!
//! This module provides centralized resource governance across the entire
//! application, preventing resource oversubscription when processing multiple
//! files concurrently.
//!
//! ## Architecture Pattern: Two-Level Resource Governance
//!
//! **Problem:** Without global limits, multiple concurrent files can overwhelm
//! the system:
//! - 10 files × 8 workers/file = 80 concurrent tasks on an 8-core machine
//! - Result: CPU oversubscription, cache thrashing, poor throughput
//!
//! **Solution:** Two-level coordination:
//! 1. **Global limits** (this module) - Cap total system resources
//! 2. **Local limits** (per-file semaphores) - Cap per-file concurrency
//!
//! ## Educational Example
//!
//! ```rust,ignore
//! use adaptive_pipeline::infrastructure::runtime::RESOURCE_MANAGER;
//!
//! async fn process_file() -> Result<()> {
//!     // 1. Acquire global CPU token (waits if system is saturated)
//!     let _cpu_permit = RESOURCE_MANAGER.acquire_cpu().await?;
//!
//!     // 2. Acquire local per-file token
//!     let _local_permit = file_semaphore.acquire().await?;
//!
//!     // 3. Do CPU-intensive work
//!     compress_data().await?;
//!
//!     // 4. Both permits released automatically (RAII)
//!     Ok(())
//! }
//! ```
//!
//! ## Resource Types
//!
//! ### CPU Tokens
//! - **Purpose:** Limit total CPU-bound work across all files
//! - **Default:** `available_cores - 1` (leave one for OS/I/O)
//! - **Use:** Acquire before Rayon work or CPU-intensive operations
//!
//! ### I/O Tokens
//! - **Purpose:** Prevent I/O queue overrun
//! - **Default:** Device-specific (NVMe: 24, SSD: 12, HDD: 4)
//! - **Use:** Acquire before file reads/writes
//!
//! ### Memory Tracking
//! - **Purpose:** Monitor memory usage (gauge only, no enforcement yet)
//! - **Default:** No limit (soft monitoring)
//! - **Future:** Can add hard cap in Phase 3
62use adaptive_pipeline_domain::PipelineError;
63use std::sync::atomic::{AtomicUsize, Ordering};
64use std::sync::Arc;
65use tokio::sync::{Semaphore, SemaphorePermit};
66
/// Kind of storage device backing the pipeline's I/O, used to choose an
/// appropriate I/O queue depth (number of concurrent I/O tokens).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageType {
    /// NVMe SSD - High queue depth (24-32)
    NVMe,
    /// SATA SSD - Medium queue depth (8-16)
    Ssd,
    /// Hard Disk Drive - Low queue depth (2-4)
    Hdd,
    /// Auto-detect based on system
    Auto,
    /// Custom queue depth
    Custom(usize),
}
81
82/// Configuration for global resource manager
83#[derive(Debug, Clone)]
84pub struct ResourceConfig {
85    /// Number of CPU worker tokens (default: cores - 1)
86    pub cpu_tokens: Option<usize>,
87
88    /// Number of I/O tokens (default: device-specific)
89    pub io_tokens: Option<usize>,
90
91    /// Storage device type for I/O optimization
92    pub storage_type: StorageType,
93
94    /// Soft memory limit in bytes (gauge only, no enforcement)
95    pub memory_limit: Option<usize>,
96}
97
98impl Default for ResourceConfig {
99    fn default() -> Self {
100        Self {
101            cpu_tokens: None, // Will use cores - 1
102            io_tokens: None,  // Will use device-specific
103            storage_type: StorageType::Auto,
104            memory_limit: None, // No limit by default
105        }
106    }
107}
108
109/// Global resource manager for system-wide resource coordination
110///
111/// ## Design Pattern: Centralized Resource Governance
112///
113/// This manager prevents resource oversubscription by providing a global
114/// pool of CPU and I/O tokens that must be acquired before work begins.
115///
116/// ## Educational Notes
117///
118/// **Why semaphores?**
119/// - Semaphores provide backpressure: work waits when resources are saturated
120/// - RAII permits auto-release resources on drop
121/// - Async-aware: integrates with Tokio runtime
122///
123/// **Why separate CPU and I/O tokens?**
124/// - CPU work and I/O work have different characteristics
125/// - CPU: Limited by cores, benefits from parallelism = cores
126/// - I/O: Limited by device queue depth, different optimal values
127///
128/// **Why memory as gauge only?**
129/// - Memory is harder to predict and control
130/// - Start with monitoring, add enforcement later if needed
131/// - Avoids complexity in Phase 1
132pub struct GlobalResourceManager {
133    /// CPU worker tokens (semaphore permits)
134    ///
135    /// **Purpose:** Prevent CPU oversubscription
136    /// **Typical value:** cores - 1
137    /// **Educational:** This is a "counting semaphore" that allows N concurrent
138    /// operations
139    cpu_tokens: Arc<Semaphore>,
140
141    /// I/O operation tokens (semaphore permits)
142    ///
143    /// **Purpose:** Prevent I/O queue overrun
144    /// **Typical value:** Device-specific (NVMe: 24, SSD: 12, HDD: 4)
145    /// **Educational:** Different devices have different optimal queue depths
146    io_tokens: Arc<Semaphore>,
147
148    /// Memory usage gauge (bytes)
149    ///
150    /// **Purpose:** Monitor memory pressure (no enforcement yet)
151    /// **Educational:** Start simple (gauge), add limits later (Phase 3)
152    memory_used: Arc<AtomicUsize>,
153
154    /// Total memory capacity for reporting
155    memory_capacity: usize,
156
157    /// Number of CPU tokens configured
158    cpu_token_count: usize,
159
160    /// Number of I/O tokens configured
161    io_token_count: usize,
162}
163
164impl GlobalResourceManager {
165    /// Creates a new global resource manager with the given configuration
166    ///
167    /// ## Educational: Resource Detection and Configuration
168    ///
169    /// This method demonstrates:
170    /// - Auto-detection of system resources (CPU cores)
171    /// - Device-specific I/O optimization
172    /// - Sensible defaults with override capability
173    ///
174    /// ## Examples
175    ///
176    /// ```rust,ignore
177    /// // Use defaults (auto-detected)
178    /// let manager = GlobalResourceManager::new(Default::default())?;
179    ///
180    /// // Custom configuration
181    /// let manager = GlobalResourceManager::new(ResourceConfig {
182    ///     cpu_tokens: Some(6),  // Override: use 6 CPU workers
183    ///     storage_type: StorageType::NVMe,
184    ///     ..Default::default()
185    /// })?;
186    /// ```
187    pub fn new(config: ResourceConfig) -> Result<Self, PipelineError> {
188        // Detect available CPU cores
189        let available_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4); // Conservative fallback
190
191        // Educational: Why cores - 1?
192        // Leave one core for OS, I/O threads, and system tasks
193        // Prevents complete CPU saturation which hurts overall system responsiveness
194        let cpu_token_count = config.cpu_tokens.unwrap_or_else(|| (available_cores - 1).max(1));
195
196        // Educational: Device-specific I/O queue depths
197        // Different storage devices have different optimal concurrency levels
198        let io_token_count = config
199            .io_tokens
200            .unwrap_or_else(|| Self::detect_optimal_io_tokens(config.storage_type));
201
202        // Educational: Memory capacity detection
203        // On most systems, we can query available RAM
204        // For now, use a conservative default if not specified
205        let memory_capacity = config.memory_limit.unwrap_or(40 * 1024 * 1024 * 1024); // 40GB default
206
207        Ok(Self {
208            cpu_tokens: Arc::new(Semaphore::new(cpu_token_count)),
209            io_tokens: Arc::new(Semaphore::new(io_token_count)),
210            memory_used: Arc::new(AtomicUsize::new(0)),
211            memory_capacity,
212            cpu_token_count,
213            io_token_count,
214        })
215    }
216
217    /// Detect optimal I/O token count based on storage type
218    ///
219    /// ## Educational: Device Characteristics
220    ///
221    /// **NVMe (24-32 tokens):**
222    /// - Multiple parallel channels
223    /// - Low latency, high throughput
224    /// - Benefits from high queue depth
225    ///
226    /// **SSD (8-16 tokens):**
227    /// - Medium parallelism
228    /// - Good random access
229    /// - Moderate queue depth optimal
230    ///
231    /// **HDD (2-4 tokens):**
232    /// - Sequential access preferred
233    /// - High seek latency
234    /// - Low queue depth prevents thrashing
235    fn detect_optimal_io_tokens(storage_type: StorageType) -> usize {
236        match storage_type {
237            StorageType::NVMe => 24,
238            StorageType::Ssd => 12,
239            StorageType::Hdd => 4,
240            StorageType::Auto => {
241                // Educational: Simple heuristic
242                // In production, would query device capabilities
243                // For now, assume SSD as reasonable default
244                12
245            }
246            StorageType::Custom(n) => n,
247        }
248    }
249
250    /// Acquire a CPU token (explicit style - pedagogical)
251    ///
252    /// ## Educational Pattern: Explicit Acquisition
253    ///
254    /// This method shows the explicit pattern where you:
255    /// 1. Call acquire
256    /// 2. Get back a permit
257    /// 3. Permit is held as long as the guard lives
258    /// 4. Permit is auto-released when dropped (RAII)
259    ///
260    /// ## Usage
261    ///
262    /// ```rust,ignore
263    /// let _cpu_permit = RESOURCE_MANAGER.acquire_cpu().await?;
264    /// // Do CPU work
265    /// // Permit auto-released here when _cpu_permit goes out of scope
266    /// ```
267    ///
268    /// ## Backpressure
269    ///
270    /// If all CPU tokens are in use, this method **waits** until one becomes
271    /// available. This creates natural backpressure and prevents
272    /// oversubscription.
273    pub async fn acquire_cpu(&self) -> Result<SemaphorePermit<'_>, PipelineError> {
274        self.cpu_tokens
275            .acquire()
276            .await
277            .map_err(|_| PipelineError::InternalError("CPU semaphore closed".to_string()))
278    }
279
280    /// Acquire an I/O token
281    ///
282    /// ## Educational: Same pattern as CPU tokens
283    ///
284    /// Uses the same semaphore pattern but for I/O operations.
285    /// Prevents too many concurrent I/O operations from overwhelming
286    /// the storage device.
287    ///
288    /// ## Usage
289    ///
290    /// ```rust,ignore
291    /// let _io_permit = RESOURCE_MANAGER.acquire_io().await?;
292    /// // Do I/O operation (read/write)
293    /// // Permit auto-released
294    /// ```
295    pub async fn acquire_io(&self) -> Result<SemaphorePermit<'_>, PipelineError> {
296        self.io_tokens
297            .acquire()
298            .await
299            .map_err(|_| PipelineError::InternalError("I/O semaphore closed".to_string()))
300    }
301
302    /// Track memory allocation (gauge only, no enforcement)
303    ///
304    /// ## Educational: Simple Atomic Counter
305    ///
306    /// Uses `Ordering::Relaxed` because:
307    /// - We only need atomicity (no torn reads/writes)
308    /// - No coordination with other atomic variables needed
309    /// - This is just a gauge for monitoring
310    ///
311    /// See atomic_ordering.rs for more on ordering choices.
312    pub fn allocate_memory(&self, bytes: usize) {
313        self.memory_used.fetch_add(bytes, Ordering::Relaxed);
314    }
315
316    /// Track memory deallocation
317    pub fn deallocate_memory(&self, bytes: usize) {
318        self.memory_used.fetch_sub(bytes, Ordering::Relaxed);
319    }
320
321    /// Get current memory usage
322    pub fn memory_used(&self) -> usize {
323        self.memory_used.load(Ordering::Relaxed)
324    }
325
326    /// Get memory capacity
327    pub fn memory_capacity(&self) -> usize {
328        self.memory_capacity
329    }
330
331    /// Get number of available CPU tokens
332    ///
333    /// ## Educational: Observability
334    ///
335    /// This method provides visibility into resource saturation.
336    /// If available_permits() is consistently 0, you're CPU-saturated.
337    pub fn cpu_tokens_available(&self) -> usize {
338        self.cpu_tokens.available_permits()
339    }
340
341    /// Get total number of CPU tokens
342    pub fn cpu_tokens_total(&self) -> usize {
343        self.cpu_token_count
344    }
345
346    /// Get number of available I/O tokens
347    pub fn io_tokens_available(&self) -> usize {
348        self.io_tokens.available_permits()
349    }
350
351    /// Get total number of I/O tokens
352    pub fn io_tokens_total(&self) -> usize {
353        self.io_token_count
354    }
355}
356
/// Global singleton instance of the resource manager
///
/// ## Educational: Singleton Pattern with Configuration
///
/// Uses `OnceLock` for:
/// - Thread-safe one-time initialization
/// - Allows custom configuration from CLI
/// - Initialized exactly once via `init_resource_manager()`
/// - Static lifetime for global access
///
/// Private on purpose: all access goes through `init_resource_manager()`
/// (the single writer) and `resource_manager()` / `RESOURCE_MANAGER`
/// (the readers).
///
/// ## Usage
///
/// ```rust,ignore
/// use adaptive_pipeline::infrastructure::runtime::{init_resource_manager, ResourceConfig};
///
/// // In main(), before any operations:
/// init_resource_manager(ResourceConfig::default())?;
///
/// // Later, anywhere in code:
/// async fn my_function() {
///     let _permit = RESOURCE_MANAGER.acquire_cpu().await?;
///     // ...
/// }
/// ```
static RESOURCE_MANAGER_CELL: std::sync::OnceLock<GlobalResourceManager> = std::sync::OnceLock::new();
382
383/// Initialize the global resource manager with custom configuration
384///
385/// ## Educational: Explicit Initialization Pattern
386///
387/// This must be called exactly once, early in main(), before any code
388/// accesses RESOURCE_MANAGER. Subsequent calls will return an error.
389///
390/// ## Why This Pattern?
391///
392/// - Allows CLI flags to configure resource limits
393/// - Makes initialization explicit and debuggable
394/// - Avoids "lazy initialization with hidden defaults"
395/// - Better testability (each test can configure differently)
396///
397/// ## Errors
398///
399/// Returns error if:
400/// - Already initialized (called twice)
401/// - Configuration is invalid (e.g., 0 CPU threads)
402pub fn init_resource_manager(config: ResourceConfig) -> Result<(), String> {
403    let manager =
404        GlobalResourceManager::new(config).map_err(|e| format!("Failed to create resource manager: {}", e))?;
405
406    RESOURCE_MANAGER_CELL
407        .set(manager)
408        .map_err(|_| "Resource manager already initialized".to_string())
409}
410
411/// Access the global resource manager
412///
413/// ## Panics
414///
415/// Panics if called before `init_resource_manager()`. This is intentional -
416/// using the resource manager before initialization is a programming error.
417#[allow(clippy::expect_used)]
418pub fn resource_manager() -> &'static GlobalResourceManager {
419    RESOURCE_MANAGER_CELL
420        .get()
421        .expect("Resource manager not initialized! Call init_resource_manager() in main().")
422}
423
/// Legacy alias for backward compatibility
///
/// **Pattern**: Both `RESOURCE_MANAGER` (static) and `resource_manager()`
/// (function) are supported. New code should prefer the function style for
/// consistency.
///
/// NOTE: the lazy initializer delegates to `resource_manager()`, so the
/// first dereference of this static panics if `init_resource_manager()`
/// has not been called yet.
#[allow(non_upper_case_globals)]
pub static RESOURCE_MANAGER: std::sync::LazyLock<&'static GlobalResourceManager> =
    std::sync::LazyLock::new(resource_manager);
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Default construction yields non-empty, fully-available token pools.
    #[test]
    fn test_resource_manager_creation() {
        let manager = GlobalResourceManager::new(ResourceConfig::default()).unwrap();

        // Should have at least 1 CPU token
        assert!(manager.cpu_tokens_total() >= 1);

        // Should have I/O tokens
        assert!(manager.io_tokens_total() > 0);

        // Initially all tokens available
        assert_eq!(manager.cpu_tokens_available(), manager.cpu_tokens_total());
        assert_eq!(manager.io_tokens_available(), manager.io_tokens_total());
    }

    /// Queue depths are ordered NVMe > SSD > HDD and match documented values.
    #[test]
    fn test_device_type_queue_depths() {
        let nvme_qd = GlobalResourceManager::detect_optimal_io_tokens(StorageType::NVMe);
        let ssd_qd = GlobalResourceManager::detect_optimal_io_tokens(StorageType::Ssd);
        let hdd_qd = GlobalResourceManager::detect_optimal_io_tokens(StorageType::Hdd);

        // NVMe should have highest queue depth
        assert!(nvme_qd > ssd_qd);
        assert!(ssd_qd > hdd_qd);

        // Specific values
        assert_eq!(nvme_qd, 24);
        assert_eq!(ssd_qd, 12);
        assert_eq!(hdd_qd, 4);
    }

    /// Acquiring and dropping CPU permits adjusts the available count (RAII).
    #[tokio::test]
    async fn test_cpu_token_acquisition() {
        let manager = GlobalResourceManager::new(ResourceConfig {
            cpu_tokens: Some(2),
            ..Default::default()
        })
        .unwrap();

        // Initially 2 available
        assert_eq!(manager.cpu_tokens_available(), 2);

        // Acquire one (underscore-prefixed bindings still hold the permit,
        // unlike a bare `_` which would drop it immediately)
        let _permit1 = manager.acquire_cpu().await.unwrap();
        assert_eq!(manager.cpu_tokens_available(), 1);

        // Acquire another
        let _permit2 = manager.acquire_cpu().await.unwrap();
        assert_eq!(manager.cpu_tokens_available(), 0);

        // Drop first permit - RAII returns it to the pool
        drop(_permit1);
        assert_eq!(manager.cpu_tokens_available(), 1);
    }

    /// Acquiring an I/O permit decrements the available I/O token count.
    #[tokio::test]
    async fn test_io_token_acquisition() {
        let manager = GlobalResourceManager::new(ResourceConfig {
            io_tokens: Some(4),
            ..Default::default()
        })
        .unwrap();

        assert_eq!(manager.io_tokens_available(), 4);

        let _permit = manager.acquire_io().await.unwrap();
        assert_eq!(manager.io_tokens_available(), 3);
    }

    /// The memory gauge tracks allocate/deallocate arithmetic exactly.
    #[test]
    fn test_memory_tracking() {
        let manager = GlobalResourceManager::new(ResourceConfig::default()).unwrap();

        assert_eq!(manager.memory_used(), 0);

        manager.allocate_memory(1000);
        assert_eq!(manager.memory_used(), 1000);

        manager.allocate_memory(500);
        assert_eq!(manager.memory_used(), 1500);

        manager.deallocate_memory(700);
        assert_eq!(manager.memory_used(), 800);
    }

    /// Both the function accessor and the legacy static see the same instance.
    #[test]
    fn test_global_singleton_access() {
        // Initialize the global resource manager for this test.
        // NOTE(review): Rust's test harness runs all #[test] fns as threads
        // in ONE process, so another test may have initialized the cell
        // first; `let _ =` deliberately ignores the "already initialized"
        // error from a second call.
        let _ = init_resource_manager(ResourceConfig::default());

        // Should be able to access the global instance after initialization
        let rm = resource_manager();
        let available = rm.cpu_tokens_available();
        assert!(available > 0);

        // Also test backward-compatible RESOURCE_MANAGER accessor
        let available2 = RESOURCE_MANAGER.cpu_tokens_available();
        assert_eq!(available, available2);
    }
}