1use std::{
34 sync::atomic::{AtomicU64, Ordering},
35 time::{Duration, Instant},
36};
37
38use parking_lot::Mutex;
39
40#[derive(Debug, Clone)]
42pub struct CleanupConfig {
43 pub base_interval: Duration,
45
46 pub min_interval: Duration,
48
49 pub max_interval: Duration,
51
52 pub high_utilization_threshold: f64,
54
55 pub low_utilization_threshold: f64,
57
58 pub utilization_hysteresis: f64,
61
62 pub drop_some_threshold: f64,
65
66 pub drop_most_threshold: f64,
69
70 pub block_new_threshold: f64,
73
74 pub target_cleanup_duration: Duration,
76
77 pub high_load_message_rate: f64,
79
80 pub rate_window: Duration,
82
83 pub batch_size_smoothing: f64,
86
87 pub ttl_divisor: f64,
91
92 pub efficiency_weight: f64,
96
97 pub efficiency_smoothing: f64,
100
101 pub min_cycles_for_efficiency: u64,
104
105 pub max_ttl_interval_factor: f64,
108}
109
110impl Default for CleanupConfig {
111 fn default() -> Self {
112 Self {
113 base_interval: Duration::from_secs(30),
114 min_interval: Duration::from_secs(5),
115 max_interval: Duration::from_secs(120),
116 high_utilization_threshold: 0.8,
117 low_utilization_threshold: 0.3,
118 utilization_hysteresis: 0.03,
119 drop_some_threshold: 0.93,
120 drop_most_threshold: 0.96,
121 block_new_threshold: 0.98,
122 target_cleanup_duration: Duration::from_millis(50),
123 high_load_message_rate: 1000.0,
124 rate_window: Duration::from_secs(10),
125 batch_size_smoothing: 0.15,
126 ttl_divisor: 4.0,
127 efficiency_weight: 0.3,
128 efficiency_smoothing: 0.2,
129 min_cycles_for_efficiency: 3,
130 max_ttl_interval_factor: 2.0,
131 }
132 }
133}
134
135impl CleanupConfig {
136 pub fn new() -> Self {
138 let mut config = Self::default();
139 config.validate();
140 config
141 }
142
143 pub fn high_throughput() -> Self {
147 let mut config = Self {
148 base_interval: Duration::from_secs(15),
149 min_interval: Duration::from_secs(2),
150 max_interval: Duration::from_secs(60),
151 high_utilization_threshold: 0.7,
152 low_utilization_threshold: 0.2,
153 utilization_hysteresis: 0.04,
154 drop_some_threshold: 0.90,
155 drop_most_threshold: 0.94,
156 block_new_threshold: 0.97,
157 target_cleanup_duration: Duration::from_millis(25),
158 high_load_message_rate: 5000.0,
159 rate_window: Duration::from_secs(5),
160 batch_size_smoothing: 0.2,
161 ttl_divisor: 4.0,
162 efficiency_weight: 0.4,
163 efficiency_smoothing: 0.25,
164 min_cycles_for_efficiency: 2,
165 max_ttl_interval_factor: 1.5,
166 };
167 config.validate();
168 config
169 }
170
171 pub fn low_latency() -> Self {
175 let mut config = Self {
176 base_interval: Duration::from_secs(20),
177 min_interval: Duration::from_secs(5),
178 max_interval: Duration::from_secs(60),
179 high_utilization_threshold: 0.75,
180 low_utilization_threshold: 0.25,
181 utilization_hysteresis: 0.03,
182 drop_some_threshold: 0.93,
183 drop_most_threshold: 0.96,
184 block_new_threshold: 0.98,
185 target_cleanup_duration: Duration::from_millis(10),
186 high_load_message_rate: 500.0,
187 rate_window: Duration::from_secs(5),
188 batch_size_smoothing: 0.1,
189 ttl_divisor: 4.0,
190 efficiency_weight: 0.2,
191 efficiency_smoothing: 0.15,
192 min_cycles_for_efficiency: 4,
193 max_ttl_interval_factor: 2.0,
194 };
195 config.validate();
196 config
197 }
198
199 fn validate(&mut self) {
201 self.high_utilization_threshold = self.high_utilization_threshold.clamp(0.5, 0.99);
202 self.low_utilization_threshold = self.low_utilization_threshold.clamp(0.01, 0.5);
203 self.utilization_hysteresis = self.utilization_hysteresis.clamp(0.01, 0.1);
204 self.batch_size_smoothing = self.batch_size_smoothing.clamp(0.05, 0.5);
205 self.ttl_divisor = self.ttl_divisor.clamp(2.0, 20.0);
206 self.efficiency_weight = self.efficiency_weight.clamp(0.0, 1.0);
207 self.efficiency_smoothing = self.efficiency_smoothing.clamp(0.05, 0.5);
208 self.min_cycles_for_efficiency = self.min_cycles_for_efficiency.clamp(1, 10);
209 self.max_ttl_interval_factor = self.max_ttl_interval_factor.clamp(1.0, 5.0);
210
211 if self.high_utilization_threshold <= self.low_utilization_threshold {
213 self.high_utilization_threshold = self.low_utilization_threshold + 0.2;
214 }
215
216 self.drop_some_threshold = self.drop_some_threshold.clamp(0.85, 0.99);
218 self.drop_most_threshold = self
219 .drop_most_threshold
220 .clamp(self.drop_some_threshold + 0.01, 0.99);
221 self.block_new_threshold = self
222 .block_new_threshold
223 .clamp(self.drop_most_threshold + 0.01, 0.995);
224 }
225
226 pub const fn with_base_interval(mut self, interval: Duration) -> Self {
228 self.base_interval = interval;
229 self
230 }
231
232 pub const fn with_min_interval(mut self, interval: Duration) -> Self {
234 self.min_interval = interval;
235 self
236 }
237
238 pub const fn with_max_interval(mut self, interval: Duration) -> Self {
240 self.max_interval = interval;
241 self
242 }
243
244 pub fn with_high_utilization_threshold(mut self, threshold: f64) -> Self {
246 self.high_utilization_threshold = threshold;
247 self.validate();
248 self
249 }
250
251 pub fn with_low_utilization_threshold(mut self, threshold: f64) -> Self {
253 self.low_utilization_threshold = threshold;
254 self.validate();
255 self
256 }
257
258 pub fn with_ttl_divisor(mut self, divisor: f64) -> Self {
260 self.ttl_divisor = divisor;
261 self.validate();
262 self
263 }
264
265 pub fn with_backpressure_thresholds(
267 mut self,
268 drop_some: f64,
269 drop_most: f64,
270 block_new: f64,
271 ) -> Self {
272 self.drop_some_threshold = drop_some;
273 self.drop_most_threshold = drop_most;
274 self.block_new_threshold = block_new;
275 self.validate();
276 self
277 }
278
279 pub fn with_efficiency_smoothing(mut self, smoothing: f64) -> Self {
281 self.efficiency_smoothing = smoothing;
282 self.validate();
283 self
284 }
285}
286
287#[derive(Debug, Clone, Copy, PartialEq, Eq)]
289pub enum BackpressureHint {
290 None,
292 DropSome,
294 DropMost,
296 BlockNew,
298}
299
300#[derive(Debug, Clone, Copy, PartialEq, Eq)]
302pub enum CleanupReason {
303 HighUtilization,
305 LowUtilization,
307 ModerateUtilization,
309 CriticalUtilization {
311 backpressure: BackpressureHint,
313 },
314}
315
316#[derive(Debug, Clone, Copy, PartialEq, Eq)]
318pub enum EfficiencyTrend {
319 Improving,
321 Degrading,
323 Stable,
325 Unknown,
327}
328
329#[derive(Debug, Clone, Copy, PartialEq, Eq)]
331pub enum PressureTrend {
332 Increasing,
334 Decreasing,
336 Stable,
338 Unknown,
340}
341
342#[derive(Debug, Clone, Copy)]
344pub struct CleanupParameters {
345 pub interval: Duration,
347
348 pub batch_size: usize,
350
351 pub aggressive: bool,
353
354 pub utilization: f64,
356
357 pub message_rate: f64,
359
360 pub reason: CleanupReason,
362}
363
364impl CleanupParameters {
365 pub fn should_cleanup_now(&self) -> bool {
367 matches!(self.reason, CleanupReason::CriticalUtilization { .. })
368 }
369
370 pub fn backpressure_hint(&self) -> BackpressureHint {
372 match self.reason {
373 CleanupReason::CriticalUtilization { backpressure } => backpressure,
374 _ => BackpressureHint::None,
375 }
376 }
377}
378
379#[derive(Debug, Clone)]
381pub struct CleanupStats {
382 pub cleanup_cycles: u64,
384
385 pub entries_removed: u64,
387
388 pub entries_scanned: u64,
390
391 pub avg_cleanup_duration: Duration,
393
394 pub max_cleanup_duration: Duration,
396
397 pub current_message_rate: f64,
399
400 pub aggressive_cleanups: u64,
402
403 pub avg_removal_efficiency: f64,
405
406 pub recent_efficiency: f64,
408
409 pub efficiency_trend: EfficiencyTrend,
411
412 pub pressure_trend: PressureTrend,
414
415 pub recent_utilization: f64,
417}
418
419#[derive(Debug)]
424struct LockFreeRateCounter {
425 current_count: AtomicU64,
427}
428
429impl LockFreeRateCounter {
430 fn new() -> Self {
431 Self {
432 current_count: AtomicU64::new(0),
433 }
434 }
435
436 #[inline]
438 fn record(&self) {
439 self.current_count.fetch_add(1, Ordering::Relaxed);
440 }
441
442 #[inline]
444 fn record_batch(&self, count: u64) {
445 self.current_count.fetch_add(count, Ordering::Relaxed);
446 }
447
448 fn drain(&self) -> u64 {
450 self.current_count.swap(0, Ordering::AcqRel)
451 }
452
453 fn load(&self) -> u64 {
455 self.current_count.load(Ordering::Acquire)
456 }
457}
458
459#[derive(Debug)]
461struct RateWindowState {
462 current_start: Instant,
464 previous_count: u64,
466 previous_duration: Duration,
468 window_size: Duration,
470 accumulated_current: u64,
472}
473
474impl RateWindowState {
475 fn new(window_size: Duration) -> Self {
476 Self {
477 current_start: Instant::now(),
478 previous_count: 0,
479 previous_duration: Duration::ZERO,
480 window_size,
481 accumulated_current: 0,
482 }
483 }
484
485 fn sync_and_calculate_rate(&mut self, counter: &LockFreeRateCounter) -> f64 {
492 let now = Instant::now();
493
494 let drained = counter.drain();
496 self.accumulated_current = self.accumulated_current.saturating_add(drained);
497
498 let current_elapsed = now
500 .checked_duration_since(self.current_start)
501 .unwrap_or(Duration::ZERO);
502
503 let mut rollovers = 0;
505 let mut elapsed = current_elapsed;
506
507 while elapsed >= self.window_size && rollovers < 10 {
508 self.previous_count = self.accumulated_current;
510 self.previous_duration = self.window_size;
511 self.accumulated_current = 0;
512
513 self.current_start += self.window_size;
515
516 elapsed = now
518 .checked_duration_since(self.current_start)
519 .unwrap_or(Duration::ZERO);
520 rollovers += 1;
521 }
522
523 if rollovers >= 10 {
525 self.previous_count = self.accumulated_current;
526 self.previous_duration = self.window_size;
527 self.current_start = now;
528 self.accumulated_current = 0;
529 elapsed = Duration::ZERO;
530 }
531
532 let current = self.accumulated_current + counter.load();
534
535 let total_count = current + self.previous_count;
537 let total_duration = elapsed + self.previous_duration;
538
539 if total_duration.is_zero() {
540 0.0
541 } else {
542 total_count as f64 / total_duration.as_secs_f64()
543 }
544 }
545
546 fn reset(&mut self, counter: &LockFreeRateCounter) {
548 let drained = counter.drain();
549 let elapsed = Instant::now()
550 .checked_duration_since(self.current_start)
551 .unwrap_or(self.window_size);
552
553 self.previous_count = self.accumulated_current.saturating_add(drained);
554 self.previous_duration = self.window_size.min(elapsed);
555 self.current_start = Instant::now();
556 self.accumulated_current = 0;
557 }
558}
559
560#[derive(Debug)]
562struct CleanupState {
563 rate_window: RateWindowState,
565
566 target_batch_size: f64,
568
569 last_cleanup_duration: Duration,
571
572 cleanup_cycles: u64,
574
575 entries_removed: u64,
577
578 entries_scanned: u64,
580
581 max_cleanup_duration: Duration,
583
584 total_cleanup_duration: Duration,
586
587 aggressive_cleanups: u64,
589
590 was_aggressive: bool,
592
593 last_raw_efficiency: f64,
595
596 efficiency_ema: f64,
598
599 prev_efficiency_ema: f64,
601
602 utilization_ema: f64,
604
605 prev_utilization_ema: f64,
607}
608
609impl CleanupState {
610 fn new(window_size: Duration) -> Self {
611 Self {
612 rate_window: RateWindowState::new(window_size),
613 target_batch_size: 100.0,
614 last_cleanup_duration: Duration::ZERO,
615 cleanup_cycles: 0,
616 entries_removed: 0,
617 entries_scanned: 0,
618 max_cleanup_duration: Duration::ZERO,
619 total_cleanup_duration: Duration::ZERO,
620 aggressive_cleanups: 0,
621 was_aggressive: false,
622 last_raw_efficiency: 0.5,
623 efficiency_ema: 0.5,
624 prev_efficiency_ema: 0.5,
625 utilization_ema: 0.5,
626 prev_utilization_ema: 0.5,
627 }
628 }
629
630 fn efficiency_trend(&self, min_cycles: u64) -> EfficiencyTrend {
632 if self.cleanup_cycles < min_cycles {
633 return EfficiencyTrend::Unknown;
634 }
635
636 let diff = self.efficiency_ema - self.prev_efficiency_ema;
637 const THRESHOLD: f64 = 0.02; if diff > THRESHOLD {
640 EfficiencyTrend::Improving
641 } else if diff < -THRESHOLD {
642 EfficiencyTrend::Degrading
643 } else {
644 EfficiencyTrend::Stable
645 }
646 }
647
648 fn pressure_trend(&self, min_cycles: u64) -> PressureTrend {
650 if self.cleanup_cycles < min_cycles {
651 return PressureTrend::Unknown;
652 }
653
654 let diff = self.utilization_ema - self.prev_utilization_ema;
655 const THRESHOLD: f64 = 0.02; if diff > THRESHOLD {
658 PressureTrend::Increasing
659 } else if diff < -THRESHOLD {
660 PressureTrend::Decreasing
661 } else {
662 PressureTrend::Stable
663 }
664 }
665}
666
667#[derive(Debug)]
669pub struct CleanupTuner {
670 config: CleanupConfig,
671 rate_counter: LockFreeRateCounter,
673 state: Mutex<CleanupState>,
675}
676
677impl CleanupTuner {
678 pub fn new(mut config: CleanupConfig) -> Self {
680 config.validate();
681 let window_size = config.rate_window;
682 Self {
683 config,
684 rate_counter: LockFreeRateCounter::new(),
685 state: Mutex::new(CleanupState::new(window_size)),
686 }
687 }
688
689 pub fn with_defaults() -> Self {
691 Self::new(CleanupConfig::default())
692 }
693
694 #[inline]
702 pub fn record_message(&self) {
703 self.rate_counter.record();
704 }
705
706 #[inline]
715 pub fn record_messages(&self, count: u64) {
716 self.rate_counter.record_batch(count);
717 }
718
719 pub fn record_cleanup(
729 &self,
730 duration: Duration,
731 entries_removed: usize,
732 params: &CleanupParameters,
733 ) {
734 let mut state = self.state.lock();
735
736 let entries_scanned = params.batch_size;
738 let raw_efficiency = if entries_scanned > 0 {
739 (entries_removed as f64 / entries_scanned as f64).clamp(0.0, 1.0)
740 } else {
741 state.last_raw_efficiency };
743
744 state.prev_efficiency_ema = state.efficiency_ema;
746 state.efficiency_ema = self.config.efficiency_smoothing * raw_efficiency
747 + (1.0 - self.config.efficiency_smoothing) * state.efficiency_ema;
748 state.last_raw_efficiency = raw_efficiency;
749
750 state.prev_utilization_ema = state.utilization_ema;
752 state.utilization_ema = self.config.efficiency_smoothing * params.utilization
753 + (1.0 - self.config.efficiency_smoothing) * state.utilization_ema;
754
755 state.cleanup_cycles += 1;
756 state.entries_removed += entries_removed as u64;
757 state.entries_scanned += entries_scanned as u64;
758 state.last_cleanup_duration = duration;
759 state.total_cleanup_duration += duration;
760 state.was_aggressive = params.aggressive;
761
762 if duration > state.max_cleanup_duration {
763 state.max_cleanup_duration = duration;
764 }
765 if params.aggressive {
766 state.aggressive_cleanups += 1;
767 }
768 }
769
770 pub fn message_rate(&self) -> f64 {
774 let mut state = self.state.lock();
775 state
776 .rate_window
777 .sync_and_calculate_rate(&self.rate_counter)
778 }
779
780 pub fn reset_rate_window(&self) {
785 let mut state = self.state.lock();
786 state.rate_window.reset(&self.rate_counter);
787 }
788
789 pub fn get_parameters(
798 &self,
799 cache_utilization: f64,
800 current_ttl: Duration,
801 ) -> CleanupParameters {
802 let mut state = self.state.lock();
803 let message_rate = state
804 .rate_window
805 .sync_and_calculate_rate(&self.rate_counter);
806
807 let high_threshold = if state.was_aggressive {
809 self.config.high_utilization_threshold - self.config.utilization_hysteresis
810 } else {
811 self.config.high_utilization_threshold + self.config.utilization_hysteresis
812 };
813
814 let low_threshold = if state.was_aggressive {
815 self.config.low_utilization_threshold + self.config.utilization_hysteresis
816 } else {
817 self.config.low_utilization_threshold - self.config.utilization_hysteresis
818 };
819
820 let high_load = message_rate > self.config.high_load_message_rate;
822
823 let backpressure = if cache_utilization >= self.config.block_new_threshold {
825 if high_load {
826 BackpressureHint::BlockNew
827 } else {
828 BackpressureHint::DropMost
829 }
830 } else if cache_utilization >= self.config.drop_most_threshold {
831 BackpressureHint::DropMost
832 } else if cache_utilization >= self.config.drop_some_threshold {
833 BackpressureHint::DropSome
834 } else {
835 BackpressureHint::None
836 };
837
838 let (reason, aggressive) = if backpressure != BackpressureHint::None {
840 (CleanupReason::CriticalUtilization { backpressure }, true)
841 } else if cache_utilization > high_threshold {
842 (CleanupReason::HighUtilization, true)
843 } else if cache_utilization < low_threshold {
844 (CleanupReason::LowUtilization, false)
845 } else {
846 (CleanupReason::ModerateUtilization, false)
847 };
848
849 let base_interval = match reason {
851 CleanupReason::CriticalUtilization { backpressure } => {
852 match backpressure {
853 BackpressureHint::BlockNew | BackpressureHint::DropMost => {
854 self.config.min_interval / 2
856 }
857 BackpressureHint::DropSome => self.config.min_interval,
858 BackpressureHint::None => unreachable!(),
859 }
860 }
861 CleanupReason::HighUtilization => {
862 if high_load {
863 self.config.min_interval * 2
865 } else {
866 self.config.min_interval
867 }
868 }
869 CleanupReason::LowUtilization => {
870 self.config.max_interval.min(self.config.base_interval * 2)
872 }
873 CleanupReason::ModerateUtilization => {
874 let range = high_threshold - low_threshold;
876 let position = ((cache_utilization - low_threshold) / range).clamp(0.0, 1.0);
877 let max_ms = self.config.max_interval.as_millis() as f64;
878 let base_ms = self.config.base_interval.as_millis() as f64;
879 let interval_ms = max_ms - (position * (max_ms - base_ms));
880 Duration::from_millis(interval_ms as u64)
881 }
882 };
883
884 let batch_size = self.calculate_batch_size(&mut state);
886
887 let ttl_secs = current_ttl.as_secs_f64();
889 let min_ttl_interval = if ttl_secs > 0.0 && ttl_secs.is_finite() {
890 let raw_interval = ttl_secs / self.config.ttl_divisor;
891 let max_ttl_interval =
893 self.config.max_interval.as_secs_f64() * self.config.max_ttl_interval_factor;
894 Duration::from_secs_f64(raw_interval.min(max_ttl_interval))
895 } else {
896 Duration::ZERO
897 };
898
899 let final_interval = base_interval
900 .max(min_ttl_interval)
901 .max(self.config.min_interval);
902
903 CleanupParameters {
904 interval: final_interval,
905 batch_size,
906 aggressive,
907 utilization: cache_utilization,
908 message_rate,
909 reason,
910 }
911 }
912
913 fn calculate_batch_size(&self, state: &mut CleanupState) -> usize {
915 let target_ms = self.config.target_cleanup_duration.as_millis() as f64;
916 let last_ms = state.last_cleanup_duration.as_millis().max(1) as f64;
917
918 let duration_ratio = (target_ms / last_ms).clamp(0.5, 2.0);
920
921 let efficiency_factor = if state.cleanup_cycles >= self.config.min_cycles_for_efficiency {
923 self.calculate_efficiency_factor(state)
924 } else {
925 1.0 };
927
928 let trend_factor = match state.efficiency_trend(self.config.min_cycles_for_efficiency) {
931 EfficiencyTrend::Improving => 0.95, EfficiencyTrend::Degrading => 1.05, EfficiencyTrend::Stable | EfficiencyTrend::Unknown => 1.0,
934 };
935
936 let desired_batch =
937 (state.target_batch_size * duration_ratio * efficiency_factor * trend_factor)
938 .clamp(10.0, 500.0);
939
940 let alpha = self.config.batch_size_smoothing;
942 state.target_batch_size = alpha * desired_batch + (1.0 - alpha) * state.target_batch_size;
943
944 state.target_batch_size = state.target_batch_size.clamp(10.0, 500.0);
946
947 state.target_batch_size.round() as usize
948 }
949
950 fn calculate_efficiency_factor(&self, state: &CleanupState) -> f64 {
952 let eff = state.efficiency_ema;
953
954 const LOW_EFF: f64 = 0.15;
960 const TARGET_LOW: f64 = 0.25;
961 const TARGET_HIGH: f64 = 0.6;
962 const HIGH_EFF: f64 = 0.85;
963
964 if eff < LOW_EFF {
965 1.3 - (eff / LOW_EFF) * 0.15
968 } else if eff < TARGET_LOW {
969 let t = (eff - LOW_EFF) / (TARGET_LOW - LOW_EFF);
972 1.15 - t * 0.15
973 } else if eff <= TARGET_HIGH {
974 1.0
976 } else if eff < HIGH_EFF {
977 let t = (eff - TARGET_HIGH) / (HIGH_EFF - TARGET_HIGH);
980 1.0 - t * 0.15
981 } else {
982 0.85 - (eff - HIGH_EFF).min(0.15) * 0.33
985 }
986 .clamp(0.8, 1.3)
987 }
988
989 pub fn stats(&self) -> CleanupStats {
991 let mut state = self.state.lock();
992 let avg_duration = if state.cleanup_cycles > 0 {
993 state.total_cleanup_duration / state.cleanup_cycles as u32
994 } else {
995 Duration::ZERO
996 };
997
998 let avg_efficiency = if state.entries_scanned > 0 {
999 state.entries_removed as f64 / state.entries_scanned as f64
1000 } else {
1001 0.0
1002 };
1003
1004 let efficiency_trend = state.efficiency_trend(self.config.min_cycles_for_efficiency);
1005 let pressure_trend = state.pressure_trend(self.config.min_cycles_for_efficiency);
1006
1007 CleanupStats {
1008 cleanup_cycles: state.cleanup_cycles,
1009 entries_removed: state.entries_removed,
1010 entries_scanned: state.entries_scanned,
1011 avg_cleanup_duration: avg_duration,
1012 max_cleanup_duration: state.max_cleanup_duration,
1013 current_message_rate: state
1014 .rate_window
1015 .sync_and_calculate_rate(&self.rate_counter),
1016 aggressive_cleanups: state.aggressive_cleanups,
1017 avg_removal_efficiency: avg_efficiency,
1018 recent_efficiency: state.efficiency_ema,
1019 efficiency_trend,
1020 pressure_trend,
1021 recent_utilization: state.utilization_ema,
1022 }
1023 }
1024
1025 pub fn reset_stats(&self) {
1027 let mut state = self.state.lock();
1028 state.cleanup_cycles = 0;
1029 state.entries_removed = 0;
1030 state.entries_scanned = 0;
1031 state.max_cleanup_duration = Duration::ZERO;
1032 state.total_cleanup_duration = Duration::ZERO;
1033 state.aggressive_cleanups = 0;
1034 state.last_raw_efficiency = 0.5;
1035 state.efficiency_ema = 0.5;
1036 state.prev_efficiency_ema = 0.5;
1037 state.utilization_ema = 0.5;
1038 state.prev_utilization_ema = 0.5;
1039 }
1040
1041 pub fn config(&self) -> &CleanupConfig {
1043 &self.config
1044 }
1045}
1046
1047impl Default for CleanupTuner {
1048 fn default() -> Self {
1049 Self::with_defaults()
1050 }
1051}
1052
1053impl Clone for CleanupTuner {
1054 fn clone(&self) -> Self {
1055 Self::new(self.config.clone())
1057 }
1058}
1059
1060#[cfg(test)]
1061mod tests {
1062 use super::*;
1063 use std::thread;
1064
1065 #[test]
1066 fn test_cleanup_config_defaults() {
1067 let config = CleanupConfig::default();
1068 assert_eq!(config.base_interval, Duration::from_secs(30));
1069 assert_eq!(config.min_interval, Duration::from_secs(5));
1070 assert_eq!(config.max_interval, Duration::from_secs(120));
1071 assert_eq!(config.ttl_divisor, 4.0);
1072 }
1073
1074 #[test]
1075 fn test_cleanup_config_presets() {
1076 let high = CleanupConfig::high_throughput();
1077 assert!(high.min_interval < CleanupConfig::default().min_interval);
1078
1079 let low = CleanupConfig::low_latency();
1080 assert!(low.target_cleanup_duration < CleanupConfig::default().target_cleanup_duration);
1081 }
1082
1083 #[test]
1084 fn test_cleanup_config_validation() {
1085 let mut config = CleanupConfig::default();
1086 config.ttl_divisor = 0.5; config.validate();
1088 assert!(config.ttl_divisor >= 2.0);
1089
1090 config.ttl_divisor = 100.0; config.validate();
1092 assert!(config.ttl_divisor <= 20.0);
1093 }
1094
1095 #[test]
1096 fn test_cleanup_parameters_low_utilization() {
1097 let tuner = CleanupTuner::with_defaults();
1098 let params = tuner.get_parameters(0.1, Duration::from_secs(300));
1099
1100 assert!(!params.aggressive);
1101 assert!(params.interval >= tuner.config().base_interval);
1102 assert_eq!(params.reason, CleanupReason::LowUtilization);
1103 }
1104
1105 #[test]
1106 fn test_cleanup_parameters_high_utilization() {
1107 let tuner = CleanupTuner::with_defaults();
1108 let params = tuner.get_parameters(0.9, Duration::from_secs(60));
1110
1111 assert!(params.aggressive);
1112 assert!(params.interval <= tuner.config().base_interval.max(Duration::from_secs(15)));
1115 assert_eq!(params.reason, CleanupReason::HighUtilization);
1116 }
1117
1118 #[test]
1119 fn test_cleanup_parameters_moderate_utilization() {
1120 let tuner = CleanupTuner::with_defaults();
1121 let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1122
1123 assert!(!params.aggressive);
1124 assert_eq!(params.reason, CleanupReason::ModerateUtilization);
1125 }
1126
1127 #[test]
1128 fn test_cleanup_parameters_critical_with_backpressure() {
1129 let tuner = CleanupTuner::with_defaults();
1130
1131 let params = tuner.get_parameters(0.94, Duration::from_secs(300));
1133 assert!(params.should_cleanup_now());
1134 assert_eq!(params.backpressure_hint(), BackpressureHint::DropSome);
1135
1136 let params = tuner.get_parameters(0.97, Duration::from_secs(300));
1138 assert_eq!(params.backpressure_hint(), BackpressureHint::DropMost);
1139 }
1140
1141 #[test]
1142 fn test_hysteresis_prevents_flapping() {
1143 let tuner = CleanupTuner::with_defaults();
1144
1145 let params1 = tuner.get_parameters(0.79, Duration::from_secs(300));
1147 assert!(!params1.aggressive);
1148
1149 tuner.record_cleanup(Duration::from_millis(10), 50, ¶ms1);
1151
1152 let params2 = tuner.get_parameters(0.81, Duration::from_secs(300));
1154 assert!(!params2.aggressive);
1156
1157 let params3 = tuner.get_parameters(0.85, Duration::from_secs(300));
1159 assert!(params3.aggressive);
1160 }
1161
1162 #[test]
1163 fn test_message_rate_tracking() {
1164 let tuner = CleanupTuner::with_defaults();
1165
1166 tuner.record_messages(100);
1168 thread::sleep(Duration::from_millis(10));
1169
1170 let rate = tuner.message_rate();
1171 assert!(rate > 0.0);
1172 }
1173
1174 #[test]
1175 fn test_cleanup_stats() {
1176 let tuner = CleanupTuner::with_defaults();
1177 let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1178
1179 tuner.record_cleanup(Duration::from_millis(20), 50, ¶ms);
1180 tuner.record_cleanup(Duration::from_millis(30), 75, ¶ms);
1181
1182 let stats = tuner.stats();
1183 assert_eq!(stats.cleanup_cycles, 2);
1184 assert_eq!(stats.entries_removed, 125);
1185 }
1186
1187 #[test]
1188 fn test_reset_stats() {
1189 let tuner = CleanupTuner::with_defaults();
1190 let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1191
1192 tuner.record_cleanup(Duration::from_millis(20), 50, ¶ms);
1193 tuner.reset_stats();
1194
1195 let stats = tuner.stats();
1196 assert_eq!(stats.cleanup_cycles, 0);
1197 assert_eq!(stats.entries_removed, 0);
1198 }
1199
1200 #[test]
1201 fn test_batch_size_adjustment() {
1202 let tuner = CleanupTuner::with_defaults();
1203 let params_initial = tuner.get_parameters(0.5, Duration::from_secs(300));
1204
1205 for _ in 0..5 {
1207 let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1208 tuner.record_cleanup(Duration::from_millis(200), 100, ¶ms);
1209 }
1210
1211 let params_final = tuner.get_parameters(0.5, Duration::from_secs(300));
1212
1213 assert!(params_final.batch_size < params_initial.batch_size);
1215 assert!(params_final.batch_size >= 10); }
1217
1218 #[test]
1219 fn test_should_cleanup_now() {
1220 let tuner = CleanupTuner::with_defaults();
1221
1222 let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1224 assert!(!params.should_cleanup_now());
1225
1226 let params = tuner.get_parameters(0.95, Duration::from_secs(300));
1228 assert!(params.should_cleanup_now());
1229 }
1230
1231 #[test]
1232 fn test_lock_free_recording() {
1233 let tuner = CleanupTuner::with_defaults();
1234
1235 for _ in 0..1000 {
1237 tuner.record_message();
1238 }
1239
1240 tuner.record_messages(500);
1241
1242 thread::sleep(Duration::from_millis(10));
1243 let rate = tuner.message_rate();
1244 assert!(rate > 0.0);
1245 }
1246
1247 #[test]
1248 fn test_efficiency_trend_tracking() {
1249 let config = CleanupConfig::default().with_efficiency_smoothing(0.4); let tuner = CleanupTuner::new(config);
1252
1253 for _ in 0..5 {
1255 let params = tuner.get_parameters(0.5, Duration::from_secs(60));
1256 let batch = params.batch_size;
1258 tuner.record_cleanup(Duration::from_millis(20), batch / 2, ¶ms);
1260 }
1261
1262 let stats = tuner.stats();
1263 assert_ne!(
1265 stats.efficiency_trend,
1266 EfficiencyTrend::Unknown,
1267 "Trend should be known after {} cycles (min_cycles={})",
1268 stats.cleanup_cycles,
1269 tuner.config().min_cycles_for_efficiency
1270 );
1271 assert!(
1273 stats.recent_efficiency > 0.3 && stats.recent_efficiency < 0.7,
1274 "Recent efficiency {} should be around 0.5",
1275 stats.recent_efficiency
1276 );
1277 }
1278
1279 #[test]
1280 fn test_pressure_trend_tracking() {
1281 let tuner = CleanupTuner::with_defaults();
1282
1283 for i in 0..5 {
1285 let util = 0.4 + (i as f64 * 0.1);
1286 let params = tuner.get_parameters(util, Duration::from_secs(300));
1287 tuner.record_cleanup(Duration::from_millis(20), 50, ¶ms);
1288 }
1289
1290 let stats = tuner.stats();
1291 assert_eq!(stats.pressure_trend, PressureTrend::Increasing);
1292 }
1293
1294 #[test]
1295 fn test_huge_ttl_safety() {
1296 let tuner = CleanupTuner::with_defaults();
1297
1298 let params = tuner.get_parameters(0.5, Duration::from_secs(86400 * 30)); assert!(
1303 params.interval
1304 <= tuner
1305 .config()
1306 .max_interval
1307 .mul_f64(tuner.config().max_ttl_interval_factor + 0.1)
1308 );
1309 }
1310
1311 #[test]
1312 fn test_zero_efficiency_startup() {
1313 let tuner = CleanupTuner::with_defaults();
1314
1315 for _ in 0..3 {
1317 let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1318 tuner.record_cleanup(Duration::from_millis(20), 0, ¶ms);
1319 }
1320
1321 let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1322
1323 assert!(params.batch_size >= 10);
1325 assert!(params.batch_size <= 500);
1326 }
1327
1328 #[test]
1329 fn test_efficiency_factor_smooth_transitions() {
1330 let tuner = CleanupTuner::with_defaults();
1331 let mut state = CleanupState::new(Duration::from_secs(10));
1332
1333 let test_cases = [
1335 (0.05, 1.25..1.35), (0.15, 1.10..1.20), (0.25, 0.95..1.05), (0.40, 0.95..1.05), (0.60, 0.95..1.05), (0.75, 0.85..0.95), (0.90, 0.78..0.88), ];
1343
1344 for (eff, expected_range) in test_cases {
1345 state.efficiency_ema = eff;
1346 state.cleanup_cycles = 10; let factor = tuner.calculate_efficiency_factor(&state);
1348 assert!(
1349 factor >= expected_range.start && factor <= expected_range.end,
1350 "Efficiency {}: factor {} not in range {:?}",
1351 eff,
1352 factor,
1353 expected_range
1354 );
1355 }
1356 }
1357
1358 #[test]
1359 fn test_stats_includes_derived_metrics() {
1360 let tuner = CleanupTuner::with_defaults();
1361
1362 for i in 0..5 {
1363 let params = tuner.get_parameters(0.5 + (i as f64 * 0.05), Duration::from_secs(300));
1364 tuner.record_cleanup(Duration::from_millis(20), 50, ¶ms);
1365 }
1366
1367 let stats = tuner.stats();
1368
1369 assert!(stats.recent_efficiency > 0.0);
1371 assert!(stats.recent_utilization > 0.0);
1372 assert!(stats.entries_scanned > 0);
1373 assert_ne!(stats.efficiency_trend, EfficiencyTrend::Unknown);
1375 }
1376
1377 #[test]
1378 fn test_concurrent_recording() {
1379 use std::sync::Arc;
1380
1381 let tuner = Arc::new(CleanupTuner::with_defaults());
1382 let mut handles = vec![];
1383
1384 for _ in 0..4 {
1386 let tuner_clone = Arc::clone(&tuner);
1387 let handle = thread::spawn(move || {
1388 for _ in 0..1000 {
1389 tuner_clone.record_message();
1390 }
1391 tuner_clone.record_messages(500);
1392 });
1393 handles.push(handle);
1394 }
1395
1396 for handle in handles {
1398 handle.join().unwrap();
1399 }
1400
1401 thread::sleep(Duration::from_millis(10));
1402 let rate = tuner.message_rate();
1403 assert!(rate > 0.0);
1404
1405 }
1408
1409 #[test]
1410 fn test_rate_tracker_window_rollover() {
1411 let counter = LockFreeRateCounter::new();
1412 let mut state = RateWindowState::new(Duration::from_millis(100));
1413
1414 counter.record_batch(50);
1416 thread::sleep(Duration::from_millis(120));
1417
1418 let rate = state.sync_and_calculate_rate(&counter);
1420 assert!(rate > 0.0);
1421
1422 counter.record_batch(50);
1424 let rate2 = state.sync_and_calculate_rate(&counter);
1425 assert!(rate2 > 0.0);
1426 }
1427
1428 #[test]
1429 fn test_extreme_pressure_backpressure() {
1430 let config = CleanupConfig::default();
1431 let tuner = CleanupTuner::new(config);
1432
1433 tuner.record_messages(10000);
1435 thread::sleep(Duration::from_millis(10));
1436
1437 let params = tuner.get_parameters(0.98, Duration::from_secs(10));
1439
1440 assert_eq!(params.backpressure_hint(), BackpressureHint::BlockNew);
1442
1443 assert!(params.interval <= tuner.config().min_interval);
1447 }
1448
1449 #[test]
1450 fn test_builder_pattern() {
1451 let config = CleanupConfig::default()
1452 .with_base_interval(Duration::from_secs(45))
1453 .with_min_interval(Duration::from_secs(10))
1454 .with_max_interval(Duration::from_secs(180))
1455 .with_ttl_divisor(6.0)
1456 .with_efficiency_smoothing(0.25);
1457
1458 assert_eq!(config.base_interval, Duration::from_secs(45));
1459 assert_eq!(config.min_interval, Duration::from_secs(10));
1460 assert_eq!(config.max_interval, Duration::from_secs(180));
1461 assert_eq!(config.ttl_divisor, 6.0);
1462 assert_eq!(config.efficiency_smoothing, 0.25);
1463 }
1464}