memberlist_plumtree/
cleanup_tuner.rs

1//! Dynamic cleanup tuning based on system load.
2//!
3//! This module provides adaptive cleanup behavior that adjusts based on:
4//!
5//! - **Cache utilization**: Clean more aggressively when cache is near capacity
6//! - **Message rate**: Be more conservative when under high load
7//! - **Cleanup duration**: Adjust batch sizes to keep cleanup fast
8//! - **Removal efficiency**: Scan more when hit rate is low, less when high
9//!
10//! # Example
11//!
12//! ```ignore
13//! use memberlist_plumtree::CleanupTuner;
14//!
15//! let tuner = CleanupTuner::new(CleanupConfig::default());
16//!
17//! // Batch recording for efficiency (recommended for throughput > 10k msg/s)
18//! tuner.record_messages(50);
19//!
20//! // Get tuned parameters before cleanup
21//! let params = tuner.get_parameters(cache_utilization, Duration::from_secs(300));
22//!
23//! // After cleanup completes
24//! tuner.record_cleanup(duration, entries_removed, &params);
25//! ```
26//!
27//! # Performance Notes
28//!
29//! - `record_message()` and `record_messages()` are fully lock-free (atomic only)
30//! - The mutex is only acquired when calculating parameters or reading stats
31//! - For >50k msg/s, batch every 100-200 messages with `record_messages()`
32
33use std::{
34    sync::atomic::{AtomicU64, Ordering},
35    time::{Duration, Instant},
36};
37
38use parking_lot::Mutex;
39
/// Configuration for dynamic cleanup tuning.
///
/// The preset constructors ([`CleanupConfig::new`],
/// [`CleanupConfig::high_throughput`], [`CleanupConfig::low_latency`]) and the
/// builder methods that take an `f64` run an internal validation pass that
/// clamps the tunable ratios into safe ranges. The `Duration` builders and
/// direct field writes are not validated.
#[derive(Debug, Clone)]
pub struct CleanupConfig {
    /// Base cleanup interval (when utilization is moderate).
    pub base_interval: Duration,

    /// Minimum cleanup interval (under high utilization).
    pub min_interval: Duration,

    /// Maximum cleanup interval (under low utilization).
    pub max_interval: Duration,

    /// Cache utilization threshold for aggressive cleanup.
    /// Clamped to 0.5-0.99 during validation.
    pub high_utilization_threshold: f64,

    /// Cache utilization threshold for relaxed cleanup.
    /// Clamped to 0.01-0.5 during validation.
    pub low_utilization_threshold: f64,

    /// Hysteresis for utilization thresholds (prevents flapping).
    /// Applied as a ± adjustment to the thresholds depending on whether the
    /// previous cleanup was aggressive. Clamped to 0.01-0.1 during validation.
    pub utilization_hysteresis: f64,

    /// Utilization threshold for DropSome backpressure.
    /// Default: 0.93. Clamped to 0.85-0.975 during validation so the two
    /// stricter levels always have room above it.
    pub drop_some_threshold: f64,

    /// Utilization threshold for DropMost backpressure.
    /// Default: 0.96. Kept at least 0.01 above `drop_some_threshold` and at
    /// most 0.985 during validation.
    pub drop_most_threshold: f64,

    /// Utilization threshold for BlockNew backpressure.
    /// Default: 0.98. Kept at least 0.01 above `drop_most_threshold` and at
    /// most 0.995 during validation.
    pub block_new_threshold: f64,

    /// Target cleanup duration (try to keep cleanup under this).
    pub target_cleanup_duration: Duration,

    /// Message rate threshold for high load (messages per second).
    pub high_load_message_rate: f64,

    /// Window size for calculating message rate.
    pub rate_window: Duration,

    /// Batch size smoothing factor. Higher = more responsive to changes.
    /// Recommended: 0.1-0.2 for production stability.
    /// Clamped to 0.05-0.5 during validation.
    pub batch_size_smoothing: f64,

    /// Divisor that turns the cache TTL into a lower bound on the cleanup
    /// interval: the interval is kept at or above `TTL / ttl_divisor`
    /// (subject to `max_ttl_interval_factor`). With the default of 4 this
    /// floor allows at most about four cleanups per TTL period.
    /// Clamped to 2.0-20.0 during validation.
    pub ttl_divisor: f64,

    /// Weight for removal efficiency in batch size calculation.
    /// Higher values make batch size more responsive to how many entries
    /// are actually being removed vs just scanned.
    /// Clamped to 0.0-1.0 during validation.
    pub efficiency_weight: f64,

    /// Smoothing factor for efficiency EMA.
    /// Lower = smoother/slower to respond, higher = more responsive.
    /// Clamped to 0.05-0.5 during validation.
    pub efficiency_smoothing: f64,

    /// Minimum cleanup cycles before efficiency data is trusted.
    /// Prevents wild adjustments during startup.
    /// Clamped to 1-10 during validation.
    pub min_cycles_for_efficiency: u64,

    /// Cap on the TTL-derived interval floor, expressed as a multiple of
    /// `max_interval`: the floor never exceeds `max_interval * this factor`.
    /// Clamped to 1.0-5.0 during validation.
    pub max_ttl_interval_factor: f64,
}

impl Default for CleanupConfig {
    /// General-purpose defaults (30s base interval, 10s rate window).
    fn default() -> Self {
        Self {
            base_interval: Duration::from_secs(30),
            min_interval: Duration::from_secs(5),
            max_interval: Duration::from_secs(120),
            high_utilization_threshold: 0.8,
            low_utilization_threshold: 0.3,
            utilization_hysteresis: 0.03,
            drop_some_threshold: 0.93,
            drop_most_threshold: 0.96,
            block_new_threshold: 0.98,
            target_cleanup_duration: Duration::from_millis(50),
            high_load_message_rate: 1000.0,
            rate_window: Duration::from_secs(10),
            batch_size_smoothing: 0.15,
            ttl_divisor: 4.0,
            efficiency_weight: 0.3,
            efficiency_smoothing: 0.2,
            min_cycles_for_efficiency: 3,
            max_ttl_interval_factor: 2.0,
        }
    }
}

impl CleanupConfig {
    /// Create a new cleanup config with default values.
    pub fn new() -> Self {
        let mut config = Self::default();
        config.validate();
        config
    }

    /// Configuration for high-throughput scenarios.
    ///
    /// More aggressive cleanup to prevent memory pressure.
    pub fn high_throughput() -> Self {
        let mut config = Self {
            base_interval: Duration::from_secs(15),
            min_interval: Duration::from_secs(2),
            max_interval: Duration::from_secs(60),
            high_utilization_threshold: 0.7,
            low_utilization_threshold: 0.2,
            utilization_hysteresis: 0.04,
            drop_some_threshold: 0.90,
            drop_most_threshold: 0.94,
            block_new_threshold: 0.97,
            target_cleanup_duration: Duration::from_millis(25),
            high_load_message_rate: 5000.0,
            rate_window: Duration::from_secs(5),
            batch_size_smoothing: 0.2,
            ttl_divisor: 4.0,
            efficiency_weight: 0.4,
            efficiency_smoothing: 0.25,
            min_cycles_for_efficiency: 2,
            max_ttl_interval_factor: 1.5,
        };
        config.validate();
        config
    }

