datasynth_core/
degradation.rs

1//! Graceful degradation system for handling resource pressure.
2//!
3//! This module provides a degradation level system that allows the generator
4//! to progressively reduce functionality when system resources become constrained,
5//! rather than failing outright.
6
7use std::sync::atomic::{AtomicU8, Ordering};
8use std::sync::Arc;
9
/// How severely the generator is currently degraded because of resource pressure.
///
/// Variants are declared from least to most severe, so the derived ordering gives
/// `Normal < Reduced < Minimal < Emergency`. The explicit `u8` discriminants allow the
/// level to be stored in an atomic byte. `Default` yields `Normal`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum DegradationLevel {
    /// Full operation: every feature is enabled.
    #[default]
    Normal = 0,
    /// Optional features are skipped and batch sizes are reduced.
    Reduced = 1,
    /// Only essential data is produced; injections are disabled.
    Minimal = 2,
    /// Flush outstanding work and terminate gracefully.
    Emergency = 3,
}
25
26impl DegradationLevel {
27    /// Check if data quality injection should be skipped at this level.
28    pub fn skip_data_quality(&self) -> bool {
29        *self >= DegradationLevel::Reduced
30    }
31
32    /// Check if anomaly injection should be skipped at this level.
33    pub fn skip_anomaly_injection(&self) -> bool {
34        *self >= DegradationLevel::Minimal
35    }
36
37    /// Check if optional fields should be omitted at this level.
38    pub fn skip_optional_fields(&self) -> bool {
39        *self >= DegradationLevel::Minimal
40    }
41
42    /// Check if immediate flush is required at this level.
43    pub fn requires_immediate_flush(&self) -> bool {
44        *self >= DegradationLevel::Emergency
45    }
46
47    /// Check if generation should terminate at this level.
48    pub fn should_terminate(&self) -> bool {
49        *self == DegradationLevel::Emergency
50    }
51
52    /// Get recommended batch size multiplier (1.0 = normal, 0.5 = half, etc.)
53    pub fn batch_size_multiplier(&self) -> f64 {
54        match self {
55            DegradationLevel::Normal => 1.0,
56            DegradationLevel::Reduced => 0.5,
57            DegradationLevel::Minimal => 0.25,
58            DegradationLevel::Emergency => 0.0,
59        }
60    }
61
62    /// Get recommended anomaly injection rate multiplier.
63    pub fn anomaly_rate_multiplier(&self) -> f64 {
64        match self {
65            DegradationLevel::Normal => 1.0,
66            DegradationLevel::Reduced => 0.5,
67            DegradationLevel::Minimal => 0.0,
68            DegradationLevel::Emergency => 0.0,
69        }
70    }
71
72    /// Get display name for this level.
73    pub fn name(&self) -> &'static str {
74        match self {
75            DegradationLevel::Normal => "Normal",
76            DegradationLevel::Reduced => "Reduced",
77            DegradationLevel::Minimal => "Minimal",
78            DegradationLevel::Emergency => "Emergency",
79        }
80    }
81
82    /// Get description of what happens at this level.
83    pub fn description(&self) -> &'static str {
84        match self {
85            DegradationLevel::Normal => "Full operation with all features enabled",
86            DegradationLevel::Reduced => {
87                "Reduced batch sizes, skip data quality injection, 50% anomaly rate"
88            }
89            DegradationLevel::Minimal => "Essential data only, no injections, minimal batch sizes",
90            DegradationLevel::Emergency => {
91                "Flush pending writes, save checkpoint, terminate gracefully"
92            }
93        }
94    }
95
96    /// Convert from u8.
97    pub fn from_u8(value: u8) -> Self {
98        match value {
99            0 => DegradationLevel::Normal,
100            1 => DegradationLevel::Reduced,
101            2 => DegradationLevel::Minimal,
102            _ => DegradationLevel::Emergency,
103        }
104    }
105}
106
107impl std::fmt::Display for DegradationLevel {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        write!(f, "{}", self.name())
110    }
111}
112
/// Thresholds and switches that decide when the generator degrades.
///
/// Memory and CPU thresholds are fractions in the range 0.0 - 1.0; disk thresholds are
/// megabytes of remaining space, so a *lower* reading is worse for disk.
#[derive(Debug, Clone)]
pub struct DegradationConfig {
    /// Master switch for graceful degradation.
    pub enabled: bool,
    /// Memory usage (0.0 - 1.0) at or above which the level becomes `Reduced`.
    pub reduced_memory_threshold: f64,
    /// Memory usage (0.0 - 1.0) at or above which the level becomes `Minimal`.
    pub minimal_memory_threshold: f64,
    /// Memory usage (0.0 - 1.0) at or above which the level becomes `Emergency`.
    pub emergency_memory_threshold: f64,
    /// Remaining disk space (MB) at or below which the level becomes `Reduced`.
    pub reduced_disk_threshold_mb: usize,
    /// Remaining disk space (MB) at or below which the level becomes `Minimal`.
    pub minimal_disk_threshold_mb: usize,
    /// Remaining disk space (MB) at or below which the level becomes `Emergency`.
    pub emergency_disk_threshold_mb: usize,
    /// CPU load (0.0 - 1.0) at or above which the level becomes `Reduced`.
    pub reduced_cpu_threshold: f64,
    /// CPU load (0.0 - 1.0) at or above which the level becomes `Minimal`.
    pub minimal_cpu_threshold: f64,
    /// Whether the level may recover automatically once resources improve.
    pub auto_recovery: bool,
    /// Margin by which memory usage must drop below the current level's threshold
    /// before a recovery step is allowed.
    pub recovery_hysteresis: f64,
}
139
140impl Default for DegradationConfig {
141    fn default() -> Self {
142        Self {
143            enabled: true,
144            // Memory thresholds (percentage of limit)
145            reduced_memory_threshold: 0.70,
146            minimal_memory_threshold: 0.85,
147            emergency_memory_threshold: 0.95,
148            // Disk thresholds (MB remaining)
149            reduced_disk_threshold_mb: 1000,
150            minimal_disk_threshold_mb: 500,
151            emergency_disk_threshold_mb: 100,
152            // CPU thresholds
153            reduced_cpu_threshold: 0.80,
154            minimal_cpu_threshold: 0.90,
155            // Recovery
156            auto_recovery: true,
157            recovery_hysteresis: 0.05,
158        }
159    }
160}
161
162impl DegradationConfig {
163    /// Create a conservative configuration (triggers earlier).
164    pub fn conservative() -> Self {
165        Self {
166            reduced_memory_threshold: 0.60,
167            minimal_memory_threshold: 0.75,
168            emergency_memory_threshold: 0.90,
169            reduced_disk_threshold_mb: 2000,
170            minimal_disk_threshold_mb: 1000,
171            emergency_disk_threshold_mb: 500,
172            reduced_cpu_threshold: 0.70,
173            minimal_cpu_threshold: 0.85,
174            ..Default::default()
175        }
176    }
177
178    /// Create an aggressive configuration (triggers later, maximizes throughput).
179    pub fn aggressive() -> Self {
180        Self {
181            reduced_memory_threshold: 0.80,
182            minimal_memory_threshold: 0.90,
183            emergency_memory_threshold: 0.98,
184            reduced_disk_threshold_mb: 500,
185            minimal_disk_threshold_mb: 200,
186            emergency_disk_threshold_mb: 50,
187            reduced_cpu_threshold: 0.90,
188            minimal_cpu_threshold: 0.95,
189            ..Default::default()
190        }
191    }
192
193    /// Disable graceful degradation.
194    pub fn disabled() -> Self {
195        Self {
196            enabled: false,
197            ..Default::default()
198        }
199    }
200}
201
/// One snapshot of resource readings used to pick a degradation level.
///
/// Every reading is optional; `Default` leaves all of them as `None`.
#[derive(Debug, Clone, Default)]
pub struct ResourceStatus {
    /// Memory usage as a fraction of the limit (0.0 - 1.0); `None` if there is no limit.
    pub memory_usage: Option<f64>,
    /// Remaining disk space in MB.
    pub disk_available_mb: Option<usize>,
    /// CPU load as a fraction (0.0 - 1.0).
    pub cpu_load: Option<f64>,
}
212
213impl ResourceStatus {
214    /// Create status from individual measurements.
215    pub fn new(
216        memory_usage: Option<f64>,
217        disk_available_mb: Option<usize>,
218        cpu_load: Option<f64>,
219    ) -> Self {
220        Self {
221            memory_usage,
222            disk_available_mb,
223            cpu_load,
224        }
225    }
226}
227
228/// Thread-safe degradation controller.
229#[derive(Debug)]
230pub struct DegradationController {
231    config: DegradationConfig,
232    current_level: AtomicU8,
233    level_change_count: std::sync::atomic::AtomicU64,
234}
235
236impl DegradationController {
237    /// Create a new degradation controller with the given configuration.
238    pub fn new(config: DegradationConfig) -> Self {
239        Self {
240            config,
241            current_level: AtomicU8::new(DegradationLevel::Normal as u8),
242            level_change_count: std::sync::atomic::AtomicU64::new(0),
243        }
244    }
245
246    /// Create a controller with default configuration.
247    pub fn default_controller() -> Self {
248        Self::new(DegradationConfig::default())
249    }
250
251    /// Create a disabled controller (always returns Normal).
252    pub fn disabled() -> Self {
253        Self::new(DegradationConfig::disabled())
254    }
255
256    /// Create an Arc-wrapped controller for sharing across threads.
257    pub fn shared(config: DegradationConfig) -> Arc<Self> {
258        Arc::new(Self::new(config))
259    }
260
261    /// Get current degradation level.
262    pub fn current_level(&self) -> DegradationLevel {
263        DegradationLevel::from_u8(self.current_level.load(Ordering::Relaxed))
264    }
265
266    /// Update degradation level based on resource status.
267    /// Returns the new level and whether it changed.
268    pub fn update(&self, status: &ResourceStatus) -> (DegradationLevel, bool) {
269        if !self.config.enabled {
270            return (DegradationLevel::Normal, false);
271        }
272
273        let new_level = self.calculate_level(status);
274        let old_level = self.current_level.swap(new_level as u8, Ordering::Relaxed);
275        let changed = old_level != new_level as u8;
276
277        if changed {
278            self.level_change_count.fetch_add(1, Ordering::Relaxed);
279        }
280
281        (new_level, changed)
282    }
283
284    /// Calculate appropriate degradation level based on resource status.
285    fn calculate_level(&self, status: &ResourceStatus) -> DegradationLevel {
286        let current = self.current_level();
287
288        // Check each resource type and find the highest degradation level needed
289        let mut level = DegradationLevel::Normal;
290
291        // Memory check
292        if let Some(mem_usage) = status.memory_usage {
293            let mem_level = if mem_usage >= self.config.emergency_memory_threshold {
294                DegradationLevel::Emergency
295            } else if mem_usage >= self.config.minimal_memory_threshold {
296                DegradationLevel::Minimal
297            } else if mem_usage >= self.config.reduced_memory_threshold {
298                DegradationLevel::Reduced
299            } else {
300                DegradationLevel::Normal
301            };
302            level = level.max(mem_level);
303        }
304
305        // Disk check
306        if let Some(disk_mb) = status.disk_available_mb {
307            let disk_level = if disk_mb <= self.config.emergency_disk_threshold_mb {
308                DegradationLevel::Emergency
309            } else if disk_mb <= self.config.minimal_disk_threshold_mb {
310                DegradationLevel::Minimal
311            } else if disk_mb <= self.config.reduced_disk_threshold_mb {
312                DegradationLevel::Reduced
313            } else {
314                DegradationLevel::Normal
315            };
316            level = level.max(disk_level);
317        }
318
319        // CPU check
320        if let Some(cpu) = status.cpu_load {
321            let cpu_level = if cpu >= self.config.minimal_cpu_threshold {
322                DegradationLevel::Minimal
323            } else if cpu >= self.config.reduced_cpu_threshold {
324                DegradationLevel::Reduced
325            } else {
326                DegradationLevel::Normal
327            };
328            // CPU doesn't trigger Emergency - only memory and disk do
329            level = level.max(cpu_level);
330        }
331
332        // Apply hysteresis for recovery (only allow stepping down one level at a time)
333        if self.config.auto_recovery && level < current {
334            // Allow recovery only if significantly improved
335            let can_recover = if let Some(mem) = status.memory_usage {
336                match current {
337                    DegradationLevel::Emergency => {
338                        mem < self.config.emergency_memory_threshold
339                            - self.config.recovery_hysteresis
340                    }
341                    DegradationLevel::Minimal => {
342                        mem < self.config.minimal_memory_threshold - self.config.recovery_hysteresis
343                    }
344                    DegradationLevel::Reduced => {
345                        mem < self.config.reduced_memory_threshold - self.config.recovery_hysteresis
346                    }
347                    DegradationLevel::Normal => true,
348                }
349            } else {
350                true
351            };
352
353            if can_recover {
354                // Step down one level at a time for smooth recovery
355                level = level.max(match current {
356                    DegradationLevel::Emergency => DegradationLevel::Minimal,
357                    DegradationLevel::Minimal => DegradationLevel::Reduced,
358                    _ => DegradationLevel::Normal,
359                });
360            } else {
361                level = current;
362            }
363        }
364
365        level
366    }
367
368    /// Force a specific degradation level (for testing or manual intervention).
369    pub fn force_level(&self, level: DegradationLevel) {
370        self.current_level.store(level as u8, Ordering::Relaxed);
371        self.level_change_count.fetch_add(1, Ordering::Relaxed);
372    }
373
374    /// Reset to Normal level.
375    pub fn reset(&self) {
376        self.current_level
377            .store(DegradationLevel::Normal as u8, Ordering::Relaxed);
378    }
379
380    /// Get number of level changes.
381    pub fn level_change_count(&self) -> u64 {
382        self.level_change_count.load(Ordering::Relaxed)
383    }
384
385    /// Check if currently degraded (not Normal).
386    pub fn is_degraded(&self) -> bool {
387        self.current_level() != DegradationLevel::Normal
388    }
389
390    /// Get the configuration.
391    pub fn config(&self) -> &DegradationConfig {
392        &self.config
393    }
394}
395
396impl Default for DegradationController {
397    fn default() -> Self {
398        Self::default_controller()
399    }
400}
401
/// Concrete set of behaviour switches the generator applies at one degradation level.
#[derive(Debug, Clone)]
pub struct DegradationActions {
    /// Do not run data quality injection.
    pub skip_data_quality: bool,
    /// Do not run anomaly injection.
    pub skip_anomaly_injection: bool,
    /// Leave optional fields out of the output.
    pub skip_optional_fields: bool,
    /// Factor applied to the batch size.
    pub batch_size_factor: f64,
    /// Factor applied to the anomaly injection rate.
    pub anomaly_rate_factor: f64,
    /// Emit the compact output format.
    pub use_compact_output: bool,
    /// Flush output right after every batch.
    pub immediate_flush: bool,
    /// Stop generation.
    pub terminate: bool,
}
422
423impl DegradationActions {
424    /// Get actions for a given degradation level.
425    pub fn for_level(level: DegradationLevel) -> Self {
426        match level {
427            DegradationLevel::Normal => Self {
428                skip_data_quality: false,
429                skip_anomaly_injection: false,
430                skip_optional_fields: false,
431                batch_size_factor: 1.0,
432                anomaly_rate_factor: 1.0,
433                use_compact_output: false,
434                immediate_flush: false,
435                terminate: false,
436            },
437            DegradationLevel::Reduced => Self {
438                skip_data_quality: true,
439                skip_anomaly_injection: false,
440                skip_optional_fields: false,
441                batch_size_factor: 0.5,
442                anomaly_rate_factor: 0.5,
443                use_compact_output: true,
444                immediate_flush: false,
445                terminate: false,
446            },
447            DegradationLevel::Minimal => Self {
448                skip_data_quality: true,
449                skip_anomaly_injection: true,
450                skip_optional_fields: true,
451                batch_size_factor: 0.25,
452                anomaly_rate_factor: 0.0,
453                use_compact_output: true,
454                immediate_flush: true,
455                terminate: false,
456            },
457            DegradationLevel::Emergency => Self {
458                skip_data_quality: true,
459                skip_anomaly_injection: true,
460                skip_optional_fields: true,
461                batch_size_factor: 0.0,
462                anomaly_rate_factor: 0.0,
463                use_compact_output: true,
464                immediate_flush: true,
465                terminate: true,
466            },
467        }
468    }
469}
470
#[cfg(test)]
mod tests {
    use super::DegradationLevel::{Emergency, Minimal, Normal, Reduced};
    use super::*;

