metrics_lib/
adaptive.rs

1//! Adaptive sampling and backpressure mechanisms
2//!
3//! Provides automatic load shedding and sampling to maintain performance under pressure
4
5use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
6use std::time::{Duration, Instant};
7
/// How an [`AdaptiveSampler`] decides which events to keep.
///
/// Rates are expressed as "1 in N": a rate of `N` keeps roughly one event out
/// of every `N`, so a *larger* rate means *fewer* samples, and a rate of `1`
/// keeps every event.
#[derive(Debug, Clone, Copy)]
pub enum SamplingStrategy {
    /// Keep roughly one event in every `rate`, chosen at random.
    Fixed {
        /// Sampling rate `N` ("1 in N").
        rate: u32,
    },
    /// Start at `min_rate` and periodically double or halve the rate,
    /// depending on how many samples were taken since the last adjustment.
    Dynamic {
        /// Lowest rate allowed (keeps the most events); also the starting rate.
        min_rate: u32,
        /// Highest rate allowed (keeps the fewest events).
        max_rate: u32,
        /// Target number of samples taken between rate adjustments
        /// (adjustments happen roughly once per second).
        target_throughput: u64,
    },
    /// Keep an event only when at least `min_interval` has elapsed since the
    /// previous sample.
    TimeBased {
        /// Minimum spacing between samples, in nanoseconds.
        min_interval: u64,
    },
}
42
43/// Adaptive sampler for load shedding
44pub struct AdaptiveSampler {
45    strategy: SamplingStrategy,
46    current_rate: AtomicU32,
47    samples_taken: AtomicU64,
48    samples_dropped: AtomicU64,
49    last_adjustment: parking_lot::Mutex<Instant>,
50    overloaded: AtomicBool,
51}
52
53impl AdaptiveSampler {
54    /// Create new sampler with strategy
55    pub fn new(strategy: SamplingStrategy) -> Self {
56        let initial_rate = match strategy {
57            SamplingStrategy::Fixed { rate } => rate,
58            SamplingStrategy::Dynamic { min_rate, .. } => min_rate,
59            SamplingStrategy::TimeBased { .. } => 1,
60        };
61
62        Self {
63            strategy,
64            current_rate: AtomicU32::new(initial_rate),
65            samples_taken: AtomicU64::new(0),
66            samples_dropped: AtomicU64::new(0),
67            last_adjustment: parking_lot::Mutex::new(Instant::now()),
68            overloaded: AtomicBool::new(false),
69        }
70    }
71
72    /// Check if sample should be taken
73    #[inline]
74    pub fn should_sample(&self) -> bool {
75        match self.strategy {
76            SamplingStrategy::Fixed { .. } => self.should_sample_fixed(),
77            SamplingStrategy::Dynamic { .. } => self.should_sample_dynamic(),
78            SamplingStrategy::TimeBased { min_interval } => {
79                self.should_sample_time_based(Duration::from_nanos(min_interval))
80            }
81        }
82    }
83
84    #[inline]
85    fn should_sample_fixed(&self) -> bool {
86        let rate = self.current_rate.load(Ordering::Relaxed);
87        if rate == 1 {
88            self.samples_taken.fetch_add(1, Ordering::Relaxed);
89            return true;
90        }
91
92        // Fast thread-local random
93        let should_sample = fastrand::u32(1..=rate) == 1;
94
95        if should_sample {
96            self.samples_taken.fetch_add(1, Ordering::Relaxed);
97        } else {
98            self.samples_dropped.fetch_add(1, Ordering::Relaxed);
99        }
100
101        should_sample
102    }
103
104    fn should_sample_dynamic(&self) -> bool {
105        // Check if we need to adjust rate
106        let mut last_adjustment = self.last_adjustment.lock();
107        let now = Instant::now();
108
109        if now.duration_since(*last_adjustment) > Duration::from_secs(1) {
110            self.adjust_dynamic_rate();
111            *last_adjustment = now;
112        }
113        drop(last_adjustment);
114
115        self.should_sample_fixed()
116    }
117
118    fn should_sample_time_based(&self, min_interval: Duration) -> bool {
119        thread_local! {
120            static LAST_SAMPLE: std::cell::RefCell<Option<Instant>> = const { std::cell::RefCell::new(None) };
121        }
122
123        LAST_SAMPLE.with(|last| {
124            let mut last = last.borrow_mut();
125            let now = Instant::now();
126
127            match *last {
128                Some(last_time) if now.duration_since(last_time) < min_interval => {
129                    self.samples_dropped.fetch_add(1, Ordering::Relaxed);
130                    false
131                }
132                _ => {
133                    *last = Some(now);
134                    self.samples_taken.fetch_add(1, Ordering::Relaxed);
135                    true
136                }
137            }
138        })
139    }
140
141    fn adjust_dynamic_rate(&self) {
142        if let SamplingStrategy::Dynamic {
143            min_rate,
144            max_rate,
145            target_throughput,
146        } = self.strategy
147        {
148            let taken = self.samples_taken.load(Ordering::Relaxed);
149            let current_rate = self.current_rate.load(Ordering::Relaxed);
150
151            let new_rate = if taken > target_throughput {
152                // Increase sampling rate (sample less)
153                (current_rate * 2).min(max_rate)
154            } else if taken < target_throughput / 2 {
155                // Decrease sampling rate (sample more)
156                (current_rate / 2).max(min_rate)
157            } else {
158                current_rate
159            };
160
161            if new_rate != current_rate {
162                self.current_rate.store(new_rate, Ordering::Relaxed);
163                self.overloaded
164                    .store(new_rate > min_rate * 2, Ordering::Relaxed);
165            }
166
167            // Reset counters
168            self.samples_taken.store(0, Ordering::Relaxed);
169            self.samples_dropped.store(0, Ordering::Relaxed);
170        }
171    }
172
173    /// Get current sampling rate
174    #[inline]
175    pub fn current_rate(&self) -> u32 {
176        self.current_rate.load(Ordering::Relaxed)
177    }
178
179    /// Check if system is overloaded
180    #[inline]
181    pub fn is_overloaded(&self) -> bool {
182        self.overloaded.load(Ordering::Relaxed)
183    }
184
185    /// Get sampling statistics
186    pub fn stats(&self) -> SamplingStats {
187        SamplingStats {
188            samples_taken: self.samples_taken.load(Ordering::Relaxed),
189            samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
190            current_rate: self.current_rate.load(Ordering::Relaxed),
191            is_overloaded: self.is_overloaded(),
192        }
193    }
194}
195
/// Point-in-time snapshot of an `AdaptiveSampler`'s counters.
#[derive(Debug, Clone)]
pub struct SamplingStats {
    /// Number of sampling decisions that kept the event.
    pub samples_taken: u64,
    /// Number of sampling decisions that dropped the event.
    pub samples_dropped: u64,
    /// The "1 in N" sampling rate at snapshot time.
    pub current_rate: u32,
    /// Whether the sampler reported itself as overloaded at snapshot time.
    pub is_overloaded: bool,
}