    /// Configuration for low-latency scenarios.
    ///
    /// Smaller, more frequent cleanups to minimize impact.
    pub fn low_latency() -> Self {
        let mut config = Self {
            base_interval: Duration::from_secs(20),
            min_interval: Duration::from_secs(5),
            max_interval: Duration::from_secs(60),
            high_utilization_threshold: 0.75,
            low_utilization_threshold: 0.25,
            utilization_hysteresis: 0.03,
            drop_some_threshold: 0.93,
            drop_most_threshold: 0.96,
            block_new_threshold: 0.98,
            target_cleanup_duration: Duration::from_millis(10),
            high_load_message_rate: 500.0,
            rate_window: Duration::from_secs(5),
            batch_size_smoothing: 0.1,
            ttl_divisor: 4.0,
            efficiency_weight: 0.2,
            efficiency_smoothing: 0.15,
            min_cycles_for_efficiency: 4,
            max_ttl_interval_factor: 2.0,
        };
        config.validate();
        config
    }

    /// Validate and clamp configuration values to safe ranges.
    ///
    /// Never panics, whatever the field values are (including NaN and
    /// out-of-range backpressure thresholds).
    fn validate(&mut self) {
        // Minimum gap enforced between consecutive backpressure levels.
        const STEP: f64 = 0.01;
        // Each level's ceiling leaves at least one STEP for the next level.
        const DROP_SOME_MIN: f64 = 0.85;
        const DROP_SOME_MAX: f64 = 0.975;
        const DROP_MOST_MAX: f64 = 0.985;
        const BLOCK_NEW_MAX: f64 = 0.995;

        self.high_utilization_threshold = self.high_utilization_threshold.clamp(0.5, 0.99);
        self.low_utilization_threshold = self.low_utilization_threshold.clamp(0.01, 0.5);
        self.utilization_hysteresis = self.utilization_hysteresis.clamp(0.01, 0.1);
        self.batch_size_smoothing = self.batch_size_smoothing.clamp(0.05, 0.5);
        self.ttl_divisor = self.ttl_divisor.clamp(2.0, 20.0);
        self.efficiency_weight = self.efficiency_weight.clamp(0.0, 1.0);
        self.efficiency_smoothing = self.efficiency_smoothing.clamp(0.05, 0.5);
        self.min_cycles_for_efficiency = self.min_cycles_for_efficiency.clamp(1, 10);
        self.max_ttl_interval_factor = self.max_ttl_interval_factor.clamp(1.0, 5.0);

        // Ensure thresholds are properly ordered
        if self.high_utilization_threshold <= self.low_utilization_threshold {
            self.high_utilization_threshold = self.low_utilization_threshold + 0.2;
        }

        // Ensure backpressure thresholds are strictly ordered.
        //
        // `max(..).min(..)` is used instead of `clamp` on purpose: the lower
        // bound is computed from the previous level, and `f64::clamp` panics
        // when that bound exceeds the upper bound or is NaN. `max`/`min`
        // never panic and replace a NaN input with the bound.
        self.drop_some_threshold = self
            .drop_some_threshold
            .max(DROP_SOME_MIN)
            .min(DROP_SOME_MAX);
        self.drop_most_threshold = self
            .drop_most_threshold
            .max(self.drop_some_threshold + STEP)
            .min(DROP_MOST_MAX);
        self.block_new_threshold = self
            .block_new_threshold
            .max(self.drop_most_threshold + STEP)
            .min(BLOCK_NEW_MAX);
    }

    /// Set the base cleanup interval (builder pattern). Not validated.
    pub const fn with_base_interval(mut self, interval: Duration) -> Self {
        self.base_interval = interval;
        self
    }

    /// Set the minimum cleanup interval (builder pattern). Not validated.
    pub const fn with_min_interval(mut self, interval: Duration) -> Self {
        self.min_interval = interval;
        self
    }

    /// Set the maximum cleanup interval (builder pattern). Not validated.
    pub const fn with_max_interval(mut self, interval: Duration) -> Self {
        self.max_interval = interval;
        self
    }

    /// Set the high utilization threshold (builder pattern).
    ///
    /// The whole configuration is re-validated afterwards.
    pub fn with_high_utilization_threshold(mut self, threshold: f64) -> Self {
        self.high_utilization_threshold = threshold;
        self.validate();
        self
    }

    /// Set the low utilization threshold (builder pattern).
    ///
    /// The whole configuration is re-validated afterwards.
    pub fn with_low_utilization_threshold(mut self, threshold: f64) -> Self {
        self.low_utilization_threshold = threshold;
        self.validate();
        self
    }

    /// Set the TTL divisor (builder pattern).
    ///
    /// The whole configuration is re-validated afterwards.
    pub fn with_ttl_divisor(mut self, divisor: f64) -> Self {
        self.ttl_divisor = divisor;
        self.validate();
        self
    }

    /// Set backpressure thresholds (builder pattern).
    ///
    /// The values are adjusted during validation so that
    /// `drop_some < drop_most < block_new` always holds.
    pub fn with_backpressure_thresholds(
        mut self,
        drop_some: f64,
        drop_most: f64,
        block_new: f64,
    ) -> Self {
        self.drop_some_threshold = drop_some;
        self.drop_most_threshold = drop_most;
        self.block_new_threshold = block_new;
        self.validate();
        self
    }

