1use std::sync::atomic::{AtomicU8, Ordering};
8use std::sync::Arc;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
12#[repr(u8)]
13#[derive(Default)]
14pub enum DegradationLevel {
15 #[default]
17 Normal = 0,
18 Reduced = 1,
20 Minimal = 2,
22 Emergency = 3,
24}
25
26impl DegradationLevel {
27 pub fn skip_data_quality(&self) -> bool {
29 *self >= DegradationLevel::Reduced
30 }
31
32 pub fn skip_anomaly_injection(&self) -> bool {
34 *self >= DegradationLevel::Minimal
35 }
36
37 pub fn skip_optional_fields(&self) -> bool {
39 *self >= DegradationLevel::Minimal
40 }
41
42 pub fn requires_immediate_flush(&self) -> bool {
44 *self >= DegradationLevel::Emergency
45 }
46
47 pub fn should_terminate(&self) -> bool {
49 *self == DegradationLevel::Emergency
50 }
51
52 pub fn batch_size_multiplier(&self) -> f64 {
54 match self {
55 DegradationLevel::Normal => 1.0,
56 DegradationLevel::Reduced => 0.5,
57 DegradationLevel::Minimal => 0.25,
58 DegradationLevel::Emergency => 0.0,
59 }
60 }
61
62 pub fn anomaly_rate_multiplier(&self) -> f64 {
64 match self {
65 DegradationLevel::Normal => 1.0,
66 DegradationLevel::Reduced => 0.5,
67 DegradationLevel::Minimal => 0.0,
68 DegradationLevel::Emergency => 0.0,
69 }
70 }
71
72 pub fn name(&self) -> &'static str {
74 match self {
75 DegradationLevel::Normal => "Normal",
76 DegradationLevel::Reduced => "Reduced",
77 DegradationLevel::Minimal => "Minimal",
78 DegradationLevel::Emergency => "Emergency",
79 }
80 }
81
82 pub fn description(&self) -> &'static str {
84 match self {
85 DegradationLevel::Normal => "Full operation with all features enabled",
86 DegradationLevel::Reduced => {
87 "Reduced batch sizes, skip data quality injection, 50% anomaly rate"
88 }
89 DegradationLevel::Minimal => "Essential data only, no injections, minimal batch sizes",
90 DegradationLevel::Emergency => {
91 "Flush pending writes, save checkpoint, terminate gracefully"
92 }
93 }
94 }
95
96 pub fn from_u8(value: u8) -> Self {
98 match value {
99 0 => DegradationLevel::Normal,
100 1 => DegradationLevel::Reduced,
101 2 => DegradationLevel::Minimal,
102 _ => DegradationLevel::Emergency,
103 }
104 }
105}
106
107impl std::fmt::Display for DegradationLevel {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "{}", self.name())
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct DegradationConfig {
116 pub enabled: bool,
118 pub reduced_memory_threshold: f64,
120 pub minimal_memory_threshold: f64,
122 pub emergency_memory_threshold: f64,
124 pub reduced_disk_threshold_mb: usize,
126 pub minimal_disk_threshold_mb: usize,
128 pub emergency_disk_threshold_mb: usize,
130 pub reduced_cpu_threshold: f64,
132 pub minimal_cpu_threshold: f64,
134 pub auto_recovery: bool,
136 pub recovery_hysteresis: f64,
138}
139
140impl Default for DegradationConfig {
141 fn default() -> Self {
142 Self {
143 enabled: true,
144 reduced_memory_threshold: 0.70,
146 minimal_memory_threshold: 0.85,
147 emergency_memory_threshold: 0.95,
148 reduced_disk_threshold_mb: 1000,
150 minimal_disk_threshold_mb: 500,
151 emergency_disk_threshold_mb: 100,
152 reduced_cpu_threshold: 0.80,
154 minimal_cpu_threshold: 0.90,
155 auto_recovery: true,
157 recovery_hysteresis: 0.05,
158 }
159 }
160}
161
162impl DegradationConfig {
163 pub fn conservative() -> Self {
165 Self {
166 reduced_memory_threshold: 0.60,
167 minimal_memory_threshold: 0.75,
168 emergency_memory_threshold: 0.90,
169 reduced_disk_threshold_mb: 2000,
170 minimal_disk_threshold_mb: 1000,
171 emergency_disk_threshold_mb: 500,
172 reduced_cpu_threshold: 0.70,
173 minimal_cpu_threshold: 0.85,
174 ..Default::default()
175 }
176 }
177
178 pub fn aggressive() -> Self {
180 Self {
181 reduced_memory_threshold: 0.80,
182 minimal_memory_threshold: 0.90,
183 emergency_memory_threshold: 0.98,
184 reduced_disk_threshold_mb: 500,
185 minimal_disk_threshold_mb: 200,
186 emergency_disk_threshold_mb: 50,
187 reduced_cpu_threshold: 0.90,
188 minimal_cpu_threshold: 0.95,
189 ..Default::default()
190 }
191 }
192
193 pub fn disabled() -> Self {
195 Self {
196 enabled: false,
197 ..Default::default()
198 }
199 }
200}
201
202#[derive(Debug, Clone, Default)]
204pub struct ResourceStatus {
205 pub memory_usage: Option<f64>,
207 pub disk_available_mb: Option<usize>,
209 pub cpu_load: Option<f64>,
211}
212
213impl ResourceStatus {
214 pub fn new(
216 memory_usage: Option<f64>,
217 disk_available_mb: Option<usize>,
218 cpu_load: Option<f64>,
219 ) -> Self {
220 Self {
221 memory_usage,
222 disk_available_mb,
223 cpu_load,
224 }
225 }
226}
227
228#[derive(Debug)]
230pub struct DegradationController {
231 config: DegradationConfig,
232 current_level: AtomicU8,
233 level_change_count: std::sync::atomic::AtomicU64,
234}
235
236impl DegradationController {
237 pub fn new(config: DegradationConfig) -> Self {
239 Self {
240 config,
241 current_level: AtomicU8::new(DegradationLevel::Normal as u8),
242 level_change_count: std::sync::atomic::AtomicU64::new(0),
243 }
244 }
245
246 pub fn default_controller() -> Self {
248 Self::new(DegradationConfig::default())
249 }
250
251 pub fn disabled() -> Self {
253 Self::new(DegradationConfig::disabled())
254 }
255
256 pub fn shared(config: DegradationConfig) -> Arc<Self> {
258 Arc::new(Self::new(config))
259 }
260
261 pub fn current_level(&self) -> DegradationLevel {
263 DegradationLevel::from_u8(self.current_level.load(Ordering::Relaxed))
264 }
265
266 pub fn update(&self, status: &ResourceStatus) -> (DegradationLevel, bool) {
269 if !self.config.enabled {
270 return (DegradationLevel::Normal, false);
271 }
272
273 let new_level = self.calculate_level(status);
274 let old_level = self.current_level.swap(new_level as u8, Ordering::Relaxed);
275 let changed = old_level != new_level as u8;
276
277 if changed {
278 self.level_change_count.fetch_add(1, Ordering::Relaxed);
279 }
280
281 (new_level, changed)
282 }
283
284 fn calculate_level(&self, status: &ResourceStatus) -> DegradationLevel {
286 let current = self.current_level();
287
288 let mut level = DegradationLevel::Normal;
290
291 if let Some(mem_usage) = status.memory_usage {
293 let mem_level = if mem_usage >= self.config.emergency_memory_threshold {
294 DegradationLevel::Emergency
295 } else if mem_usage >= self.config.minimal_memory_threshold {
296 DegradationLevel::Minimal
297 } else if mem_usage >= self.config.reduced_memory_threshold {
298 DegradationLevel::Reduced
299 } else {
300 DegradationLevel::Normal
301 };
302 level = level.max(mem_level);
303 }
304
305 if let Some(disk_mb) = status.disk_available_mb {
307 let disk_level = if disk_mb <= self.config.emergency_disk_threshold_mb {
308 DegradationLevel::Emergency
309 } else if disk_mb <= self.config.minimal_disk_threshold_mb {
310 DegradationLevel::Minimal
311 } else if disk_mb <= self.config.reduced_disk_threshold_mb {
312 DegradationLevel::Reduced
313 } else {
314 DegradationLevel::Normal
315 };
316 level = level.max(disk_level);
317 }
318
319 if let Some(cpu) = status.cpu_load {
321 let cpu_level = if cpu >= self.config.minimal_cpu_threshold {
322 DegradationLevel::Minimal
323 } else if cpu >= self.config.reduced_cpu_threshold {
324 DegradationLevel::Reduced
325 } else {
326 DegradationLevel::Normal
327 };
328 level = level.max(cpu_level);
330 }
331
332 if self.config.auto_recovery && level < current {
334 let can_recover = if let Some(mem) = status.memory_usage {
336 match current {
337 DegradationLevel::Emergency => {
338 mem < self.config.emergency_memory_threshold
339 - self.config.recovery_hysteresis
340 }
341 DegradationLevel::Minimal => {
342 mem < self.config.minimal_memory_threshold - self.config.recovery_hysteresis
343 }
344 DegradationLevel::Reduced => {
345 mem < self.config.reduced_memory_threshold - self.config.recovery_hysteresis
346 }
347 DegradationLevel::Normal => true,
348 }
349 } else {
350 true
351 };
352
353 if can_recover {
354 level = level.max(match current {
356 DegradationLevel::Emergency => DegradationLevel::Minimal,
357 DegradationLevel::Minimal => DegradationLevel::Reduced,
358 _ => DegradationLevel::Normal,
359 });
360 } else {
361 level = current;
362 }
363 }
364
365 level
366 }
367
368 pub fn force_level(&self, level: DegradationLevel) {
370 self.current_level.store(level as u8, Ordering::Relaxed);
371 self.level_change_count.fetch_add(1, Ordering::Relaxed);
372 }
373
374 pub fn reset(&self) {
376 self.current_level
377 .store(DegradationLevel::Normal as u8, Ordering::Relaxed);
378 }
379
380 pub fn level_change_count(&self) -> u64 {
382 self.level_change_count.load(Ordering::Relaxed)
383 }
384
385 pub fn is_degraded(&self) -> bool {
387 self.current_level() != DegradationLevel::Normal
388 }
389
390 pub fn config(&self) -> &DegradationConfig {
392 &self.config
393 }
394}
395
396impl Default for DegradationController {
397 fn default() -> Self {
398 Self::default_controller()
399 }
400}
401
402#[derive(Debug, Clone)]
404pub struct DegradationActions {
405 pub skip_data_quality: bool,
407 pub skip_anomaly_injection: bool,
409 pub skip_optional_fields: bool,
411 pub batch_size_factor: f64,
413 pub anomaly_rate_factor: f64,
415 pub use_compact_output: bool,
417 pub immediate_flush: bool,
419 pub terminate: bool,
421}
422
423impl DegradationActions {
424 pub fn for_level(level: DegradationLevel) -> Self {
426 match level {
427 DegradationLevel::Normal => Self {
428 skip_data_quality: false,
429 skip_anomaly_injection: false,
430 skip_optional_fields: false,
431 batch_size_factor: 1.0,
432 anomaly_rate_factor: 1.0,
433 use_compact_output: false,
434 immediate_flush: false,
435 terminate: false,
436 },
437 DegradationLevel::Reduced => Self {
438 skip_data_quality: true,
439 skip_anomaly_injection: false,
440 skip_optional_fields: false,
441 batch_size_factor: 0.5,
442 anomaly_rate_factor: 0.5,
443 use_compact_output: true,
444 immediate_flush: false,
445 terminate: false,
446 },
447 DegradationLevel::Minimal => Self {
448 skip_data_quality: true,
449 skip_anomaly_injection: true,
450 skip_optional_fields: true,
451 batch_size_factor: 0.25,
452 anomaly_rate_factor: 0.0,
453 use_compact_output: true,
454 immediate_flush: true,
455 terminate: false,
456 },
457 DegradationLevel::Emergency => Self {
458 skip_data_quality: true,
459 skip_anomaly_injection: true,
460 skip_optional_fields: true,
461 batch_size_factor: 0.0,
462 anomaly_rate_factor: 0.0,
463 use_compact_output: true,
464 immediate_flush: true,
465 terminate: true,
466 },
467 }
468 }
469}
470
471#[cfg(test)]
472mod tests {
473 use super::*;
474
475 #[test]
476 fn test_degradation_level_ordering() {
477 assert!(DegradationLevel::Normal < DegradationLevel::Reduced);
478 assert!(DegradationLevel::Reduced < DegradationLevel::Minimal);
479 assert!(DegradationLevel::Minimal < DegradationLevel::Emergency);
480 }
481
482 #[test]
483 fn test_level_behavior_flags() {
484 assert!(!DegradationLevel::Normal.skip_data_quality());
485 assert!(DegradationLevel::Reduced.skip_data_quality());
486 assert!(DegradationLevel::Minimal.skip_data_quality());
487
488 assert!(!DegradationLevel::Normal.skip_anomaly_injection());
489 assert!(!DegradationLevel::Reduced.skip_anomaly_injection());
490 assert!(DegradationLevel::Minimal.skip_anomaly_injection());
491
492 assert!(!DegradationLevel::Minimal.should_terminate());
493 assert!(DegradationLevel::Emergency.should_terminate());
494 }
495
496 #[test]
497 fn test_controller_creation() {
498 let controller = DegradationController::default_controller();
499 assert_eq!(controller.current_level(), DegradationLevel::Normal);
500 }
501
502 #[test]
503 fn test_controller_disabled() {
504 let controller = DegradationController::disabled();
505 let status = ResourceStatus::new(Some(0.99), Some(10), Some(0.99));
506 let (level, _) = controller.update(&status);
507 assert_eq!(level, DegradationLevel::Normal);
508 }
509
510 #[test]
511 fn test_memory_degradation() {
512 let controller = DegradationController::default_controller();
513
514 let status = ResourceStatus::new(Some(0.75), None, None);
516 let (level, changed) = controller.update(&status);
517 assert_eq!(level, DegradationLevel::Reduced);
518 assert!(changed);
519
520 let status = ResourceStatus::new(Some(0.90), None, None);
522 let (level, _) = controller.update(&status);
523 assert_eq!(level, DegradationLevel::Minimal);
524
525 let status = ResourceStatus::new(Some(0.96), None, None);
527 let (level, _) = controller.update(&status);
528 assert_eq!(level, DegradationLevel::Emergency);
529 }
530
531 #[test]
532 fn test_disk_degradation() {
533 let controller = DegradationController::default_controller();
534
535 let status = ResourceStatus::new(None, Some(800), None);
537 let (level, _) = controller.update(&status);
538 assert_eq!(level, DegradationLevel::Reduced);
539
540 let status = ResourceStatus::new(None, Some(50), None);
542 let (level, _) = controller.update(&status);
543 assert_eq!(level, DegradationLevel::Emergency);
544 }
545
546 #[test]
547 fn test_force_level() {
548 let controller = DegradationController::default_controller();
549 controller.force_level(DegradationLevel::Minimal);
550 assert_eq!(controller.current_level(), DegradationLevel::Minimal);
551 }
552
553 #[test]
554 fn test_level_change_count() {
555 let controller = DegradationController::default_controller();
556 assert_eq!(controller.level_change_count(), 0);
557
558 controller.force_level(DegradationLevel::Reduced);
559 assert_eq!(controller.level_change_count(), 1);
560
561 controller.force_level(DegradationLevel::Normal);
562 assert_eq!(controller.level_change_count(), 2);
563 }
564
565 #[test]
566 fn test_actions_for_level() {
567 let normal_actions = DegradationActions::for_level(DegradationLevel::Normal);
568 assert!(!normal_actions.skip_data_quality);
569 assert!(!normal_actions.terminate);
570 assert_eq!(normal_actions.batch_size_factor, 1.0);
571
572 let emergency_actions = DegradationActions::for_level(DegradationLevel::Emergency);
573 assert!(emergency_actions.skip_data_quality);
574 assert!(emergency_actions.terminate);
575 assert_eq!(emergency_actions.batch_size_factor, 0.0);
576 }
577}