impl SamplingStats {
    /// Percentage (0–100) of sampling decisions that kept the event.
    ///
    /// Returns `100.0` when no decisions have been recorded yet.
    pub fn sampling_percentage(&self) -> f64 {
        let taken = self.samples_taken;
        match taken + self.samples_dropped {
            0 => 100.0,
            total => (taken as f64 / total as f64) * 100.0,
        }
    }
}
216
217/// Circuit breaker for metric recording
218pub struct MetricCircuitBreaker {
219    state: AtomicU32,
220    failures: AtomicU64,
221    successes: AtomicU64,
222    last_state_change: parking_lot::Mutex<Instant>,
223    config: CircuitBreakerConfig,
224}
225
/// Tuning parameters for a `MetricCircuitBreaker`.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures while closed that trip the breaker open.
    pub failure_threshold: u64,
    /// Successes while half-open needed before the breaker closes again.
    pub success_threshold: u64,
    /// How long the breaker stays open before admitting a trial call.
    pub timeout: Duration,
    /// Cap on recorded outcomes while half-open; further calls are rejected.
    pub half_open_max_calls: u64,
}

impl Default for CircuitBreakerConfig {
    /// 5 failures to open, 3 successes to close, a 30 s open timeout, and at
    /// most 10 half-open calls.
    fn default() -> Self {
        let timeout = Duration::from_secs(30);
        CircuitBreakerConfig {
            failure_threshold: 5,
            success_threshold: 3,
            timeout,
            half_open_max_calls: 10,
        }
    }
}
244
/// State of a `MetricCircuitBreaker`, stored in an `AtomicU32` via its
/// explicit `u32` discriminant.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    /// Normal operation: every call is allowed.
    Closed = 0,
    /// Tripped: calls are rejected until the timeout elapses.
    Open = 1,
    /// Probing: a limited number of calls are let through to test recovery.
    HalfOpen = 2,
}
252
253impl MetricCircuitBreaker {
254    /// Create new circuit breaker
255    pub fn new(config: CircuitBreakerConfig) -> Self {
256        Self {
257            state: AtomicU32::new(CircuitState::Closed as u32),
258            failures: AtomicU64::new(0),
259            successes: AtomicU64::new(0),
260            last_state_change: parking_lot::Mutex::new(Instant::now()),
261            config,
262        }
263    }
264
265    /// Check if operation is allowed
266    #[inline]
267    pub fn is_allowed(&self) -> bool {
268        let state = self.get_state();
269
270        match state {
271            CircuitState::Closed => true,
272            CircuitState::Open => {
273                // Check if timeout has passed
274                let last_change = *self.last_state_change.lock();
275                if Instant::now().duration_since(last_change) > self.config.timeout {
276                    self.transition_to(CircuitState::HalfOpen);
277                    true
278                } else {
279                    false
280                }
281            }
282            CircuitState::HalfOpen => {
283                // Allow limited calls
284                let calls =
285                    self.successes.load(Ordering::Relaxed) + self.failures.load(Ordering::Relaxed);
286                calls < self.config.half_open_max_calls
287            }
288        }
289    }
290
291    /// Record success
292    #[inline]
293    pub fn record_success(&self) {
294        let state = self.get_state();
295
296        match state {
297            CircuitState::Closed => {
298                self.failures.store(0, Ordering::Relaxed);
299            }
300            CircuitState::HalfOpen => {
301                let successes = self.successes.fetch_add(1, Ordering::Relaxed) + 1;
302                if successes >= self.config.success_threshold {
303                    self.transition_to(CircuitState::Closed);
304                }
305            }
306            CircuitState::Open => {} // Shouldn't happen
307        }
308    }
309
310    /// Record failure
311    #[inline]
312    pub fn record_failure(&self) {
313        let state = self.get_state();
314
315        match state {
316            CircuitState::Closed => {
317                let failures = self.failures.fetch_add(1, Ordering::Relaxed) + 1;
318                if failures >= self.config.failure_threshold {
319                    self.transition_to(CircuitState::Open);
320                }
321            }
322            CircuitState::HalfOpen => {
323                self.transition_to(CircuitState::Open);
324            }
325            CircuitState::Open => {} // Already open
326        }
327    }
328
329    #[inline]
330    fn get_state(&self) -> CircuitState {
331        match self.state.load(Ordering::Relaxed) {
332            0 => CircuitState::Closed,
333            1 => CircuitState::Open,
334            2 => CircuitState::HalfOpen,
335            _ => unreachable!(),
336        }
337    }
338
339    fn transition_to(&self, new_state: CircuitState) {
340        self.state.store(new_state as u32, Ordering::Relaxed);
341        self.failures.store(0, Ordering::Relaxed);
342        self.successes.store(0, Ordering::Relaxed);
343        *self.last_state_change.lock() = Instant::now();
344    }
345}
346
/// Backpressure controller bounding the number of concurrently pending operations.
///
/// Callers obtain a slot with [`BackpressureController::try_acquire`]. The
/// slot is released when the returned [`BackpressureGuard`] is dropped.
pub struct BackpressureController {
    max_pending: usize,
    pending: AtomicU64,
    rejected: AtomicU64,
}