    /// Status carrying only a memory reading.
    fn memory_only(usage: f64) -> ResourceStatus {
        ResourceStatus::new(Some(usage), None, None)
    }

    /// Status carrying only a remaining-disk reading.
    fn disk_only(available_mb: usize) -> ResourceStatus {
        ResourceStatus::new(None, Some(available_mb), None)
    }

    #[test]
    fn test_degradation_level_ordering() {
        let ascending = [Normal, Reduced, Minimal, Emergency];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    #[test]
    fn test_level_behavior_flags() {
        let data_quality = [(Normal, false), (Reduced, true), (Minimal, true)];
        for &(level, expected) in data_quality.iter() {
            assert_eq!(level.skip_data_quality(), expected);
        }

        let anomaly = [(Normal, false), (Reduced, false), (Minimal, true)];
        for &(level, expected) in anomaly.iter() {
            assert_eq!(level.skip_anomaly_injection(), expected);
        }

        let terminate = [(Minimal, false), (Emergency, true)];
        for &(level, expected) in terminate.iter() {
            assert_eq!(level.should_terminate(), expected);
        }
    }

    #[test]
    fn test_controller_creation() {
        assert_eq!(
            DegradationController::default_controller().current_level(),
            Normal
        );
    }

    #[test]
    fn test_controller_disabled() {
        // Every reading is past its emergency threshold, yet a disabled controller
        // must keep reporting Normal.
        let saturated = ResourceStatus::new(Some(0.99), Some(10), Some(0.99));
        let reported = DegradationController::disabled().update(&saturated).0;
        assert_eq!(reported, Normal);
    }

    #[test]
    fn test_memory_degradation() {
        let controller = DegradationController::default_controller();

        // High memory usage: Reduced, and the transition is reported as a change.
        let (level, changed) = controller.update(&memory_only(0.75));
        assert_eq!(level, Reduced);
        assert!(changed);

        // Very high, then critical memory usage.
        let escalation = [(0.90, Minimal), (0.96, Emergency)];
        for &(usage, expected) in escalation.iter() {
            assert_eq!(controller.update(&memory_only(usage)).0, expected);
        }
    }

    #[test]
    fn test_disk_degradation() {
        let controller = DegradationController::default_controller();

        // Low, then very low remaining disk space.
        let escalation = [(800, Reduced), (50, Emergency)];
        for &(available_mb, expected) in escalation.iter() {
            assert_eq!(controller.update(&disk_only(available_mb)).0, expected);
        }
    }

    #[test]
    fn test_force_level() {
        let controller = DegradationController::default_controller();
        controller.force_level(Minimal);
        assert_eq!(controller.current_level(), Minimal);
    }

    #[test]
    fn test_level_change_count() {
        let controller = DegradationController::default_controller();
        assert_eq!(controller.level_change_count(), 0);

        let forced = [(Reduced, 1), (Normal, 2)];
        for &(level, expected_count) in forced.iter() {
            controller.force_level(level);
            assert_eq!(controller.level_change_count(), expected_count);
        }
    }

    #[test]
    fn test_actions_for_level() {
        let normal = DegradationActions::for_level(Normal);
        assert!(!normal.skip_data_quality);
        assert!(!normal.terminate);
        assert_eq!(normal.batch_size_factor, 1.0);

        let emergency = DegradationActions::for_level(Emergency);
        assert!(emergency.skip_data_quality);
        assert!(emergency.terminate);
        assert_eq!(emergency.batch_size_factor, 0.0);
    }
}