Skip to main content

datasynth_core/
resource_guard.rs

1//! Unified resource guard combining memory, disk, and CPU monitoring.
2//!
3//! This module provides a single interface for checking all system resources
4//! and coordinating graceful degradation when resources become constrained.
5
6use std::path::Path;
7use std::sync::Arc;
8
9use crate::cpu_monitor::{CpuMonitor, CpuMonitorConfig, CpuStats};
10use crate::degradation::{
11    DegradationActions, DegradationConfig, DegradationController, DegradationLevel, ResourceStatus,
12};
13use crate::disk_guard::{DiskSpaceGuard, DiskSpaceGuardConfig, DiskStats};
14use crate::error::{SynthError, SynthResult};
15use crate::memory_guard::{MemoryGuard, MemoryGuardConfig, MemoryStats};
16
/// Combined resource statistics.
///
/// A point-in-time snapshot, as assembled by `ResourceGuard::stats`, that
/// bundles the per-resource statistics with the current degradation level.
#[derive(Debug, Clone, Default)]
pub struct ResourceStats {
    /// Memory statistics, as reported by the memory guard.
    pub memory: MemoryStats,
    /// Disk space statistics, as reported by the disk space guard.
    pub disk: DiskStats,
    /// CPU statistics, as reported by the CPU monitor.
    pub cpu: CpuStats,
    /// Degradation level at the time the snapshot was taken.
    pub degradation_level: DegradationLevel,
    /// Value of the guard's check counter. The counter advances on every
    /// `ResourceGuard::check` call, including calls that are skipped because
    /// of the check interval; direct `check_now` calls do not advance it.
    pub checks_performed: u64,
}
31
/// Configuration for the unified resource guard.
///
/// Groups the configuration of every sub-component together with the
/// interval that controls how often `ResourceGuard::check` does real work.
#[derive(Debug, Clone)]
pub struct ResourceGuardConfig {
    /// Memory guard configuration
    pub memory: MemoryGuardConfig,
    /// Disk space guard configuration
    pub disk: DiskSpaceGuardConfig,
    /// CPU monitor configuration
    pub cpu: CpuMonitorConfig,
    /// Degradation configuration
    pub degradation: DegradationConfig,
    /// Check interval: `ResourceGuard::check` runs a full resource check
    /// once every `check_interval` calls and otherwise just reports the
    /// degradation controller's current level. Should be at least 1.
    pub check_interval: usize,
}
46
47impl Default for ResourceGuardConfig {
48    fn default() -> Self {
49        Self {
50            memory: MemoryGuardConfig::default(),
51            disk: DiskSpaceGuardConfig::default(),
52            cpu: CpuMonitorConfig::default(),
53            degradation: DegradationConfig::default(),
54            check_interval: 500,
55        }
56    }
57}
58
59impl ResourceGuardConfig {
60    /// Create config with specified memory limit.
61    pub fn with_memory_limit(limit_mb: usize) -> Self {
62        Self {
63            memory: MemoryGuardConfig::with_limit_mb(limit_mb),
64            ..Default::default()
65        }
66    }
67
68    /// Set output path for disk monitoring.
69    pub fn with_output_path<P: AsRef<Path>>(mut self, path: P) -> Self {
70        self.disk.monitor_path = Some(path.as_ref().to_path_buf());
71        self
72    }
73
74    /// Enable CPU monitoring with thresholds.
75    pub fn with_cpu_monitoring(mut self, high_threshold: f64, critical_threshold: f64) -> Self {
76        self.cpu.enabled = true;
77        self.cpu.high_load_threshold = high_threshold;
78        self.cpu.critical_load_threshold = critical_threshold;
79        self
80    }
81
82    /// Use conservative degradation thresholds.
83    pub fn conservative(mut self) -> Self {
84        self.degradation = DegradationConfig::conservative();
85        self
86    }
87
88    /// Use aggressive degradation thresholds.
89    pub fn aggressive(mut self) -> Self {
90        self.degradation = DegradationConfig::aggressive();
91        self
92    }
93
94    /// Disable all monitoring (for testing or when resources are managed externally).
95    pub fn disabled() -> Self {
96        Self {
97            memory: MemoryGuardConfig {
98                hard_limit_mb: 0,
99                ..Default::default()
100            },
101            disk: DiskSpaceGuardConfig {
102                hard_limit_mb: 0,
103                ..Default::default()
104            },
105            cpu: CpuMonitorConfig {
106                enabled: false,
107                ..Default::default()
108            },
109            degradation: DegradationConfig::disabled(),
110            check_interval: 1000,
111        }
112    }
113}
114
/// Unified resource guard for monitoring all system resources.
///
/// Owns one guard/monitor per resource plus the degradation controller that
/// turns their readings into a single degradation level.
#[derive(Debug)]
pub struct ResourceGuard {
    /// Configuration the guard was built from.
    config: ResourceGuardConfig,
    /// Memory usage guard.
    memory_guard: MemoryGuard,
    /// Disk space guard.
    disk_guard: DiskSpaceGuard,
    /// CPU load monitor.
    cpu_monitor: CpuMonitor,
    /// Tracks the current degradation level.
    degradation_controller: DegradationController,
    /// Number of `check` calls so far; used to apply the check interval.
    check_counter: std::sync::atomic::AtomicU64,
}
125
126impl ResourceGuard {
127    /// Create a new resource guard with the given configuration.
128    pub fn new(config: ResourceGuardConfig) -> Self {
129        Self {
130            memory_guard: MemoryGuard::new(config.memory.clone()),
131            disk_guard: DiskSpaceGuard::new(config.disk.clone()),
132            cpu_monitor: CpuMonitor::new(config.cpu.clone()),
133            degradation_controller: DegradationController::new(config.degradation.clone()),
134            check_counter: std::sync::atomic::AtomicU64::new(0),
135            config,
136        }
137    }
138
139    /// Create a resource guard with default configuration.
140    pub fn default_guard() -> Self {
141        Self::new(ResourceGuardConfig::default())
142    }
143
144    /// Create a resource guard with specified memory limit.
145    pub fn with_memory_limit(limit_mb: usize) -> Self {
146        Self::new(ResourceGuardConfig::with_memory_limit(limit_mb))
147    }
148
149    /// Create a disabled resource guard.
150    pub fn disabled() -> Self {
151        Self::new(ResourceGuardConfig::disabled())
152    }
153
154    /// Create an Arc-wrapped resource guard for sharing across threads.
155    pub fn shared(config: ResourceGuardConfig) -> Arc<Self> {
156        Arc::new(Self::new(config))
157    }
158
159    /// Check all resources (memory, disk, CPU).
160    /// Returns Ok with current degradation level or Err if hard limits exceeded.
161    pub fn check(&self) -> SynthResult<DegradationLevel> {
162        let count = self
163            .check_counter
164            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
165
166        // Only perform actual checks at intervals
167        if count % self.config.check_interval as u64 != 0 {
168            return Ok(self.degradation_controller.current_level());
169        }
170
171        self.check_now()
172    }
173
174    /// Force an immediate check of all resources (bypasses interval).
175    pub fn check_now(&self) -> SynthResult<DegradationLevel> {
176        // Check memory
177        if let Err(e) = self.memory_guard.check_now() {
178            return Err(SynthError::memory_exhausted(e.current_mb, e.limit_mb));
179        }
180
181        // Check disk
182        if let Err(e) = self.disk_guard.check_now() {
183            return Err(SynthError::disk_exhausted(e.available_mb, e.required_mb));
184        }
185
186        // Sample CPU
187        let _ = self.cpu_monitor.sample();
188
189        // Update degradation level
190        let status = self.build_resource_status();
191        let (level, _changed) = self.degradation_controller.update(&status);
192
193        // If at Emergency level, return error to trigger graceful shutdown
194        if level == DegradationLevel::Emergency {
195            return Err(SynthError::degradation(
196                level.name(),
197                "Resource limits critically exceeded, initiating graceful shutdown",
198            ));
199        }
200
201        Ok(level)
202    }
203
204    /// Build resource status for degradation calculation.
205    fn build_resource_status(&self) -> ResourceStatus {
206        let memory_usage = if self.config.memory.hard_limit_mb > 0 {
207            let current = self.memory_guard.current_usage_mb();
208            Some(current as f64 / self.config.memory.hard_limit_mb as f64)
209        } else {
210            None
211        };
212
213        let disk_available = if self.config.disk.hard_limit_mb > 0 {
214            Some(self.disk_guard.available_space_mb())
215        } else {
216            None
217        };
218
219        let cpu_load = if self.cpu_monitor.is_enabled() {
220            Some(self.cpu_monitor.current_load())
221        } else {
222            None
223        };
224
225        ResourceStatus::new(memory_usage, disk_available, cpu_load)
226    }
227
228    /// Get actions to take based on current degradation level.
229    pub fn get_actions(&self) -> DegradationActions {
230        DegradationActions::for_level(self.degradation_controller.current_level())
231    }
232
233    /// Check if currently degraded (not Normal).
234    pub fn is_degraded(&self) -> bool {
235        self.degradation_controller.is_degraded()
236    }
237
238    /// Get current degradation level.
239    pub fn degradation_level(&self) -> DegradationLevel {
240        self.degradation_controller.current_level()
241    }
242
243    /// Get combined resource statistics.
244    pub fn stats(&self) -> ResourceStats {
245        ResourceStats {
246            memory: self.memory_guard.stats(),
247            disk: self.disk_guard.stats(),
248            cpu: self.cpu_monitor.stats(),
249            degradation_level: self.degradation_controller.current_level(),
250            checks_performed: self
251                .check_counter
252                .load(std::sync::atomic::Ordering::Relaxed),
253        }
254    }
255
256    /// Pre-check before a potentially expensive operation.
257    /// Returns recommended action based on current resource state.
258    pub fn pre_check(&self) -> PreCheckResult {
259        let level = self.degradation_controller.current_level();
260        let actions = DegradationActions::for_level(level);
261
262        if actions.terminate {
263            PreCheckResult::Abort("Resources critically low, cannot proceed")
264        } else if actions.immediate_flush {
265            PreCheckResult::ProceedWithCaution("Resources constrained, reduce batch size")
266        } else if level != DegradationLevel::Normal {
267            PreCheckResult::Reduced("Operating in degraded mode")
268        } else {
269            PreCheckResult::Proceed
270        }
271    }
272
273    /// Pre-check before writing data.
274    pub fn check_before_write(&self, estimated_bytes: u64) -> SynthResult<()> {
275        self.disk_guard
276            .check_before_write(estimated_bytes)
277            .map_err(|e| SynthError::disk_exhausted(e.available_mb, e.required_mb))
278    }
279
280    /// Record bytes written (for tracking).
281    pub fn record_write(&self, bytes: u64) {
282        self.disk_guard.record_write(bytes);
283    }
284
285    /// Get reference to memory guard.
286    pub fn memory(&self) -> &MemoryGuard {
287        &self.memory_guard
288    }
289
290    /// Get reference to disk guard.
291    pub fn disk(&self) -> &DiskSpaceGuard {
292        &self.disk_guard
293    }
294
295    /// Get reference to CPU monitor.
296    pub fn cpu(&self) -> &CpuMonitor {
297        &self.cpu_monitor
298    }
299
300    /// Get reference to degradation controller.
301    pub fn degradation(&self) -> &DegradationController {
302        &self.degradation_controller
303    }
304
305    /// Apply throttle delay if CPU is overloaded.
306    pub fn maybe_throttle(&self) {
307        self.cpu_monitor.maybe_throttle();
308    }
309
310    /// Reset all statistics (for testing).
311    pub fn reset_stats(&self) {
312        self.memory_guard.reset_stats();
313        self.disk_guard.reset_stats();
314        self.cpu_monitor.reset_stats();
315        self.degradation_controller.reset();
316        self.check_counter
317            .store(0, std::sync::atomic::Ordering::Relaxed);
318    }
319
320    /// Check if resource monitoring is available on this platform.
321    pub fn is_available() -> bool {
322        MemoryGuard::is_available() || DiskSpaceGuard::is_available() || CpuMonitor::is_available()
323    }
324
325    /// Get current memory usage in MB.
326    pub fn current_memory_mb(&self) -> usize {
327        self.memory_guard.current_usage_mb()
328    }
329
330    /// Get current available disk space in MB.
331    pub fn available_disk_mb(&self) -> usize {
332        self.disk_guard.available_space_mb()
333    }
334
335    /// Get current CPU load.
336    pub fn current_cpu_load(&self) -> f64 {
337        self.cpu_monitor.current_load()
338    }
339}
340
341impl Default for ResourceGuard {
342    fn default() -> Self {
343        Self::default_guard()
344    }
345}
346
/// Outcome of a pre-check performed before starting an operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreCheckResult {
    /// Go ahead normally.
    Proceed,
    /// Go ahead, but with reduced functionality.
    Reduced(&'static str),
    /// Go ahead carefully (flush more frequently, reduce batch size).
    ProceedWithCaution(&'static str),
    /// Do not start the operation.
    Abort(&'static str),
}

impl PreCheckResult {
    /// Whether the operation may go ahead: `false` only for `Abort`.
    pub fn should_proceed(&self) -> bool {
        match self {
            Self::Abort(_) => false,
            Self::Proceed | Self::Reduced(_) | Self::ProceedWithCaution(_) => true,
        }
    }

    /// The explanatory message carried by the variant, or `None` for `Proceed`.
    pub fn message(&self) -> Option<&'static str> {
        match *self {
            Self::Proceed => None,
            Self::Reduced(text) | Self::ProceedWithCaution(text) | Self::Abort(text) => Some(text),
        }
    }
}
376
/// Builder for creating a ResourceGuard with a fluent interface.
///
/// Starts from `ResourceGuardConfig::default()` and lets each setter adjust
/// one aspect before `build` / `build_shared` constructs the guard.
#[derive(Debug, Clone, Default)]
pub struct ResourceGuardBuilder {
    /// Configuration accumulated so far.
    config: ResourceGuardConfig,
}
382
383impl ResourceGuardBuilder {
384    /// Create a new builder with default configuration.
385    pub fn new() -> Self {
386        Self::default()
387    }
388
389    /// Set memory limit in MB.
390    pub fn memory_limit(mut self, limit_mb: usize) -> Self {
391        self.config.memory = MemoryGuardConfig::with_limit_mb(limit_mb);
392        self
393    }
394
395    /// Set minimum free disk space in MB.
396    pub fn min_free_disk(mut self, min_free_mb: usize) -> Self {
397        self.config.disk = DiskSpaceGuardConfig::with_min_free_mb(min_free_mb);
398        self
399    }
400
401    /// Set output path for disk monitoring.
402    pub fn output_path<P: AsRef<Path>>(mut self, path: P) -> Self {
403        self.config.disk.monitor_path = Some(path.as_ref().to_path_buf());
404        self
405    }
406
407    /// Enable CPU monitoring.
408    pub fn cpu_monitoring(mut self, high_threshold: f64, critical_threshold: f64) -> Self {
409        self.config.cpu.enabled = true;
410        self.config.cpu.high_load_threshold = high_threshold;
411        self.config.cpu.critical_load_threshold = critical_threshold;
412        self
413    }
414
415    /// Enable auto-throttling when CPU is overloaded.
416    pub fn auto_throttle(mut self, delay_ms: u64) -> Self {
417        self.config.cpu.auto_throttle = true;
418        self.config.cpu.throttle_delay_ms = delay_ms;
419        self
420    }
421
422    /// Set degradation thresholds.
423    pub fn degradation_config(mut self, config: DegradationConfig) -> Self {
424        self.config.degradation = config;
425        self
426    }
427
428    /// Use conservative degradation settings.
429    pub fn conservative(mut self) -> Self {
430        self.config.degradation = DegradationConfig::conservative();
431        self
432    }
433
434    /// Use aggressive degradation settings.
435    pub fn aggressive(mut self) -> Self {
436        self.config.degradation = DegradationConfig::aggressive();
437        self
438    }
439
440    /// Set check interval.
441    pub fn check_interval(mut self, interval: usize) -> Self {
442        self.config.check_interval = interval;
443        self
444    }
445
446    /// Build the ResourceGuard.
447    pub fn build(self) -> ResourceGuard {
448        ResourceGuard::new(self.config)
449    }
450
451    /// Build an Arc-wrapped ResourceGuard.
452    pub fn build_shared(self) -> Arc<ResourceGuard> {
453        Arc::new(ResourceGuard::new(self.config))
454    }
455}
456
#[cfg(test)]
mod tests {
    use super::*;

    /// The memory-limit constructor stores the limit in the guard's config.
    #[test]
    fn test_resource_guard_creation() {
        let config = ResourceGuard::with_memory_limit(1024).config;
        assert_eq!(config.memory.hard_limit_mb, 1024);
    }

    /// A disabled guard passes its check and stays at the Normal level.
    #[test]
    fn test_resource_guard_disabled() {
        let guard = ResourceGuard::disabled();
        let outcome = guard.check();
        assert!(outcome.is_ok());
        assert_eq!(guard.degradation_level(), DegradationLevel::Normal);
    }

    /// Builder setters end up in the built guard's configuration.
    #[test]
    fn test_builder() {
        let builder = ResourceGuardBuilder::new()
            .memory_limit(2048)
            .min_free_disk(500)
            .cpu_monitoring(0.8, 0.95)
            .conservative();
        let guard = builder.build();

        let config = &guard.config;
        assert_eq!(config.memory.hard_limit_mb, 2048);
        assert_eq!(config.disk.hard_limit_mb, 500);
        assert!(config.cpu.enabled);
    }

    /// With monitoring disabled the pre-check recommends proceeding normally.
    #[test]
    fn test_pre_check() {
        let result = ResourceGuard::disabled().pre_check();
        assert!(result.should_proceed());
        assert_eq!(result, PreCheckResult::Proceed);
    }

    /// A freshly created default guard reports the Normal level in its stats.
    #[test]
    fn test_stats() {
        let stats = ResourceGuard::default_guard().stats();
        assert_eq!(stats.degradation_level, DegradationLevel::Normal);
    }

    /// `Proceed` has no message; `Abort` carries one.
    #[test]
    fn test_pre_check_messages() {
        let proceed = PreCheckResult::Proceed;
        let abort = PreCheckResult::Abort("test");
        assert!(proceed.message().is_none());
        assert!(abort.message().is_some());
    }

    /// Availability is only asserted on Unix targets.
    #[test]
    fn test_is_available() {
        // Should be true on at least one of the monitored resources
        #[cfg(unix)]
        assert!(ResourceGuard::is_available());
    }
}
515}