impl BackpressureController {
    /// Create a controller admitting at most `max_pending` concurrent slots.
    ///
    /// A `max_pending` of 0 rejects every acquisition.
    pub fn new(max_pending: usize) -> Self {
        Self {
            max_pending,
            pending: AtomicU64::new(0),
            rejected: AtomicU64::new(0),
        }
    }

    /// Try to acquire a slot.
    ///
    /// Returns `None`, and increments the rejected count, when `max_pending`
    /// slots are already held. The slot is released when the returned guard
    /// is dropped, so keep the guard alive for the duration of the guarded work.
    #[inline]
    #[must_use = "dropping the guard immediately releases the slot"]
    pub fn try_acquire(&self) -> Option<BackpressureGuard<'_>> {
        let limit = u64::try_from(self.max_pending).unwrap_or(u64::MAX);

        // Increment only while below the limit. `pending` then never overshoots
        // `max_pending`, and a rejected caller's provisional increment cannot
        // cause a concurrent caller to be spuriously rejected.
        let admitted = self
            .pending
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |pending| {
                if pending < limit {
                    Some(pending + 1)
                } else {
                    None
                }
            })
            .is_ok();

        if admitted {
            Some(BackpressureGuard { controller: self })
        } else {
            self.rejected.fetch_add(1, Ordering::Relaxed);
            None
        }
    }

    /// Number of slots currently held.
    #[inline]
    pub fn pending_count(&self) -> u64 {
        self.pending.load(Ordering::Relaxed)
    }

    /// Total number of rejected acquisition attempts.
    #[inline]
    pub fn rejected_count(&self) -> u64 {
        self.rejected.load(Ordering::Relaxed)
    }
}

/// RAII guard for a slot acquired from a [`BackpressureController`].
///
/// Dropping the guard releases the slot.
pub struct BackpressureGuard<'a> {
    controller: &'a BackpressureController,
}

impl Drop for BackpressureGuard<'_> {
    #[inline]
    fn drop(&mut self) {
        self.controller.pending.fetch_sub(1, Ordering::Relaxed);
    }
}
402
/// Minimal, dependency-free random helper used by the samplers in this file.
///
/// This is a local implementation, not a re-export of an external crate.
pub mod fastrand {
    use std::cell::Cell;