    /// Set efficiency smoothing factor (builder pattern).
    ///
    /// The whole configuration is re-validated afterwards.
    pub fn with_efficiency_smoothing(mut self, smoothing: f64) -> Self {
        self.efficiency_smoothing = smoothing;
        self.validate();
        self
    }
}
286
/// Advice to the caller on how strongly to shed incoming load.
///
/// Variants are listed from mildest to most severe.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackpressureHint {
    /// Nothing needs to be shed.
    None,
    /// Some lower-priority messages may be dropped.
    DropSome,
    /// Most non-critical messages should be dropped.
    DropMost,
    /// New messages should be held back temporarily.
    BlockNew,
}
299
300/// Reason for cleanup parameter decision.
301#[derive(Debug, Clone, Copy, PartialEq, Eq)]
302pub enum CleanupReason {
303    /// High cache utilization detected.
304    HighUtilization,
305    /// Low cache utilization, relaxed cleanup.
306    LowUtilization,
307    /// Moderate utilization, normal cleanup.
308    ModerateUtilization,
309    /// Critical utilization, immediate cleanup needed.
310    CriticalUtilization {
311        /// The level of backpressure recommended.
312        backpressure: BackpressureHint,
313    },
314}
315
/// Direction in which removal efficiency is moving.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EfficiencyTrend {
    /// More entries are being removed per scan than before.
    Improving,
    /// Fewer entries are being removed per scan than before.
    Degrading,
    /// No significant change in efficiency.
    Stable,
    /// Too little data has been collected to tell.
    Unknown,
}
328
/// Direction in which cache pressure is moving.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PressureTrend {
    /// Utilization is rising.
    Increasing,
    /// Utilization is falling.
    Decreasing,
    /// No significant change in utilization.
    Stable,
    /// Too little data has been collected to tell.
    Unknown,
}
341
342/// Tuned cleanup parameters.
343#[derive(Debug, Clone, Copy)]
344pub struct CleanupParameters {
345    /// Recommended interval until next cleanup.
346    pub interval: Duration,
347
348    /// Recommended batch size per shard.
349    pub batch_size: usize,
350
351    /// Whether aggressive cleanup is recommended.
352    pub aggressive: bool,
353
354    /// The utilization level that triggered these parameters.
355    pub utilization: f64,
356
357    /// Current message rate (messages per second).
358    pub message_rate: f64,
359
360    /// Reason for these parameters.
361    pub reason: CleanupReason,
362}
363
364impl CleanupParameters {
365    /// Check if immediate cleanup is recommended.
366    pub fn should_cleanup_now(&self) -> bool {
367        matches!(self.reason, CleanupReason::CriticalUtilization { .. })
368    }
369
370    /// Get backpressure hint if applicable.
371    pub fn backpressure_hint(&self) -> BackpressureHint {
372        match self.reason {
373            CleanupReason::CriticalUtilization { backpressure } => backpressure,
374            _ => BackpressureHint::None,
375        }
376    }
377}
378
379/// Statistics about cleanup performance.
380#[derive(Debug, Clone)]
381pub struct CleanupStats {
382    /// Total cleanup cycles completed.
383    pub cleanup_cycles: u64,
384
385    /// Total entries removed.
386    pub entries_removed: u64,
387
388    /// Total entries scanned.
389    pub entries_scanned: u64,
390
391    /// Average cleanup duration.
392    pub avg_cleanup_duration: Duration,
393
394    /// Maximum cleanup duration observed.
395    pub max_cleanup_duration: Duration,
396
397    /// Current message rate (messages per second).
398    pub current_message_rate: f64,
399
400    /// Number of times aggressive cleanup was triggered.
401    pub aggressive_cleanups: u64,
402
403    /// Average removal efficiency (entries removed / entries scanned).
404    pub avg_removal_efficiency: f64,
405
406    /// Recent removal efficiency (EMA-smoothed).
407    pub recent_efficiency: f64,
408
409    /// Current efficiency trend.
410    pub efficiency_trend: EfficiencyTrend,
411
412    /// Current pressure trend (based on recent utilization).
413    pub pressure_trend: PressureTrend,
414
415    /// Recent average utilization (EMA-smoothed).
416    pub recent_utilization: f64,
417}
418
/// Lock-free rate counter for high-throughput message recording.
///
/// Recording touches a single atomic and never takes a lock. The count is
/// periodically drained into a [`RateWindowState`], which is the part that
/// needs exclusive access.
#[derive(Debug)]
struct LockFreeRateCounter {
    /// Messages recorded since the last drain.
    current_count: AtomicU64,
}

impl LockFreeRateCounter {
    /// Create a counter starting at zero.
    fn new() -> Self {
        Self {
            current_count: AtomicU64::new(0),
        }
    }

    /// Record a single message (fully lock-free).
    #[inline]
    fn record(&self) {
        self.current_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record multiple messages (fully lock-free).
    #[inline]
    fn record_batch(&self, count: u64) {
        self.current_count.fetch_add(count, Ordering::Relaxed);
    }

    /// Reset the counter to zero and return the value it held.
    fn drain(&self) -> u64 {
        self.current_count.swap(0, Ordering::AcqRel)
    }

    /// Read the current count without draining.
    fn load(&self) -> u64 {
        self.current_count.load(Ordering::Acquire)
    }
}

/// Dual-window rate tracker state (requires exclusive access).
///
/// The rate is computed over the window in progress plus the most recently
/// completed window.
#[derive(Debug)]
struct RateWindowState {
    /// Current window start time.
    current_start: Instant,
    /// Messages attributed to the previous (completed) window.
    previous_count: u64,
    /// Previous window duration.
    previous_duration: Duration,
    /// Window size.
    window_size: Duration,
    /// Messages attributed to the current window so far.
    accumulated_current: u64,
    /// Time of the most recent sync or reset. Messages drained by the next
    /// sync were recorded after this instant.
    last_sync: Instant,
}

impl RateWindowState {
    /// Create a tracker whose first window starts now.
    fn new(window_size: Duration) -> Self {
        let now = Instant::now();
        Self {
            current_start: now,
            previous_count: 0,
            previous_duration: Duration::ZERO,
            window_size,
            accumulated_current: 0,
            last_sync: now,
        }
    }

    /// Sync with the lock-free counter and return the message rate
    /// (messages per second) over the current and previous windows.
    ///
    /// Messages drained from `counter` are only known in aggregate, so they
    /// are assumed to have arrived uniformly since the previous sync. When
    /// one or more window boundaries were crossed in the meantime, the
    /// drained messages are split between the previous and the current
    /// window in proportion to the time each covers; anything older than the
    /// previous window is dropped. This keeps the rate meaningful when this
    /// method is called less often than once per window, instead of
    /// collapsing to zero.
    ///
    /// Hardened against:
    /// - A clock that appears to move backwards (treated as zero elapsed)
    /// - Very long pauses between calls (no per-window loop)
    /// - A zero-length window (always reports `0.0`)
    /// - Counter overflow (all additions saturate)
    fn sync_and_calculate_rate(&mut self, counter: &LockFreeRateCounter) -> f64 {
        let now = Instant::now();
        let drained = counter.drain();

        let since_start = now
            .checked_duration_since(self.current_start)
            .unwrap_or(Duration::ZERO);
        let since_sync = now
            .checked_duration_since(self.last_sync)
            .unwrap_or(Duration::ZERO);
        // Only ever move the sync mark forward.
        if now > self.last_sync {
            self.last_sync = now;
        }

        let elapsed = if self.window_size.is_zero() {
            // A zero-length window cannot hold any messages.
            self.previous_count = 0;
            self.previous_duration = Duration::ZERO;
            self.accumulated_current = 0;
            self.current_start = now;
            Duration::ZERO
        } else if since_start < self.window_size {
            // Still inside the current window.
            self.accumulated_current = self.accumulated_current.saturating_add(drained);
            since_start
        } else {
            self.roll_windows(since_start, since_sync, drained)
        };

        // Include messages recorded since the drain above; they stay in the
        // counter and will be drained by the next sync.
        let current = self.accumulated_current.saturating_add(counter.load());

        // Blend current and previous windows
        let total_count = current.saturating_add(self.previous_count);
        let total_duration = elapsed + self.previous_duration;

        if total_duration.is_zero() {
            0.0
        } else {
            total_count as f64 / total_duration.as_secs_f64()
        }
    }

    /// Advance past every window boundary crossed since `current_start` and
    /// redistribute counts. Returns the time elapsed in the new current window.
    ///
    /// `elapsed` is the time since `current_start` (at least one window),
    /// `gap` the time since the previous sync, and `drained` the messages
    /// recorded during `gap`. Must only be called with a non-zero window.
    fn roll_windows(&mut self, elapsed: Duration, gap: Duration, drained: u64) -> Duration {
        let window_nanos = self.window_size.as_nanos();
        let elapsed_nanos = elapsed.as_nanos();
        let windows_passed = elapsed_nanos / window_nanos;
        let into_current = Self::duration_from_nanos(elapsed_nanos % window_nanos);

        // Portion of the gap that overlaps the new current window, and the
        // portion that overlaps the window just before it.
        let recent_span = gap.min(into_current);
        let older_span = (gap - recent_span).min(self.window_size);

        let (previous_share, current_share) = if gap.is_zero() {
            (0, drained)
        } else {
            let current_share = Self::scale_count(drained, recent_span, gap);
            let previous_share = if recent_span + older_span == gap {
                // The whole gap lies inside the two windows: lose nothing to rounding.
                drained - current_share
            } else {
                Self::scale_count(drained, older_span, gap)
            };
            (previous_share, current_share)
        };

        // Counts accumulated before the previous sync belong to the old
        // current window, which is the new previous window only when exactly
        // one boundary was crossed.
        let carried = if windows_passed == 1 {
            self.accumulated_current
        } else {
            0
        };

        self.previous_count = carried.saturating_add(previous_share);
        self.previous_duration = self.window_size;
        self.accumulated_current = current_share;
        self.current_start += elapsed - into_current;

        into_current
    }

