1use std::path::Path;
7use std::sync::Arc;
8
9use crate::cpu_monitor::{CpuMonitor, CpuMonitorConfig, CpuStats};
10use crate::degradation::{
11 DegradationActions, DegradationConfig, DegradationController, DegradationLevel, ResourceStatus,
12};
13use crate::disk_guard::{DiskSpaceGuard, DiskSpaceGuardConfig, DiskStats};
14use crate::error::{SynthError, SynthResult};
15use crate::memory_guard::{MemoryGuard, MemoryGuardConfig, MemoryStats};
16
17#[derive(Debug, Clone, Default)]
19pub struct ResourceStats {
20 pub memory: MemoryStats,
22 pub disk: DiskStats,
24 pub cpu: CpuStats,
26 pub degradation_level: DegradationLevel,
28 pub checks_performed: u64,
30}
31
32#[derive(Debug, Clone)]
34pub struct ResourceGuardConfig {
35 pub memory: MemoryGuardConfig,
37 pub disk: DiskSpaceGuardConfig,
39 pub cpu: CpuMonitorConfig,
41 pub degradation: DegradationConfig,
43 pub check_interval: usize,
45}
46
47impl Default for ResourceGuardConfig {
48 fn default() -> Self {
49 Self {
50 memory: MemoryGuardConfig::default(),
51 disk: DiskSpaceGuardConfig::default(),
52 cpu: CpuMonitorConfig::default(),
53 degradation: DegradationConfig::default(),
54 check_interval: 500,
55 }
56 }
57}
58
59impl ResourceGuardConfig {
60 pub fn with_memory_limit(limit_mb: usize) -> Self {
62 Self {
63 memory: MemoryGuardConfig::with_limit_mb(limit_mb),
64 ..Default::default()
65 }
66 }
67
68 pub fn with_output_path<P: AsRef<Path>>(mut self, path: P) -> Self {
70 self.disk.monitor_path = Some(path.as_ref().to_path_buf());
71 self
72 }
73
74 pub fn with_cpu_monitoring(mut self, high_threshold: f64, critical_threshold: f64) -> Self {
76 self.cpu.enabled = true;
77 self.cpu.high_load_threshold = high_threshold;
78 self.cpu.critical_load_threshold = critical_threshold;
79 self
80 }
81
82 pub fn conservative(mut self) -> Self {
84 self.degradation = DegradationConfig::conservative();
85 self
86 }
87
88 pub fn aggressive(mut self) -> Self {
90 self.degradation = DegradationConfig::aggressive();
91 self
92 }
93
94 pub fn disabled() -> Self {
96 Self {
97 memory: MemoryGuardConfig {
98 hard_limit_mb: 0,
99 ..Default::default()
100 },
101 disk: DiskSpaceGuardConfig {
102 hard_limit_mb: 0,
103 ..Default::default()
104 },
105 cpu: CpuMonitorConfig {
106 enabled: false,
107 ..Default::default()
108 },
109 degradation: DegradationConfig::disabled(),
110 check_interval: 1000,
111 }
112 }
113}
114
115#[derive(Debug)]
117pub struct ResourceGuard {
118 config: ResourceGuardConfig,
119 memory_guard: MemoryGuard,
120 disk_guard: DiskSpaceGuard,
121 cpu_monitor: CpuMonitor,
122 degradation_controller: DegradationController,
123 check_counter: std::sync::atomic::AtomicU64,
124}
125
126impl ResourceGuard {
127 pub fn new(config: ResourceGuardConfig) -> Self {
129 Self {
130 memory_guard: MemoryGuard::new(config.memory.clone()),
131 disk_guard: DiskSpaceGuard::new(config.disk.clone()),
132 cpu_monitor: CpuMonitor::new(config.cpu.clone()),
133 degradation_controller: DegradationController::new(config.degradation.clone()),
134 check_counter: std::sync::atomic::AtomicU64::new(0),
135 config,
136 }
137 }
138
139 pub fn default_guard() -> Self {
141 Self::new(ResourceGuardConfig::default())
142 }
143
144 pub fn with_memory_limit(limit_mb: usize) -> Self {
146 Self::new(ResourceGuardConfig::with_memory_limit(limit_mb))
147 }
148
149 pub fn disabled() -> Self {
151 Self::new(ResourceGuardConfig::disabled())
152 }
153
154 pub fn shared(config: ResourceGuardConfig) -> Arc<Self> {
156 Arc::new(Self::new(config))
157 }
158
159 pub fn check(&self) -> SynthResult<DegradationLevel> {
162 let count = self
163 .check_counter
164 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
165
166 if count % self.config.check_interval as u64 != 0 {
168 return Ok(self.degradation_controller.current_level());
169 }
170
171 self.check_now()
172 }
173
174 pub fn check_now(&self) -> SynthResult<DegradationLevel> {
176 if let Err(e) = self.memory_guard.check_now() {
178 return Err(SynthError::memory_exhausted(e.current_mb, e.limit_mb));
179 }
180
181 if let Err(e) = self.disk_guard.check_now() {
183 return Err(SynthError::disk_exhausted(e.available_mb, e.required_mb));
184 }
185
186 let _ = self.cpu_monitor.sample();
188
189 let status = self.build_resource_status();
191 let (level, _changed) = self.degradation_controller.update(&status);
192
193 if level == DegradationLevel::Emergency {
195 return Err(SynthError::degradation(
196 level.name(),
197 "Resource limits critically exceeded, initiating graceful shutdown",
198 ));
199 }
200
201 Ok(level)
202 }
203
204 fn build_resource_status(&self) -> ResourceStatus {
206 let memory_usage = if self.config.memory.hard_limit_mb > 0 {
207 let current = self.memory_guard.current_usage_mb();
208 Some(current as f64 / self.config.memory.hard_limit_mb as f64)
209 } else {
210 None
211 };
212
213 let disk_available = if self.config.disk.hard_limit_mb > 0 {
214 Some(self.disk_guard.available_space_mb())
215 } else {
216 None
217 };
218
219 let cpu_load = if self.cpu_monitor.is_enabled() {
220 Some(self.cpu_monitor.current_load())
221 } else {
222 None
223 };
224
225 ResourceStatus::new(memory_usage, disk_available, cpu_load)
226 }
227
228 pub fn get_actions(&self) -> DegradationActions {
230 DegradationActions::for_level(self.degradation_controller.current_level())
231 }
232
233 pub fn is_degraded(&self) -> bool {
235 self.degradation_controller.is_degraded()
236 }
237
238 pub fn degradation_level(&self) -> DegradationLevel {
240 self.degradation_controller.current_level()
241 }
242
243 pub fn stats(&self) -> ResourceStats {
245 ResourceStats {
246 memory: self.memory_guard.stats(),
247 disk: self.disk_guard.stats(),
248 cpu: self.cpu_monitor.stats(),
249 degradation_level: self.degradation_controller.current_level(),
250 checks_performed: self
251 .check_counter
252 .load(std::sync::atomic::Ordering::Relaxed),
253 }
254 }
255
256 pub fn pre_check(&self) -> PreCheckResult {
259 let level = self.degradation_controller.current_level();
260 let actions = DegradationActions::for_level(level);
261
262 if actions.terminate {
263 PreCheckResult::Abort("Resources critically low, cannot proceed")
264 } else if actions.immediate_flush {
265 PreCheckResult::ProceedWithCaution("Resources constrained, reduce batch size")
266 } else if level != DegradationLevel::Normal {
267 PreCheckResult::Reduced("Operating in degraded mode")
268 } else {
269 PreCheckResult::Proceed
270 }
271 }
272
273 pub fn check_before_write(&self, estimated_bytes: u64) -> SynthResult<()> {
275 self.disk_guard
276 .check_before_write(estimated_bytes)
277 .map_err(|e| SynthError::disk_exhausted(e.available_mb, e.required_mb))
278 }
279
280 pub fn record_write(&self, bytes: u64) {
282 self.disk_guard.record_write(bytes);
283 }
284
285 pub fn memory(&self) -> &MemoryGuard {
287 &self.memory_guard
288 }
289
290 pub fn disk(&self) -> &DiskSpaceGuard {
292 &self.disk_guard
293 }
294
295 pub fn cpu(&self) -> &CpuMonitor {
297 &self.cpu_monitor
298 }
299
300 pub fn degradation(&self) -> &DegradationController {
302 &self.degradation_controller
303 }
304
305 pub fn maybe_throttle(&self) {
307 self.cpu_monitor.maybe_throttle();
308 }
309
310 pub fn reset_stats(&self) {
312 self.memory_guard.reset_stats();
313 self.disk_guard.reset_stats();
314 self.cpu_monitor.reset_stats();
315 self.degradation_controller.reset();
316 self.check_counter
317 .store(0, std::sync::atomic::Ordering::Relaxed);
318 }
319
320 pub fn is_available() -> bool {
322 MemoryGuard::is_available() || DiskSpaceGuard::is_available() || CpuMonitor::is_available()
323 }
324
325 pub fn current_memory_mb(&self) -> usize {
327 self.memory_guard.current_usage_mb()
328 }
329
330 pub fn available_disk_mb(&self) -> usize {
332 self.disk_guard.available_space_mb()
333 }
334
335 pub fn current_cpu_load(&self) -> f64 {
337 self.cpu_monitor.current_load()
338 }
339}
340
341impl Default for ResourceGuard {
342 fn default() -> Self {
343 Self::default_guard()
344 }
345}
346
/// Outcome of `ResourceGuard::pre_check`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PreCheckResult {
    /// No restriction applies; the operation may go ahead.
    Proceed,
    /// The guard is in a degraded (non-normal) level; carries an explanation.
    Reduced(&'static str),
    /// The current level requests an immediate flush; carries an explanation.
    ProceedWithCaution(&'static str),
    /// The current level requests termination; carries an explanation.
    Abort(&'static str),
}

impl PreCheckResult {
    /// Returns `false` only for [`PreCheckResult::Abort`]; every other
    /// variant allows the operation to continue.
    pub fn should_proceed(&self) -> bool {
        match self {
            PreCheckResult::Abort(_) => false,
            PreCheckResult::Proceed
            | PreCheckResult::Reduced(_)
            | PreCheckResult::ProceedWithCaution(_) => true,
        }
    }

    /// Returns the explanation carried by the variant, or `None` for
    /// [`PreCheckResult::Proceed`], which carries none.
    pub fn message(&self) -> Option<&'static str> {
        // The enum is `Copy`, so matching by value yields the `&'static str`
        // payload directly.
        match *self {
            PreCheckResult::Proceed => None,
            PreCheckResult::Reduced(text)
            | PreCheckResult::ProceedWithCaution(text)
            | PreCheckResult::Abort(text) => Some(text),
        }
    }
}
376
377#[derive(Debug, Clone, Default)]
379pub struct ResourceGuardBuilder {
380 config: ResourceGuardConfig,
381}
382
383impl ResourceGuardBuilder {
384 pub fn new() -> Self {
386 Self::default()
387 }
388
389 pub fn memory_limit(mut self, limit_mb: usize) -> Self {
391 self.config.memory = MemoryGuardConfig::with_limit_mb(limit_mb);
392 self
393 }
394
395 pub fn min_free_disk(mut self, min_free_mb: usize) -> Self {
397 self.config.disk = DiskSpaceGuardConfig::with_min_free_mb(min_free_mb);
398 self
399 }
400
401 pub fn output_path<P: AsRef<Path>>(mut self, path: P) -> Self {
403 self.config.disk.monitor_path = Some(path.as_ref().to_path_buf());
404 self
405 }
406
407 pub fn cpu_monitoring(mut self, high_threshold: f64, critical_threshold: f64) -> Self {
409 self.config.cpu.enabled = true;
410 self.config.cpu.high_load_threshold = high_threshold;
411 self.config.cpu.critical_load_threshold = critical_threshold;
412 self
413 }
414
415 pub fn auto_throttle(mut self, delay_ms: u64) -> Self {
417 self.config.cpu.auto_throttle = true;
418 self.config.cpu.throttle_delay_ms = delay_ms;
419 self
420 }
421
422 pub fn degradation_config(mut self, config: DegradationConfig) -> Self {
424 self.config.degradation = config;
425 self
426 }
427
428 pub fn conservative(mut self) -> Self {
430 self.config.degradation = DegradationConfig::conservative();
431 self
432 }
433
434 pub fn aggressive(mut self) -> Self {
436 self.config.degradation = DegradationConfig::aggressive();
437 self
438 }
439
440 pub fn check_interval(mut self, interval: usize) -> Self {
442 self.config.check_interval = interval;
443 self
444 }
445
446 pub fn build(self) -> ResourceGuard {
448 ResourceGuard::new(self.config)
449 }
450
451 pub fn build_shared(self) -> Arc<ResourceGuard> {
453 Arc::new(ResourceGuard::new(self.config))
454 }
455}
456
#[cfg(test)]
mod tests {
    use super::*;

    /// The memory-limit constructor stores the limit in the memory section.
    #[test]
    fn test_resource_guard_creation() {
        let limited = ResourceGuard::with_memory_limit(1024);
        assert_eq!(limited.config.memory.hard_limit_mb, 1024);
    }

    /// A disabled guard passes its check and stays at the normal level.
    #[test]
    fn test_resource_guard_disabled() {
        let inert = ResourceGuard::disabled();
        let outcome = inert.check();
        assert!(outcome.is_ok());
        assert_eq!(inert.degradation_level(), DegradationLevel::Normal);
    }

    /// Builder setters end up in the built guard's configuration.
    #[test]
    fn test_builder() {
        let builder = ResourceGuardBuilder::new()
            .memory_limit(2048)
            .min_free_disk(500)
            .cpu_monitoring(0.8, 0.95)
            .conservative();
        let built = builder.build();
        let cfg = &built.config;

        assert_eq!(cfg.memory.hard_limit_mb, 2048);
        assert_eq!(cfg.disk.hard_limit_mb, 500);
        assert!(cfg.cpu.enabled);
    }

    /// A disabled guard lets work proceed without any caveat.
    #[test]
    fn test_pre_check() {
        let verdict = ResourceGuard::disabled().pre_check();
        assert!(verdict.should_proceed());
        assert_eq!(verdict, PreCheckResult::Proceed);
    }

    /// A fresh default guard reports the normal degradation level.
    #[test]
    fn test_stats() {
        let snapshot = ResourceGuard::default_guard().stats();
        assert_eq!(snapshot.degradation_level, DegradationLevel::Normal);
    }

    /// Only variants that carry text expose a message.
    #[test]
    fn test_pre_check_messages() {
        assert_eq!(PreCheckResult::Proceed.message(), None);
        assert!(matches!(PreCheckResult::Abort("test").message(), Some(_)));
    }

    /// On Unix at least one monitor is expected to be available.
    #[test]
    fn test_is_available() {
        if cfg!(unix) {
            assert!(ResourceGuard::is_available());
        }
    }
}