    /// Returns a pseudo-random `u32` in `range` (inclusive), approximately
    /// uniformly distributed.
    ///
    /// Uses a per-thread xorshift32 generator seeded from a hash of the
    /// current thread's id. Not suitable for cryptographic use.
    ///
    /// # Panics
    ///
    /// Panics if `range` is empty (`start > end`).
    #[inline]
    pub fn u32(range: std::ops::RangeInclusive<u32>) -> u32 {
        let (start, end) = (*range.start(), *range.end());
        assert!(start <= end, "fastrand::u32: empty range {}..={}", start, end);
        if start == end {
            return start;
        }

        thread_local! {
            static RNG: Cell<u32> = Cell::new(seed());
        }

        let x = RNG.with(|rng| {
            let mut x = rng.get();
            x ^= x << 13;
            x ^= x >> 17;
            x ^= x << 5;
            rng.set(x);
            x
        });

        // Number of values in the range, computed in u64 so that the full
        // range `0..=u32::MAX` (2^32 values) does not overflow.
        let span = u64::from(end - start) + 1;
        // Multiply-shift maps `x` onto `[0, span)` using the generator's high
        // bits. The result is below `span`, so adding it to `start` stays within `end`.
        start + ((u64::from(x) * span) >> 32) as u32
    }

    /// Per-thread seed derived from a hash of the current thread's id.
    ///
    /// Forced odd so it is never zero, because xorshift would stay at 0 forever.
    fn seed() -> u32 {
        use std::hash::{Hash, Hasher};

        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::thread::current().id().hash(&mut hasher);
        // Keep the low 32 bits of the 64-bit hash.
        hasher.finish() as u32 | 1
    }
}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Run `calls` sampling decisions through `sampler` and count the kept ones.
    fn count_sampled(sampler: &AdaptiveSampler, calls: usize) -> usize {
        (0..calls).filter(|_| sampler.should_sample()).count()
    }

    #[test]
    fn test_fixed_sampling() {
        let sampler = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });

        let sampled = count_sampled(&sampler, 1000);

        // Expect about 100 samples (10%); the bounds leave generous room for randomness.
        assert!((51..150).contains(&sampled));
    }

    #[test]
    fn test_circuit_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout: Duration::from_millis(100),
            half_open_max_calls: 5,
        };
        let breaker = MetricCircuitBreaker::new(config);

        // A fresh breaker is closed.
        assert!(breaker.is_allowed());

        // Three consecutive failures trip it open.
        (0..3).for_each(|_| breaker.record_failure());
        assert!(!breaker.is_allowed());

        // Once the timeout passes, the next call is let through (half-open).
        std::thread::sleep(Duration::from_millis(150));
        assert!(breaker.is_allowed());

        // Two successes close it again.
        (0..2).for_each(|_| breaker.record_success());
        assert!(breaker.is_allowed());
    }

    #[test]
    fn test_backpressure() {
        let controller = BackpressureController::new(5);

        // Fill every slot.
        let mut guards: Vec<_> = (0..5)
            .map(|_| controller.try_acquire().unwrap())
            .collect();

        // One more request is rejected and counted.
        assert!(controller.try_acquire().is_none());
        assert_eq!(controller.rejected_count(), 1);

        // Releasing a slot makes room for another acquisition.
        drop(guards.pop());
        assert!(controller.try_acquire().is_some());
    }

    #[test]
    fn test_time_based_sampling_interval() {
        const FIVE_MS_IN_NANOS: u64 = 5_000_000;
        let sampler = AdaptiveSampler::new(SamplingStrategy::TimeBased {
            min_interval: FIVE_MS_IN_NANOS,
        });

        // No prior sample, so the first call is taken.
        assert!(sampler.should_sample());
        // An immediate retry falls inside the interval and is dropped.
        assert!(!sampler.should_sample());

        // Once the interval has elapsed, sampling resumes.
        std::thread::sleep(Duration::from_millis(6));
        assert!(sampler.should_sample());
    }

    #[test]
    fn test_sampling_stats_percentage() {
        // Before any decisions, the percentage is reported as 100%.
        let empty = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 }).stats();
        assert_eq!(empty.samples_taken, 0);
        assert_eq!(empty.samples_dropped, 0);
        assert!((empty.sampling_percentage() - 100.0).abs() < f64::EPSILON);

        // Time-based sampling deterministically yields one taken and one dropped.
        let sampler = AdaptiveSampler::new(SamplingStrategy::TimeBased {
            min_interval: 5_000_000,
        });
        assert!(sampler.should_sample());
        assert!(!sampler.should_sample());

        let stats = sampler.stats();
        assert_eq!((stats.samples_taken, stats.samples_dropped), (1, 1));
        assert!((stats.sampling_percentage() - 50.0).abs() < 0.0001);
    }
}