    /// Return `count * part / whole`, truncated, never exceeding `count`.
    ///
    /// `whole` must be non-zero and `part` must not exceed it.
    fn scale_count(count: u64, part: Duration, whole: Duration) -> u64 {
        let fraction = part.as_secs_f64() / whole.as_secs_f64();
        // Float-to-int `as` casts saturate and map NaN to 0.
        let scaled = (count as f64 * fraction) as u64;
        scaled.min(count)
    }

    /// Build a `Duration` from a nanosecond count that fits in a `Duration`.
    fn duration_from_nanos(nanos: u128) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        Duration::new(
            (nanos / NANOS_PER_SEC) as u64,
            (nanos % NANOS_PER_SEC) as u32,
        )
    }

    /// Force reset the window.
    ///
    /// Everything counted so far (including what is still in `counter`)
    /// becomes the previous window, and a new current window starts now.
    fn reset(&mut self, counter: &LockFreeRateCounter) {
        let now = Instant::now();
        let drained = counter.drain();
        let elapsed = now
            .checked_duration_since(self.current_start)
            .unwrap_or(self.window_size);

        self.previous_count = self.accumulated_current.saturating_add(drained);
        self.previous_duration = self.window_size.min(elapsed);
        self.current_start = now;
        self.last_sync = now;
        self.accumulated_current = 0;
    }
}
559
560/// Internal state for tracking cleanup metrics.
561#[derive(Debug)]
562struct CleanupState {
563    /// Rate window state (requires mutex).
564    rate_window: RateWindowState,
565
566    /// Smoothed batch size target.
567    target_batch_size: f64,
568
569    /// Last cleanup duration.
570    last_cleanup_duration: Duration,
571
572    /// Total cleanup cycles.
573    cleanup_cycles: u64,
574
575    /// Total entries removed.
576    entries_removed: u64,
577
578    /// Total entries scanned.
579    entries_scanned: u64,
580
581    /// Max cleanup duration.
582    max_cleanup_duration: Duration,
583
584    /// Sum of cleanup durations (for average).
585    total_cleanup_duration: Duration,
586
587    /// Aggressive cleanup count.
588    aggressive_cleanups: u64,
589
590    /// Was the last cleanup aggressive? (for hysteresis).
591    was_aggressive: bool,
592
593    /// Raw last efficiency (for trend calculation).
594    last_raw_efficiency: f64,
595
596    /// EMA-smoothed efficiency.
597    efficiency_ema: f64,
598
599    /// Previous EMA efficiency (for trend).
600    prev_efficiency_ema: f64,
601
602    /// EMA-smoothed utilization (for pressure trend).
603    utilization_ema: f64,
604
605    /// Previous EMA utilization (for trend).
606    prev_utilization_ema: f64,
607}
608
609impl CleanupState {
610    fn new(window_size: Duration) -> Self {
611        Self {
612            rate_window: RateWindowState::new(window_size),
613            target_batch_size: 100.0,
614            last_cleanup_duration: Duration::ZERO,
615            cleanup_cycles: 0,
616            entries_removed: 0,
617            entries_scanned: 0,
618            max_cleanup_duration: Duration::ZERO,
619            total_cleanup_duration: Duration::ZERO,
620            aggressive_cleanups: 0,
621            was_aggressive: false,
622            last_raw_efficiency: 0.5,
623            efficiency_ema: 0.5,
624            prev_efficiency_ema: 0.5,
625            utilization_ema: 0.5,
626            prev_utilization_ema: 0.5,
627        }
628    }
629
630    /// Calculate efficiency trend based on EMA changes.
631    fn efficiency_trend(&self, min_cycles: u64) -> EfficiencyTrend {
632        if self.cleanup_cycles < min_cycles {
633            return EfficiencyTrend::Unknown;
634        }
635
636        let diff = self.efficiency_ema - self.prev_efficiency_ema;
637        const THRESHOLD: f64 = 0.02; // 2% change threshold
638
639        if diff > THRESHOLD {
640            EfficiencyTrend::Improving
641        } else if diff < -THRESHOLD {
642            EfficiencyTrend::Degrading
643        } else {
644            EfficiencyTrend::Stable
645        }
646    }
647
648    /// Calculate pressure trend based on utilization EMA changes.
649    fn pressure_trend(&self, min_cycles: u64) -> PressureTrend {
650        if self.cleanup_cycles < min_cycles {
651            return PressureTrend::Unknown;
652        }
653
654        let diff = self.utilization_ema - self.prev_utilization_ema;
655        const THRESHOLD: f64 = 0.02; // 2% change threshold
656
657        if diff > THRESHOLD {
658            PressureTrend::Increasing
659        } else if diff < -THRESHOLD {
660            PressureTrend::Decreasing
661        } else {
662            PressureTrend::Stable
663        }
664    }
665}
666
667/// Dynamic cleanup tuner that adjusts parameters based on load.
668#[derive(Debug)]
669pub struct CleanupTuner {
670    config: CleanupConfig,
671    /// Lock-free counter for message recording (no mutex needed).
672    rate_counter: LockFreeRateCounter,
673    /// State requiring synchronization.
674    state: Mutex<CleanupState>,
675}
676
677impl CleanupTuner {
678    /// Create a new cleanup tuner with the given configuration.
679    pub fn new(mut config: CleanupConfig) -> Self {
680        config.validate();
681        let window_size = config.rate_window;
682        Self {
683            config,
684            rate_counter: LockFreeRateCounter::new(),
685            state: Mutex::new(CleanupState::new(window_size)),
686        }
687    }
688
689    /// Create a tuner with default configuration.
690    pub fn with_defaults() -> Self {
691        Self::new(CleanupConfig::default())
692    }
693
694    /// Record a message being processed (fully lock-free, very fast).
695    ///
696    /// This method uses atomic operations only - no mutex is acquired.
697    ///
698    /// **For high-throughput scenarios (>10k msg/s)**: Use `record_messages()`
699    /// to batch updates every 50-200 messages. This improves cache locality
700    /// while maintaining the same lock-free performance.
701    #[inline]
702    pub fn record_message(&self) {
703        self.rate_counter.record();
704    }
705
706    /// Record multiple messages being processed (fully lock-free).
707    ///
708    /// This method uses atomic operations only - no mutex is acquired.
709    ///
710    /// **Recommended batching**:
711    /// - < 10k msg/s: batch every 10-50 messages
712    /// - 10k-50k msg/s: batch every 50-100 messages
713    /// - > 50k msg/s: batch every 100-200 messages
714    #[inline]
715    pub fn record_messages(&self, count: u64) {
716        self.rate_counter.record_batch(count);
717    }
718
719    /// Record a cleanup cycle completion.
720    ///
721    /// Call this after each cleanup cycle with the duration and entries removed.
722    ///
723    /// # Arguments
724    ///
725    /// * `duration` - How long the cleanup took
726    /// * `entries_removed` - Number of entries actually removed/cleaned up
727    /// * `params` - The parameters that were used for this cleanup (for tracking)
728    pub fn record_cleanup(
729        &self,
730        duration: Duration,
731        entries_removed: usize,
732        params: &CleanupParameters,
733    ) {
734        let mut state = self.state.lock();
735
736        // Calculate removal efficiency
737        let entries_scanned = params.batch_size;
738        let raw_efficiency = if entries_scanned > 0 {
739            (entries_removed as f64 / entries_scanned as f64).clamp(0.0, 1.0)
740        } else {
741            state.last_raw_efficiency // Use previous if no data
742        };
743
744        // Update efficiency EMA
745        state.prev_efficiency_ema = state.efficiency_ema;
746        state.efficiency_ema = self.config.efficiency_smoothing * raw_efficiency
747            + (1.0 - self.config.efficiency_smoothing) * state.efficiency_ema;
748        state.last_raw_efficiency = raw_efficiency;
749
750        // Update utilization EMA
751        state.prev_utilization_ema = state.utilization_ema;
752        state.utilization_ema = self.config.efficiency_smoothing * params.utilization
753            + (1.0 - self.config.efficiency_smoothing) * state.utilization_ema;
754
755        state.cleanup_cycles += 1;
756        state.entries_removed += entries_removed as u64;
757        state.entries_scanned += entries_scanned as u64;
758        state.last_cleanup_duration = duration;
759        state.total_cleanup_duration += duration;
760        state.was_aggressive = params.aggressive;
761
762        if duration > state.max_cleanup_duration {
763            state.max_cleanup_duration = duration;
764        }
765        if params.aggressive {
766            state.aggressive_cleanups += 1;
767        }
768    }
769
770    /// Get the current message rate (messages per second).
771    ///
772    /// This synchronizes the lock-free counter with the rate window state.
773    pub fn message_rate(&self) -> f64 {
774        let mut state = self.state.lock();
775        state
776            .rate_window
777            .sync_and_calculate_rate(&self.rate_counter)
778    }
779
780    /// Force reset the message rate window.
781    ///
782    /// **NOTE**: This is usually not needed as the window resets automatically.
783    /// Only call this if you need to explicitly reset statistics.
784    pub fn reset_rate_window(&self) {
785        let mut state = self.state.lock();
786        state.rate_window.reset(&self.rate_counter);
787    }
788
789    /// Get tuned cleanup parameters based on current state.
790    ///
791    /// # Arguments
792    ///
793    /// * `cache_utilization` - Current cache utilization (0.0-1.0)
794    /// * `current_ttl` - The TTL duration used for cache entries. Cleanup interval
795    ///   will be at least `current_ttl / ttl_divisor` to ensure entries are cleaned
796    ///   multiple times per TTL period. Default divisor is 4.
797    pub fn get_parameters(
798        &self,
799        cache_utilization: f64,
800        current_ttl: Duration,
801    ) -> CleanupParameters {
802        let mut state = self.state.lock();
803        let message_rate = state
804            .rate_window
805            .sync_and_calculate_rate(&self.rate_counter);
806
807        // Apply hysteresis to prevent threshold flapping
808        let high_threshold = if state.was_aggressive {
809            self.config.high_utilization_threshold - self.config.utilization_hysteresis
810        } else {
811            self.config.high_utilization_threshold + self.config.utilization_hysteresis
812        };
813
814        let low_threshold = if state.was_aggressive {
815            self.config.low_utilization_threshold + self.config.utilization_hysteresis
816        } else {
817            self.config.low_utilization_threshold - self.config.utilization_hysteresis
818        };
819
820        // Determine if we're under high load
821        let high_load = message_rate > self.config.high_load_message_rate;
822
823        // Determine backpressure level based on configurable thresholds
824        let backpressure = if cache_utilization >= self.config.block_new_threshold {
825            if high_load {
826                BackpressureHint::BlockNew
827            } else {
828                BackpressureHint::DropMost
829            }
830        } else if cache_utilization >= self.config.drop_most_threshold {
831            BackpressureHint::DropMost
832        } else if cache_utilization >= self.config.drop_some_threshold {
833            BackpressureHint::DropSome
834        } else {
835            BackpressureHint::None
836        };
837
838        // Determine cleanup reason and aggressiveness
839        let (reason, aggressive) = if backpressure != BackpressureHint::None {
840            (CleanupReason::CriticalUtilization { backpressure }, true)
841        } else if cache_utilization > high_threshold {
842            (CleanupReason::HighUtilization, true)
843        } else if cache_utilization < low_threshold {
844            (CleanupReason::LowUtilization, false)
845        } else {
846            (CleanupReason::ModerateUtilization, false)
847        };
848
849        // Calculate interval based on utilization and backpressure
850        let base_interval = match reason {
851            CleanupReason::CriticalUtilization { backpressure } => {
852                match backpressure {
853                    BackpressureHint::BlockNew | BackpressureHint::DropMost => {
854                        // Extreme pressure: very aggressive cleanup even under high load
855                        self.config.min_interval / 2
856                    }
857                    BackpressureHint::DropSome => self.config.min_interval,
858                    BackpressureHint::None => unreachable!(),
859                }
860            }
861            CleanupReason::HighUtilization => {
862                if high_load {
863                    // High load + high utilization: balance cleanup vs message processing
864                    self.config.min_interval * 2
865                } else {
866                    self.config.min_interval
867                }
868            }
869            CleanupReason::LowUtilization => {
870                // Low utilization: relaxed cleanup
871                self.config.max_interval.min(self.config.base_interval * 2)
872            }
873            CleanupReason::ModerateUtilization => {
874                // Moderate utilization: interpolate based on position in range
875                let range = high_threshold - low_threshold;
876                let position = ((cache_utilization - low_threshold) / range).clamp(0.0, 1.0);
877                let max_ms = self.config.max_interval.as_millis() as f64;
878                let base_ms = self.config.base_interval.as_millis() as f64;
879                let interval_ms = max_ms - (position * (max_ms - base_ms));
880                Duration::from_millis(interval_ms as u64)
881            }
882        };
883
884        // Calculate batch size adjustment
885        let batch_size = self.calculate_batch_size(&mut state);
886
887        // Apply TTL constraint with safety cap
888        let ttl_secs = current_ttl.as_secs_f64();
889        let min_ttl_interval = if ttl_secs > 0.0 && ttl_secs.is_finite() {
890            let raw_interval = ttl_secs / self.config.ttl_divisor;
891            // Cap TTL-based interval to prevent huge values
892            let max_ttl_interval =
893                self.config.max_interval.as_secs_f64() * self.config.max_ttl_interval_factor;
894            Duration::from_secs_f64(raw_interval.min(max_ttl_interval))
895        } else {
896            Duration::ZERO
897        };
898
899        let final_interval = base_interval
900            .max(min_ttl_interval)
901            .max(self.config.min_interval);
902
903        CleanupParameters {
904            interval: final_interval,
905            batch_size,
906            aggressive,
907            utilization: cache_utilization,
908            message_rate,
909            reason,
910        }
911    }
912
913    /// Calculate batch size with smoothing and trend-aware efficiency adjustment.
914    fn calculate_batch_size(&self, state: &mut CleanupState) -> usize {
915        let target_ms = self.config.target_cleanup_duration.as_millis() as f64;
916        let last_ms = state.last_cleanup_duration.as_millis().max(1) as f64;
917
918        // Calculate desired batch size based on duration
919        let duration_ratio = (target_ms / last_ms).clamp(0.5, 2.0);
920
921        // Use EMA efficiency for smoother adjustment (only if we have enough data)
922        let efficiency_factor = if state.cleanup_cycles >= self.config.min_cycles_for_efficiency {
923            self.calculate_efficiency_factor(state)
924        } else {
925            1.0 // Neutral until we have enough data
926        };
927
928        // Apply trend adjustment - if efficiency is improving, be less aggressive
929        // in increasing batch size; if degrading, be more willing to adjust
930        let trend_factor = match state.efficiency_trend(self.config.min_cycles_for_efficiency) {
931            EfficiencyTrend::Improving => 0.95, // Slightly dampen increases
932            EfficiencyTrend::Degrading => 1.05, // Slightly boost adjustments
933            EfficiencyTrend::Stable | EfficiencyTrend::Unknown => 1.0,
934        };
935
936        let desired_batch =
937            (state.target_batch_size * duration_ratio * efficiency_factor * trend_factor)
938                .clamp(10.0, 500.0);
939
940        // Apply smoothing to avoid jumpy behavior
941        let alpha = self.config.batch_size_smoothing;
942        state.target_batch_size = alpha * desired_batch + (1.0 - alpha) * state.target_batch_size;
943
944        // Safety: ensure batch_size doesn't get stuck at extreme values
945        state.target_batch_size = state.target_batch_size.clamp(10.0, 500.0);
946
947        state.target_batch_size.round() as usize
948    }
949
950    /// Calculate efficiency factor with smooth transitions (no sharp thresholds).
951    fn calculate_efficiency_factor(&self, state: &CleanupState) -> f64 {
952        let eff = state.efficiency_ema;
953
954        // Target efficiency range: 0.25 - 0.6
955        // Below 0.25: we're scanning too much for what we're removing → scan more
956        // Above 0.6: we're very efficient → can scan less
957        // Within range: linear interpolation
958
959        const LOW_EFF: f64 = 0.15;
960        const TARGET_LOW: f64 = 0.25;
961        const TARGET_HIGH: f64 = 0.6;
962        const HIGH_EFF: f64 = 0.85;
963
964        if eff < LOW_EFF {
965            // Very low efficiency - increase scanning significantly but smoothly
966            // Linear ramp from 1.3 at 0 to 1.15 at LOW_EFF
967            1.3 - (eff / LOW_EFF) * 0.15
968        } else if eff < TARGET_LOW {
969            // Below target - increase scanning moderately
970            // Linear ramp from 1.15 at LOW_EFF to 1.0 at TARGET_LOW
971            let t = (eff - LOW_EFF) / (TARGET_LOW - LOW_EFF);
972            1.15 - t * 0.15
973        } else if eff <= TARGET_HIGH {
974            // Within target range - neutral
975            1.0
976        } else if eff < HIGH_EFF {
977            // Above target - decrease scanning moderately
978            // Linear ramp from 1.0 at TARGET_HIGH to 0.85 at HIGH_EFF
979            let t = (eff - TARGET_HIGH) / (HIGH_EFF - TARGET_HIGH);
980            1.0 - t * 0.15
981        } else {
982            // Very high efficiency - decrease scanning more
983            // But cap at 0.8 to avoid too aggressive reduction
984            0.85 - (eff - HIGH_EFF).min(0.15) * 0.33
985        }
986        .clamp(0.8, 1.3)
987    }
988
989    /// Get cleanup statistics.
990    pub fn stats(&self) -> CleanupStats {
991        let mut state = self.state.lock();
992        let avg_duration = if state.cleanup_cycles > 0 {
993            state.total_cleanup_duration / state.cleanup_cycles as u32
994        } else {
995            Duration::ZERO
996        };
997
998        let avg_efficiency = if state.entries_scanned > 0 {
999            state.entries_removed as f64 / state.entries_scanned as f64
1000        } else {
1001            0.0
1002        };
1003
1004        let efficiency_trend = state.efficiency_trend(self.config.min_cycles_for_efficiency);
1005        let pressure_trend = state.pressure_trend(self.config.min_cycles_for_efficiency);
1006
1007        CleanupStats {
1008            cleanup_cycles: state.cleanup_cycles,
1009            entries_removed: state.entries_removed,
1010            entries_scanned: state.entries_scanned,
1011            avg_cleanup_duration: avg_duration,
1012            max_cleanup_duration: state.max_cleanup_duration,
1013            current_message_rate: state
1014                .rate_window
1015                .sync_and_calculate_rate(&self.rate_counter),
1016            aggressive_cleanups: state.aggressive_cleanups,
1017            avg_removal_efficiency: avg_efficiency,
1018            recent_efficiency: state.efficiency_ema,
1019            efficiency_trend,
1020            pressure_trend,
1021            recent_utilization: state.utilization_ema,
1022        }
1023    }
1024
1025    /// Reset all statistics.
1026    pub fn reset_stats(&self) {
1027        let mut state = self.state.lock();
1028        state.cleanup_cycles = 0;
1029        state.entries_removed = 0;
1030        state.entries_scanned = 0;
1031        state.max_cleanup_duration = Duration::ZERO;
1032        state.total_cleanup_duration = Duration::ZERO;
1033        state.aggressive_cleanups = 0;
1034        state.last_raw_efficiency = 0.5;
1035        state.efficiency_ema = 0.5;
1036        state.prev_efficiency_ema = 0.5;
1037        state.utilization_ema = 0.5;
1038        state.prev_utilization_ema = 0.5;
1039    }
1040
1041    /// Get the configuration.
1042    pub fn config(&self) -> &CleanupConfig {
1043        &self.config
1044    }
1045}
1046
1047impl Default for CleanupTuner {
1048    fn default() -> Self {
1049        Self::with_defaults()
1050    }
1051}
1052
1053impl Clone for CleanupTuner {
1054    fn clone(&self) -> Self {
1055        // Create new tuner with same config but fresh state
1056        Self::new(self.config.clone())
1057    }
1058}
1059
1060#[cfg(test)]
1061mod tests {
1062    use super::*;
1063    use std::thread;
1064
1065    #[test]
1066    fn test_cleanup_config_defaults() {
1067        let config = CleanupConfig::default();
1068        assert_eq!(config.base_interval, Duration::from_secs(30));
1069        assert_eq!(config.min_interval, Duration::from_secs(5));
1070        assert_eq!(config.max_interval, Duration::from_secs(120));
1071        assert_eq!(config.ttl_divisor, 4.0);
1072    }
1073
1074    #[test]
1075    fn test_cleanup_config_presets() {
1076        let high = CleanupConfig::high_throughput();
1077        assert!(high.min_interval < CleanupConfig::default().min_interval);
1078
1079        let low = CleanupConfig::low_latency();
1080        assert!(low.target_cleanup_duration < CleanupConfig::default().target_cleanup_duration);
1081    }
1082
1083    #[test]
1084    fn test_cleanup_config_validation() {
1085        let mut config = CleanupConfig::default();
1086        config.ttl_divisor = 0.5; // Too low
1087        config.validate();
1088        assert!(config.ttl_divisor >= 2.0);
1089
1090        config.ttl_divisor = 100.0; // Too high
1091        config.validate();
1092        assert!(config.ttl_divisor <= 20.0);
1093    }
1094
1095    #[test]
1096    fn test_cleanup_parameters_low_utilization() {
1097        let tuner = CleanupTuner::with_defaults();
1098        let params = tuner.get_parameters(0.1, Duration::from_secs(300));
1099
1100        assert!(!params.aggressive);
1101        assert!(params.interval >= tuner.config().base_interval);
1102        assert_eq!(params.reason, CleanupReason::LowUtilization);
1103    }
1104
1105    #[test]
1106    fn test_cleanup_parameters_high_utilization() {
1107        let tuner = CleanupTuner::with_defaults();
1108        // Use a shorter TTL so TTL constraint doesn't dominate
1109        let params = tuner.get_parameters(0.9, Duration::from_secs(60));
1110
1111        assert!(params.aggressive);
1112        // With TTL=60s and divisor=4, min_ttl_interval=15s, which is >= min_interval
1113        // so we check it's reasonable (less than base or at TTL constraint)
1114        assert!(params.interval <= tuner.config().base_interval.max(Duration::from_secs(15)));
1115        assert_eq!(params.reason, CleanupReason::HighUtilization);
1116    }
1117
1118    #[test]
1119    fn test_cleanup_parameters_moderate_utilization() {
1120        let tuner = CleanupTuner::with_defaults();
1121        let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1122
1123        assert!(!params.aggressive);
1124        assert_eq!(params.reason, CleanupReason::ModerateUtilization);
1125    }
1126
1127    #[test]
1128    fn test_cleanup_parameters_critical_with_backpressure() {
1129        let tuner = CleanupTuner::with_defaults();
1130
1131        // Critical utilization at DropSome level
1132        let params = tuner.get_parameters(0.94, Duration::from_secs(300));
1133        assert!(params.should_cleanup_now());
1134        assert_eq!(params.backpressure_hint(), BackpressureHint::DropSome);
1135
1136        // Critical utilization at DropMost level
1137        let params = tuner.get_parameters(0.97, Duration::from_secs(300));
1138        assert_eq!(params.backpressure_hint(), BackpressureHint::DropMost);
1139    }
1140
1141    #[test]
1142    fn test_hysteresis_prevents_flapping() {
1143        let tuner = CleanupTuner::with_defaults();
1144
1145        // Cross threshold from below
1146        let params1 = tuner.get_parameters(0.79, Duration::from_secs(300));
1147        assert!(!params1.aggressive);
1148
1149        // Record cleanup to set state
1150        tuner.record_cleanup(Duration::from_millis(10), 50, &params1);
1151
1152        // Slightly above threshold
1153        let params2 = tuner.get_parameters(0.81, Duration::from_secs(300));
1154        // Should still not be aggressive due to hysteresis
1155        assert!(!params2.aggressive);
1156
1157        // Well above threshold
1158        let params3 = tuner.get_parameters(0.85, Duration::from_secs(300));
1159        assert!(params3.aggressive);
1160    }
1161
1162    #[test]
1163    fn test_message_rate_tracking() {
1164        let tuner = CleanupTuner::with_defaults();
1165
1166        // Record messages
1167        tuner.record_messages(100);
1168        thread::sleep(Duration::from_millis(10));
1169
1170        let rate = tuner.message_rate();
1171        assert!(rate > 0.0);
1172    }
1173
1174    #[test]
1175    fn test_cleanup_stats() {
1176        let tuner = CleanupTuner::with_defaults();
1177        let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1178
1179        tuner.record_cleanup(Duration::from_millis(20), 50, &params);
1180        tuner.record_cleanup(Duration::from_millis(30), 75, &params);
1181
1182        let stats = tuner.stats();
1183        assert_eq!(stats.cleanup_cycles, 2);
1184        assert_eq!(stats.entries_removed, 125);
1185    }
1186
1187    #[test]
1188    fn test_reset_stats() {
1189        let tuner = CleanupTuner::with_defaults();
1190        let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1191
1192        tuner.record_cleanup(Duration::from_millis(20), 50, &params);
1193        tuner.reset_stats();
1194
1195        let stats = tuner.stats();
1196        assert_eq!(stats.cleanup_cycles, 0);
1197        assert_eq!(stats.entries_removed, 0);
1198    }
1199
1200    #[test]
1201    fn test_batch_size_adjustment() {
1202        let tuner = CleanupTuner::with_defaults();
1203        let params_initial = tuner.get_parameters(0.5, Duration::from_secs(300));
1204
1205        // Record several slow cleanups
1206        for _ in 0..5 {
1207            let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1208            tuner.record_cleanup(Duration::from_millis(200), 100, &params);
1209        }
1210
1211        let params_final = tuner.get_parameters(0.5, Duration::from_secs(300));
1212
1213        // Should decrease batch size due to slow cleanups
1214        assert!(params_final.batch_size < params_initial.batch_size);
1215        assert!(params_final.batch_size >= 10); // Respects minimum
1216    }
1217
1218    #[test]
1219    fn test_should_cleanup_now() {
1220        let tuner = CleanupTuner::with_defaults();
1221
1222        // Not critical
1223        let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1224        assert!(!params.should_cleanup_now());
1225
1226        // Critical
1227        let params = tuner.get_parameters(0.95, Duration::from_secs(300));
1228        assert!(params.should_cleanup_now());
1229    }
1230
1231    #[test]
1232    fn test_lock_free_recording() {
1233        let tuner = CleanupTuner::with_defaults();
1234
1235        // These should be fully lock-free
1236        for _ in 0..1000 {
1237            tuner.record_message();
1238        }
1239
1240        tuner.record_messages(500);
1241
1242        thread::sleep(Duration::from_millis(10));
1243        let rate = tuner.message_rate();
1244        assert!(rate > 0.0);
1245    }
1246
1247    #[test]
1248    fn test_efficiency_trend_tracking() {
1249        // Use a config with higher smoothing for more responsive trends
1250        let config = CleanupConfig::default().with_efficiency_smoothing(0.4); // More responsive
1251        let tuner = CleanupTuner::new(config);
1252
1253        // Record enough cleanups to have valid trend data
1254        for _ in 0..5 {
1255            let params = tuner.get_parameters(0.5, Duration::from_secs(60));
1256            // Use batch_size from params for accurate efficiency calculation
1257            let batch = params.batch_size;
1258            // 50% efficiency (half of batch removed)
1259            tuner.record_cleanup(Duration::from_millis(20), batch / 2, &params);
1260        }
1261
1262        let stats = tuner.stats();
1263        // After enough cycles, trend should be determined (not Unknown)
1264        assert_ne!(
1265            stats.efficiency_trend,
1266            EfficiencyTrend::Unknown,
1267            "Trend should be known after {} cycles (min_cycles={})",
1268            stats.cleanup_cycles,
1269            tuner.config().min_cycles_for_efficiency
1270        );
1271        // Recent efficiency should be reasonable (around 0.5)
1272        assert!(
1273            stats.recent_efficiency > 0.3 && stats.recent_efficiency < 0.7,
1274            "Recent efficiency {} should be around 0.5",
1275            stats.recent_efficiency
1276        );
1277    }
1278
1279    #[test]
1280    fn test_pressure_trend_tracking() {
1281        let tuner = CleanupTuner::with_defaults();
1282
1283        // Record cleanups with increasing utilization
1284        for i in 0..5 {
1285            let util = 0.4 + (i as f64 * 0.1);
1286            let params = tuner.get_parameters(util, Duration::from_secs(300));
1287            tuner.record_cleanup(Duration::from_millis(20), 50, &params);
1288        }
1289
1290        let stats = tuner.stats();
1291        assert_eq!(stats.pressure_trend, PressureTrend::Increasing);
1292    }
1293
1294    #[test]
1295    fn test_huge_ttl_safety() {
1296        let tuner = CleanupTuner::with_defaults();
1297
1298        // Very large TTL should not cause huge intervals
1299        let params = tuner.get_parameters(0.5, Duration::from_secs(86400 * 30)); // 30 days
1300
1301        // Should be capped by max_interval * max_ttl_interval_factor
1302        assert!(
1303            params.interval
1304                <= tuner
1305                    .config()
1306                    .max_interval
1307                    .mul_f64(tuner.config().max_ttl_interval_factor + 0.1)
1308        );
1309    }
1310
1311    #[test]
1312    fn test_zero_efficiency_startup() {
1313        let tuner = CleanupTuner::with_defaults();
1314
1315        // First few cleanups with 0 efficiency should not cause wild adjustments
1316        for _ in 0..3 {
1317            let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1318            tuner.record_cleanup(Duration::from_millis(20), 0, &params);
1319        }
1320
1321        let params = tuner.get_parameters(0.5, Duration::from_secs(300));
1322
1323        // Batch size should still be reasonable
1324        assert!(params.batch_size >= 10);
1325        assert!(params.batch_size <= 500);
1326    }
1327
1328    #[test]
1329    fn test_efficiency_factor_smooth_transitions() {
1330        let tuner = CleanupTuner::with_defaults();
1331        let mut state = CleanupState::new(Duration::from_secs(10));
1332
1333        // Test various efficiency levels
1334        let test_cases = [
1335            (0.05, 1.25..1.35), // Very low efficiency → increase scan
1336            (0.15, 1.10..1.20), // Low efficiency
1337            (0.25, 0.95..1.05), // At target low
1338            (0.40, 0.95..1.05), // Mid target range
1339            (0.60, 0.95..1.05), // At target high
1340            (0.75, 0.85..0.95), // Above target
1341            (0.90, 0.78..0.88), // Very high efficiency → decrease scan
1342        ];
1343
1344        for (eff, expected_range) in test_cases {
1345            state.efficiency_ema = eff;
1346            state.cleanup_cycles = 10; // Ensure we have enough data
1347            let factor = tuner.calculate_efficiency_factor(&state);
1348            assert!(
1349                factor >= expected_range.start && factor <= expected_range.end,
1350                "Efficiency {}: factor {} not in range {:?}",
1351                eff,
1352                factor,
1353                expected_range
1354            );
1355        }
1356    }
1357
1358    #[test]
1359    fn test_stats_includes_derived_metrics() {
1360        let tuner = CleanupTuner::with_defaults();
1361
1362        for i in 0..5 {
1363            let params = tuner.get_parameters(0.5 + (i as f64 * 0.05), Duration::from_secs(300));
1364            tuner.record_cleanup(Duration::from_millis(20), 50, &params);
1365        }
1366
1367        let stats = tuner.stats();
1368
1369        // Check all derived metrics are present
1370        assert!(stats.recent_efficiency > 0.0);
1371        assert!(stats.recent_utilization > 0.0);
1372        assert!(stats.entries_scanned > 0);
1373        // Trend should be known after enough cycles
1374        assert_ne!(stats.efficiency_trend, EfficiencyTrend::Unknown);
1375    }
1376
1377    #[test]
1378    fn test_concurrent_recording() {
1379        use std::sync::Arc;
1380
1381        let tuner = Arc::new(CleanupTuner::with_defaults());
1382        let mut handles = vec![];
1383
1384        // Spawn multiple threads doing concurrent recording
1385        for _ in 0..4 {
1386            let tuner_clone = Arc::clone(&tuner);
1387            let handle = thread::spawn(move || {
1388                for _ in 0..1000 {
1389                    tuner_clone.record_message();
1390                }
1391                tuner_clone.record_messages(500);
1392            });
1393            handles.push(handle);
1394        }
1395
1396        // Wait for all threads
1397        for handle in handles {
1398            handle.join().unwrap();
1399        }
1400
1401        thread::sleep(Duration::from_millis(10));
1402        let rate = tuner.message_rate();
1403        assert!(rate > 0.0);
1404
1405        // Total should be ~5500 per thread * 4 threads = 22000
1406        // Rate will vary based on timing
1407    }
1408
1409    #[test]
1410    fn test_rate_tracker_window_rollover() {
1411        let counter = LockFreeRateCounter::new();
1412        let mut state = RateWindowState::new(Duration::from_millis(100));
1413
1414        // Add some messages
1415        counter.record_batch(50);
1416        thread::sleep(Duration::from_millis(120));
1417
1418        // Should have rolled over
1419        let rate = state.sync_and_calculate_rate(&counter);
1420        assert!(rate > 0.0);
1421
1422        // Previous window should be preserved
1423        counter.record_batch(50);
1424        let rate2 = state.sync_and_calculate_rate(&counter);
1425        assert!(rate2 > 0.0);
1426    }
1427
1428    #[test]
1429    fn test_extreme_pressure_backpressure() {
1430        let config = CleanupConfig::default();
1431        let tuner = CleanupTuner::new(config);
1432
1433        // Simulate high load
1434        tuner.record_messages(10000);
1435        thread::sleep(Duration::from_millis(10));
1436
1437        // Very high utilization under high load - use short TTL to avoid TTL constraint
1438        let params = tuner.get_parameters(0.98, Duration::from_secs(10));
1439
1440        // Should recommend blocking new messages
1441        assert_eq!(params.backpressure_hint(), BackpressureHint::BlockNew);
1442
1443        // Should still do aggressive cleanup despite high load
1444        // With TTL=10s and divisor=4, min_ttl_interval=2.5s which is less than min_interval=5s
1445        // So the critical interval (min_interval/2 = 2.5s) should be used
1446        assert!(params.interval <= tuner.config().min_interval);
1447    }
1448
1449    #[test]
1450    fn test_builder_pattern() {
1451        let config = CleanupConfig::default()
1452            .with_base_interval(Duration::from_secs(45))
1453            .with_min_interval(Duration::from_secs(10))
1454            .with_max_interval(Duration::from_secs(180))
1455            .with_ttl_divisor(6.0)
1456            .with_efficiency_smoothing(0.25);
1457
1458        assert_eq!(config.base_interval, Duration::from_secs(45));
1459        assert_eq!(config.min_interval, Duration::from_secs(10));
1460        assert_eq!(config.max_interval, Duration::from_secs(180));
1461        assert_eq!(config.ttl_divisor, 6.0);
1462        assert_eq!(config.efficiency_smoothing, 0.25);
1463    }
1464}