1use std::sync::atomic::{AtomicU8, Ordering};
8use std::sync::Arc;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
12#[repr(u8)]
13#[derive(Default)]
14pub enum DegradationLevel {
15 #[default]
17 Normal = 0,
18 Reduced = 1,
20 Minimal = 2,
22 Emergency = 3,
24}
25
26impl DegradationLevel {
27 pub fn skip_data_quality(&self) -> bool {
29 *self >= DegradationLevel::Reduced
30 }
31
32 pub fn skip_anomaly_injection(&self) -> bool {
34 *self >= DegradationLevel::Minimal
35 }
36
37 pub fn skip_optional_fields(&self) -> bool {
39 *self >= DegradationLevel::Minimal
40 }
41
42 pub fn requires_immediate_flush(&self) -> bool {
44 *self >= DegradationLevel::Emergency
45 }
46
47 pub fn should_terminate(&self) -> bool {
49 *self == DegradationLevel::Emergency
50 }
51
52 pub fn batch_size_multiplier(&self) -> f64 {
54 match self {
55 DegradationLevel::Normal => 1.0,
56 DegradationLevel::Reduced => 0.5,
57 DegradationLevel::Minimal => 0.25,
58 DegradationLevel::Emergency => 0.0,
59 }
60 }
61
62 pub fn anomaly_rate_multiplier(&self) -> f64 {
64 match self {
65 DegradationLevel::Normal => 1.0,
66 DegradationLevel::Reduced => 0.5,
67 DegradationLevel::Minimal => 0.0,
68 DegradationLevel::Emergency => 0.0,
69 }
70 }
71
72 pub fn name(&self) -> &'static str {
74 match self {
75 DegradationLevel::Normal => "Normal",
76 DegradationLevel::Reduced => "Reduced",
77 DegradationLevel::Minimal => "Minimal",
78 DegradationLevel::Emergency => "Emergency",
79 }
80 }
81
82 pub fn description(&self) -> &'static str {
84 match self {
85 DegradationLevel::Normal => "Full operation with all features enabled",
86 DegradationLevel::Reduced => {
87 "Reduced batch sizes, skip data quality injection, 50% anomaly rate"
88 }
89 DegradationLevel::Minimal => "Essential data only, no injections, minimal batch sizes",
90 DegradationLevel::Emergency => {
91 "Flush pending writes, save checkpoint, terminate gracefully"
92 }
93 }
94 }
95
96 pub fn from_u8(value: u8) -> Self {
98 match value {
99 0 => DegradationLevel::Normal,
100 1 => DegradationLevel::Reduced,
101 2 => DegradationLevel::Minimal,
102 _ => DegradationLevel::Emergency,
103 }
104 }
105}
106
107impl std::fmt::Display for DegradationLevel {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "{}", self.name())
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct DegradationConfig {
116 pub enabled: bool,
118 pub reduced_memory_threshold: f64,
120 pub minimal_memory_threshold: f64,
122 pub emergency_memory_threshold: f64,
124 pub reduced_disk_threshold_mb: usize,
126 pub minimal_disk_threshold_mb: usize,
128 pub emergency_disk_threshold_mb: usize,
130 pub reduced_cpu_threshold: f64,
132 pub minimal_cpu_threshold: f64,
134 pub auto_recovery: bool,
136 pub recovery_hysteresis: f64,
138}
139
140impl Default for DegradationConfig {
141 fn default() -> Self {
142 Self {
143 enabled: true,
144 reduced_memory_threshold: 0.70,
146 minimal_memory_threshold: 0.85,
147 emergency_memory_threshold: 0.95,
148 reduced_disk_threshold_mb: 1000,
150 minimal_disk_threshold_mb: 500,
151 emergency_disk_threshold_mb: 100,
152 reduced_cpu_threshold: 0.80,
154 minimal_cpu_threshold: 0.90,
155 auto_recovery: true,
157 recovery_hysteresis: 0.05,
158 }
159 }
160}
161
162impl DegradationConfig {
163 pub fn conservative() -> Self {
165 Self {
166 reduced_memory_threshold: 0.60,
167 minimal_memory_threshold: 0.75,
168 emergency_memory_threshold: 0.90,
169 reduced_disk_threshold_mb: 2000,
170 minimal_disk_threshold_mb: 1000,
171 emergency_disk_threshold_mb: 500,
172 reduced_cpu_threshold: 0.70,
173 minimal_cpu_threshold: 0.85,
174 ..Default::default()
175 }
176 }
177
178 pub fn aggressive() -> Self {
180 Self {
181 reduced_memory_threshold: 0.80,
182 minimal_memory_threshold: 0.90,
183 emergency_memory_threshold: 0.98,
184 reduced_disk_threshold_mb: 500,
185 minimal_disk_threshold_mb: 200,
186 emergency_disk_threshold_mb: 50,
187 reduced_cpu_threshold: 0.90,
188 minimal_cpu_threshold: 0.95,
189 ..Default::default()
190 }
191 }
192
193 pub fn disabled() -> Self {
195 Self {
196 enabled: false,
197 ..Default::default()
198 }
199 }
200}
201
202#[derive(Debug, Clone, Default)]
204pub struct ResourceStatus {
205 pub memory_usage: Option<f64>,
207 pub disk_available_mb: Option<usize>,
209 pub cpu_load: Option<f64>,
211}
212
213impl ResourceStatus {
214 pub fn new(
216 memory_usage: Option<f64>,
217 disk_available_mb: Option<usize>,
218 cpu_load: Option<f64>,
219 ) -> Self {
220 Self {
221 memory_usage,
222 disk_available_mb,
223 cpu_load,
224 }
225 }
226}
227
228#[derive(Debug)]
230pub struct DegradationController {
231 config: DegradationConfig,
232 current_level: AtomicU8,
233 level_change_count: std::sync::atomic::AtomicU64,
234}
235
236impl DegradationController {
237 pub fn new(config: DegradationConfig) -> Self {
239 Self {
240 config,
241 current_level: AtomicU8::new(DegradationLevel::Normal as u8),
242 level_change_count: std::sync::atomic::AtomicU64::new(0),
243 }
244 }
245
246 pub fn default_controller() -> Self {
248 Self::new(DegradationConfig::default())
249 }
250
251 pub fn disabled() -> Self {
253 Self::new(DegradationConfig::disabled())
254 }
255
256 pub fn shared(config: DegradationConfig) -> Arc<Self> {
258 Arc::new(Self::new(config))
259 }
260
261 pub fn current_level(&self) -> DegradationLevel {
263 DegradationLevel::from_u8(self.current_level.load(Ordering::Relaxed))
264 }
265
266 pub fn update(&self, status: &ResourceStatus) -> (DegradationLevel, bool) {
269 if !self.config.enabled {
270 return (DegradationLevel::Normal, false);
271 }
272
273 let new_level = self.calculate_level(status);
274 let old_level = self.current_level.swap(new_level as u8, Ordering::Relaxed);
275 let changed = old_level != new_level as u8;
276
277 if changed {
278 self.level_change_count.fetch_add(1, Ordering::Relaxed);
279 }
280
281 (new_level, changed)
282 }
283
284 fn calculate_level(&self, status: &ResourceStatus) -> DegradationLevel {
286 let current = self.current_level();
287
288 let mut level = DegradationLevel::Normal;
290
291 if let Some(mem_usage) = status.memory_usage {
293 let mem_level = if mem_usage >= self.config.emergency_memory_threshold {
294 DegradationLevel::Emergency
295 } else if mem_usage >= self.config.minimal_memory_threshold {
296 DegradationLevel::Minimal
297 } else if mem_usage >= self.config.reduced_memory_threshold {
298 DegradationLevel::Reduced
299 } else {
300 DegradationLevel::Normal
301 };
302 level = level.max(mem_level);
303 }
304
305 if let Some(disk_mb) = status.disk_available_mb {
307 let disk_level = if disk_mb <= self.config.emergency_disk_threshold_mb {
308 DegradationLevel::Emergency
309 } else if disk_mb <= self.config.minimal_disk_threshold_mb {
310 DegradationLevel::Minimal
311 } else if disk_mb <= self.config.reduced_disk_threshold_mb {
312 DegradationLevel::Reduced
313 } else {
314 DegradationLevel::Normal
315 };
316 level = level.max(disk_level);
317 }
318
319 if let Some(cpu) = status.cpu_load {
321 let cpu_level = if cpu >= self.config.minimal_cpu_threshold {
322 DegradationLevel::Minimal
323 } else if cpu >= self.config.reduced_cpu_threshold {
324 DegradationLevel::Reduced
325 } else {
326 DegradationLevel::Normal
327 };
328 level = level.max(cpu_level);
330 }
331
332 if self.config.auto_recovery && level < current {
334 let can_recover = if let Some(mem) = status.memory_usage {
336 match current {
337 DegradationLevel::Emergency => {
338 mem < self.config.emergency_memory_threshold
339 - self.config.recovery_hysteresis
340 }
341 DegradationLevel::Minimal => {
342 mem < self.config.minimal_memory_threshold - self.config.recovery_hysteresis
343 }
344 DegradationLevel::Reduced => {
345 mem < self.config.reduced_memory_threshold - self.config.recovery_hysteresis
346 }
347 DegradationLevel::Normal => true,
348 }
349 } else {
350 true
351 };
352
353 if can_recover {
354 level = level.max(match current {
356 DegradationLevel::Emergency => DegradationLevel::Minimal,
357 DegradationLevel::Minimal => DegradationLevel::Reduced,
358 _ => DegradationLevel::Normal,
359 });
360 } else {
361 level = current;
362 }
363 }
364
365 level
366 }
367
368 pub fn force_level(&self, level: DegradationLevel) {
370 self.current_level.store(level as u8, Ordering::Relaxed);
371 self.level_change_count.fetch_add(1, Ordering::Relaxed);
372 }
373
374 pub fn reset(&self) {
376 self.current_level
377 .store(DegradationLevel::Normal as u8, Ordering::Relaxed);
378 }
379
380 pub fn level_change_count(&self) -> u64 {
382 self.level_change_count.load(Ordering::Relaxed)
383 }
384
385 pub fn is_degraded(&self) -> bool {
387 self.current_level() != DegradationLevel::Normal
388 }
389
390 pub fn config(&self) -> &DegradationConfig {
392 &self.config
393 }
394}
395
396impl Default for DegradationController {
397 fn default() -> Self {
398 Self::default_controller()
399 }
400}
401
402#[derive(Debug, Clone)]
404pub struct DegradationActions {
405 pub skip_data_quality: bool,
407 pub skip_anomaly_injection: bool,
409 pub skip_optional_fields: bool,
411 pub batch_size_factor: f64,
413 pub anomaly_rate_factor: f64,
415 pub use_compact_output: bool,
417 pub immediate_flush: bool,
419 pub terminate: bool,
421}
422
423impl DegradationActions {
424 pub fn for_level(level: DegradationLevel) -> Self {
426 match level {
427 DegradationLevel::Normal => Self {
428 skip_data_quality: false,
429 skip_anomaly_injection: false,
430 skip_optional_fields: false,
431 batch_size_factor: 1.0,
432 anomaly_rate_factor: 1.0,
433 use_compact_output: false,
434 immediate_flush: false,
435 terminate: false,
436 },
437 DegradationLevel::Reduced => Self {
438 skip_data_quality: true,
439 skip_anomaly_injection: false,
440 skip_optional_fields: false,
441 batch_size_factor: 0.5,
442 anomaly_rate_factor: 0.5,
443 use_compact_output: true,
444 immediate_flush: false,
445 terminate: false,
446 },
447 DegradationLevel::Minimal => Self {
448 skip_data_quality: true,
449 skip_anomaly_injection: true,
450 skip_optional_fields: true,
451 batch_size_factor: 0.25,
452 anomaly_rate_factor: 0.0,
453 use_compact_output: true,
454 immediate_flush: true,
455 terminate: false,
456 },
457 DegradationLevel::Emergency => Self {
458 skip_data_quality: true,
459 skip_anomaly_injection: true,
460 skip_optional_fields: true,
461 batch_size_factor: 0.0,
462 anomaly_rate_factor: 0.0,
463 use_compact_output: true,
464 immediate_flush: true,
465 terminate: true,
466 },
467 }
468 }
469}
470
471#[cfg(test)]
472#[allow(clippy::unwrap_used)]
473mod tests {
474 use super::*;
475
476 #[test]
477 fn test_degradation_level_ordering() {
478 assert!(DegradationLevel::Normal < DegradationLevel::Reduced);
479 assert!(DegradationLevel::Reduced < DegradationLevel::Minimal);
480 assert!(DegradationLevel::Minimal < DegradationLevel::Emergency);
481 }
482
483 #[test]
484 fn test_level_behavior_flags() {
485 assert!(!DegradationLevel::Normal.skip_data_quality());
486 assert!(DegradationLevel::Reduced.skip_data_quality());
487 assert!(DegradationLevel::Minimal.skip_data_quality());
488
489 assert!(!DegradationLevel::Normal.skip_anomaly_injection());
490 assert!(!DegradationLevel::Reduced.skip_anomaly_injection());
491 assert!(DegradationLevel::Minimal.skip_anomaly_injection());
492
493 assert!(!DegradationLevel::Minimal.should_terminate());
494 assert!(DegradationLevel::Emergency.should_terminate());
495 }
496
497 #[test]
498 fn test_controller_creation() {
499 let controller = DegradationController::default_controller();
500 assert_eq!(controller.current_level(), DegradationLevel::Normal);
501 }
502
503 #[test]
504 fn test_controller_disabled() {
505 let controller = DegradationController::disabled();
506 let status = ResourceStatus::new(Some(0.99), Some(10), Some(0.99));
507 let (level, _) = controller.update(&status);
508 assert_eq!(level, DegradationLevel::Normal);
509 }
510
511 #[test]
512 fn test_memory_degradation() {
513 let controller = DegradationController::default_controller();
514
515 let status = ResourceStatus::new(Some(0.75), None, None);
517 let (level, changed) = controller.update(&status);
518 assert_eq!(level, DegradationLevel::Reduced);
519 assert!(changed);
520
521 let status = ResourceStatus::new(Some(0.90), None, None);
523 let (level, _) = controller.update(&status);
524 assert_eq!(level, DegradationLevel::Minimal);
525
526 let status = ResourceStatus::new(Some(0.96), None, None);
528 let (level, _) = controller.update(&status);
529 assert_eq!(level, DegradationLevel::Emergency);
530 }
531
532 #[test]
533 fn test_disk_degradation() {
534 let controller = DegradationController::default_controller();
535
536 let status = ResourceStatus::new(None, Some(800), None);
538 let (level, _) = controller.update(&status);
539 assert_eq!(level, DegradationLevel::Reduced);
540
541 let status = ResourceStatus::new(None, Some(50), None);
543 let (level, _) = controller.update(&status);
544 assert_eq!(level, DegradationLevel::Emergency);
545 }
546
547 #[test]
548 fn test_force_level() {
549 let controller = DegradationController::default_controller();
550 controller.force_level(DegradationLevel::Minimal);
551 assert_eq!(controller.current_level(), DegradationLevel::Minimal);
552 }
553
554 #[test]
555 fn test_level_change_count() {
556 let controller = DegradationController::default_controller();
557 assert_eq!(controller.level_change_count(), 0);
558
559 controller.force_level(DegradationLevel::Reduced);
560 assert_eq!(controller.level_change_count(), 1);
561
562 controller.force_level(DegradationLevel::Normal);
563 assert_eq!(controller.level_change_count(), 2);
564 }
565
566 #[test]
567 fn test_actions_for_level() {
568 let normal_actions = DegradationActions::for_level(DegradationLevel::Normal);
569 assert!(!normal_actions.skip_data_quality);
570 assert!(!normal_actions.terminate);
571 assert_eq!(normal_actions.batch_size_factor, 1.0);
572
573 let emergency_actions = DegradationActions::for_level(DegradationLevel::Emergency);
574 assert!(emergency_actions.skip_data_quality);
575 assert!(emergency_actions.terminate);
576 assert_eq!(emergency_actions.batch_size_factor, 0.0);
577 }
578}