metrics_lib/adaptive.rs

//! Adaptive sampling and backpressure mechanisms
//!
//! Provides automatic load shedding and sampling to maintain performance under pressure.
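//!
//! A minimal usage sketch (illustrative only; it assumes this module is exposed as
//! `metrics_lib::adaptive`):
//!
//! ```ignore
//! use metrics_lib::adaptive::{AdaptiveSampler, SamplingStrategy};
//!
//! // Keep roughly 1 in 10 recordings.
//! let sampler = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });
//! if sampler.should_sample() {
//!     // record the metric here
//! }
//! ```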
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicBool, Ordering};
use std::time::{Duration, Instant};

/// Adaptive sampling strategy
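///
/// The sketch below shows how each variant might be constructed (the values are
/// illustrative, not recommendations):
///
/// ```ignore
/// use metrics_lib::adaptive::SamplingStrategy;
///
/// // Always keep 1 in 100 samples.
/// let fixed = SamplingStrategy::Fixed { rate: 100 };
///
/// // Start at 1-in-10 and back off up to 1-in-1000 when throughput exceeds the target.
/// let dynamic = SamplingStrategy::Dynamic {
///     min_rate: 10,
///     max_rate: 1000,
///     target_throughput: 50_000,
/// };
///
/// // At most one sample per millisecond per thread.
/// let time_based = SamplingStrategy::TimeBased { min_interval: 1_000_000 };
/// ```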
#[derive(Debug, Clone, Copy)]
pub enum SamplingStrategy {
    /// Fixed rate sampling (1 in N)
    Fixed {
        /// The fixed sampling rate, where 1 in `rate` samples are taken.
        rate: u32,
    },
    /// Dynamic rate based on load
    Dynamic {
        /// The minimum sampling rate.
        min_rate: u32,
        /// The maximum sampling rate.
        max_rate: u32,
        /// The target number of samples per adjustment window (about one second).
        target_throughput: u64,
    },
    /// Time-based sampling
    TimeBased {
        /// Minimum interval between samples, in nanoseconds.
        min_interval: u64,
    },
}

/// Adaptive sampler for load shedding
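///
/// A sketch of how the dynamic strategy might be driven (values illustrative,
/// module path assumed):
///
/// ```ignore
/// use metrics_lib::adaptive::{AdaptiveSampler, SamplingStrategy};
///
/// let sampler = AdaptiveSampler::new(SamplingStrategy::Dynamic {
///     min_rate: 1,
///     max_rate: 256,
///     target_throughput: 10_000,
/// });
///
/// for _ in 0..100_000 {
///     if sampler.should_sample() {
///         // record the metric
///     }
/// }
/// // Over a sustained workload the rate re-adjusts roughly once per second.
///
/// let stats = sampler.stats();
/// println!("kept {}%, 1-in-{} sampling, overloaded: {}",
///     stats.sampling_percentage(), stats.current_rate, stats.is_overloaded);
/// ```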
pub struct AdaptiveSampler {
    strategy: SamplingStrategy,
    current_rate: AtomicU32,
    samples_taken: AtomicU64,
    samples_dropped: AtomicU64,
    last_adjustment: parking_lot::Mutex<Instant>,
    overloaded: AtomicBool,
}

impl AdaptiveSampler {
    /// Create a new sampler with the given strategy
    pub fn new(strategy: SamplingStrategy) -> Self {
        let initial_rate = match strategy {
            SamplingStrategy::Fixed { rate } => rate,
            SamplingStrategy::Dynamic { min_rate, .. } => min_rate,
            SamplingStrategy::TimeBased { .. } => 1,
        };

        Self {
            strategy,
            current_rate: AtomicU32::new(initial_rate),
            samples_taken: AtomicU64::new(0),
            samples_dropped: AtomicU64::new(0),
            last_adjustment: parking_lot::Mutex::new(Instant::now()),
            overloaded: AtomicBool::new(false),
        }
    }

    /// Check whether a sample should be taken
    #[inline]
    pub fn should_sample(&self) -> bool {
        match self.strategy {
            SamplingStrategy::Fixed { .. } => self.should_sample_fixed(),
            SamplingStrategy::Dynamic { .. } => self.should_sample_dynamic(),
            SamplingStrategy::TimeBased { min_interval } => {
                self.should_sample_time_based(Duration::from_nanos(min_interval))
            }
        }
    }

    #[inline]
    fn should_sample_fixed(&self) -> bool {
        let rate = self.current_rate.load(Ordering::Relaxed);
        if rate == 1 {
            self.samples_taken.fetch_add(1, Ordering::Relaxed);
            return true;
        }

        // Fast thread-local random
        let should_sample = fastrand::u32(1..=rate) == 1;

        if should_sample {
            self.samples_taken.fetch_add(1, Ordering::Relaxed);
        } else {
            self.samples_dropped.fetch_add(1, Ordering::Relaxed);
        }

        should_sample
    }

    fn should_sample_dynamic(&self) -> bool {
        // Adjust the rate at most once per second
        let mut last_adjustment = self.last_adjustment.lock();
        let now = Instant::now();

        if now.duration_since(*last_adjustment) > Duration::from_secs(1) {
            self.adjust_dynamic_rate();
            *last_adjustment = now;
        }
        drop(last_adjustment);

        self.should_sample_fixed()
    }

    fn should_sample_time_based(&self, min_interval: Duration) -> bool {
        thread_local! {
            static LAST_SAMPLE: std::cell::RefCell<Option<Instant>> = const { std::cell::RefCell::new(None) };
        }

        LAST_SAMPLE.with(|last| {
            let mut last = last.borrow_mut();
            let now = Instant::now();

            match *last {
                Some(last_time) if now.duration_since(last_time) < min_interval => {
                    self.samples_dropped.fetch_add(1, Ordering::Relaxed);
                    false
                }
                _ => {
                    *last = Some(now);
                    self.samples_taken.fetch_add(1, Ordering::Relaxed);
                    true
                }
            }
        })
    }

    fn adjust_dynamic_rate(&self) {
        if let SamplingStrategy::Dynamic { min_rate, max_rate, target_throughput } = self.strategy {
            let taken = self.samples_taken.load(Ordering::Relaxed);
            let current_rate = self.current_rate.load(Ordering::Relaxed);

            let new_rate = if taken > target_throughput {
                // Over target: raise the 1-in-N rate (sample less)
                current_rate.saturating_mul(2).min(max_rate)
            } else if taken < target_throughput / 2 {
                // Well under target: lower the 1-in-N rate (sample more)
                (current_rate / 2).max(min_rate)
            } else {
                current_rate
            };

            if new_rate != current_rate {
                self.current_rate.store(new_rate, Ordering::Relaxed);
                self.overloaded.store(new_rate > min_rate.saturating_mul(2), Ordering::Relaxed);
            }

            // Reset counters for the next adjustment window
            self.samples_taken.store(0, Ordering::Relaxed);
            self.samples_dropped.store(0, Ordering::Relaxed);
        }
    }

    /// Get the current sampling rate (1 in N)
    #[inline]
    pub fn current_rate(&self) -> u32 {
        self.current_rate.load(Ordering::Relaxed)
    }

    /// Check if the system is overloaded
    #[inline]
    pub fn is_overloaded(&self) -> bool {
        self.overloaded.load(Ordering::Relaxed)
    }

    /// Get sampling statistics
    pub fn stats(&self) -> SamplingStats {
        SamplingStats {
            samples_taken: self.samples_taken.load(Ordering::Relaxed),
            samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
            current_rate: self.current_rate.load(Ordering::Relaxed),
            is_overloaded: self.is_overloaded(),
        }
    }
}

/// Sampling statistics
#[derive(Debug, Clone)]
pub struct SamplingStats {
    /// Number of samples taken since the last counter reset
    pub samples_taken: u64,
    /// Number of samples dropped since the last counter reset
    pub samples_dropped: u64,
    /// Current 1-in-N sampling rate
    pub current_rate: u32,
    /// Whether the sampler considers the system overloaded
    pub is_overloaded: bool,
}

impl SamplingStats {
    /// Calculate the percentage of samples taken
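    ///
    /// Returns 100.0 when no samples have been observed. A small illustrative check
    /// (module path assumed):
    ///
    /// ```ignore
    /// use metrics_lib::adaptive::SamplingStats;
    ///
    /// let stats = SamplingStats {
    ///     samples_taken: 25,
    ///     samples_dropped: 75,
    ///     current_rate: 4,
    ///     is_overloaded: false,
    /// };
    /// // 25 of 100 observations were kept.
    /// assert_eq!(stats.sampling_percentage(), 25.0);
    /// ```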
    pub fn sampling_percentage(&self) -> f64 {
        let total = self.samples_taken + self.samples_dropped;
        if total == 0 {
            100.0
        } else {
            (self.samples_taken as f64 / total as f64) * 100.0
        }
    }
}

/// Circuit breaker for metric recording
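///
/// Wraps metric recording so that repeated failures temporarily suppress further
/// attempts. A usage sketch; `record_metric` is a hypothetical stand-in for the
/// caller's own fallible recording function:
///
/// ```ignore
/// use metrics_lib::adaptive::{MetricCircuitBreaker, CircuitBreakerConfig};
///
/// let breaker = MetricCircuitBreaker::new(CircuitBreakerConfig::default());
///
/// if breaker.is_allowed() {
///     match record_metric() {
///         Ok(_) => breaker.record_success(),
///         Err(_) => breaker.record_failure(),
///     }
/// }
/// ```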
pub struct MetricCircuitBreaker {
    state: AtomicU32,
    failures: AtomicU64,
    successes: AtomicU64,
    last_state_change: parking_lot::Mutex<Instant>,
    config: CircuitBreakerConfig,
}

/// Configuration for `MetricCircuitBreaker`
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures before the circuit opens
    pub failure_threshold: u64,
    /// Successes in the half-open state before the circuit closes again
    pub success_threshold: u64,
    /// How long the circuit stays open before probing again
    pub timeout: Duration,
    /// Maximum calls allowed while half-open
    pub half_open_max_calls: u64,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(30),
            half_open_max_calls: 10,
        }
    }
}

#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    Closed = 0,
    Open = 1,
    HalfOpen = 2,
}

impl MetricCircuitBreaker {
    /// Create a new circuit breaker
    pub fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            state: AtomicU32::new(CircuitState::Closed as u32),
            failures: AtomicU64::new(0),
            successes: AtomicU64::new(0),
            last_state_change: parking_lot::Mutex::new(Instant::now()),
            config,
        }
    }

    /// Check if an operation is allowed
    #[inline]
    pub fn is_allowed(&self) -> bool {
        let state = self.get_state();

        match state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                // Check if the open timeout has passed
                let last_change = *self.last_state_change.lock();
                if Instant::now().duration_since(last_change) > self.config.timeout {
                    self.transition_to(CircuitState::HalfOpen);
                    true
                } else {
                    false
                }
            }
            CircuitState::HalfOpen => {
                // Allow a limited number of probe calls
                let calls = self.successes.load(Ordering::Relaxed) +
                            self.failures.load(Ordering::Relaxed);
                calls < self.config.half_open_max_calls
            }
        }
    }

    /// Record a success
    #[inline]
    pub fn record_success(&self) {
        let state = self.get_state();

        match state {
            CircuitState::Closed => {
                self.failures.store(0, Ordering::Relaxed);
            }
            CircuitState::HalfOpen => {
                let successes = self.successes.fetch_add(1, Ordering::Relaxed) + 1;
                if successes >= self.config.success_threshold {
                    self.transition_to(CircuitState::Closed);
                }
            }
            CircuitState::Open => {} // Ignored while open
        }
    }

    /// Record a failure
    #[inline]
    pub fn record_failure(&self) {
        let state = self.get_state();

        match state {
            CircuitState::Closed => {
                let failures = self.failures.fetch_add(1, Ordering::Relaxed) + 1;
                if failures >= self.config.failure_threshold {
                    self.transition_to(CircuitState::Open);
                }
            }
            CircuitState::HalfOpen => {
                self.transition_to(CircuitState::Open);
            }
            CircuitState::Open => {} // Already open
        }
    }

    #[inline]
    fn get_state(&self) -> CircuitState {
        match self.state.load(Ordering::Relaxed) {
            0 => CircuitState::Closed,
            1 => CircuitState::Open,
            2 => CircuitState::HalfOpen,
            _ => unreachable!(),
        }
    }

    fn transition_to(&self, new_state: CircuitState) {
        self.state.store(new_state as u32, Ordering::Relaxed);
        self.failures.store(0, Ordering::Relaxed);
        self.successes.store(0, Ordering::Relaxed);
        *self.last_state_change.lock() = Instant::now();
    }
}

/// Backpressure controller
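///
/// Bounds the number of in-flight operations; callers that cannot obtain a slot are
/// expected to drop or defer their work. A minimal sketch (module path assumed):
///
/// ```ignore
/// use metrics_lib::adaptive::BackpressureController;
///
/// let controller = BackpressureController::new(1024);
///
/// if let Some(_guard) = controller.try_acquire() {
///     // do the work; the slot is released when `_guard` is dropped
/// } else {
///     // shed load: the operation was rejected
/// }
/// ```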
pub struct BackpressureController {
    max_pending: usize,
    pending: AtomicU64,
    rejected: AtomicU64,
}

impl BackpressureController {
    /// Create a new controller
    pub fn new(max_pending: usize) -> Self {
        Self {
            max_pending,
            pending: AtomicU64::new(0),
            rejected: AtomicU64::new(0),
        }
    }

    /// Try to acquire a slot
    #[inline]
    pub fn try_acquire(&self) -> Option<BackpressureGuard<'_>> {
        let pending = self.pending.fetch_add(1, Ordering::Relaxed);

        if pending >= self.max_pending as u64 {
            self.pending.fetch_sub(1, Ordering::Relaxed);
            self.rejected.fetch_add(1, Ordering::Relaxed);
            None
        } else {
            Some(BackpressureGuard { controller: self })
        }
    }

    /// Get the current pending count
    #[inline]
    pub fn pending_count(&self) -> u64 {
        self.pending.load(Ordering::Relaxed)
    }

    /// Get the rejected count
    #[inline]
    pub fn rejected_count(&self) -> u64 {
        self.rejected.load(Ordering::Relaxed)
    }
}

/// RAII guard for backpressure; releases its slot on drop
pub struct BackpressureGuard<'a> {
    controller: &'a BackpressureController,
}

impl<'a> Drop for BackpressureGuard<'a> {
    #[inline]
    fn drop(&mut self) {
        self.controller.pending.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Minimal thread-local xorshift RNG used for sampling decisions
/// (a local stand-in, not the external `fastrand` crate)
pub mod fastrand {
    /// Return a value in `range` (uniform up to a small modulo bias)
    #[inline]
    pub fn u32(range: std::ops::RangeInclusive<u32>) -> u32 {
        let start = *range.start();
        let end = *range.end();
        if start == end {
            return start;
        }

        // Fast thread-local RNG using xorshift, seeded from the thread id
        thread_local! {
            static RNG: std::cell::Cell<u32> = std::cell::Cell::new({
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                std::hash::Hash::hash(&std::thread::current().id(), &mut hasher);
                std::hash::Hasher::finish(&hasher) as u32 | 1
            });
        }

        RNG.with(|rng| {
            let mut x = rng.get();
            x ^= x << 13;
            x ^= x >> 17;
            x ^= x << 5;
            rng.set(x);
            start + (x % (end - start + 1))
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fixed_sampling() {
        let sampler = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });

        let mut sampled = 0;
        for _ in 0..1000 {
            if sampler.should_sample() {
                sampled += 1;
            }
        }

        // Should be approximately 100 (10%)
        assert!(sampled > 50 && sampled < 150);
    }

    #[test]
    fn test_circuit_breaker() {
        let breaker = MetricCircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout: Duration::from_millis(100),
            half_open_max_calls: 5,
        });

        // Initially closed
        assert!(breaker.is_allowed());

        // Record failures to open
        for _ in 0..3 {
            breaker.record_failure();
        }

        // Should be open
        assert!(!breaker.is_allowed());

        // Wait for timeout
        std::thread::sleep(Duration::from_millis(150));

        // Should transition to half-open
        assert!(breaker.is_allowed());

        // Record successes to close
        breaker.record_success();
        breaker.record_success();

        // Should be closed again
        assert!(breaker.is_allowed());
    }

    #[test]
    fn test_backpressure() {
        let controller = BackpressureController::new(5);

        let mut guards = Vec::new();

        // Acquire up to the limit
        for _ in 0..5 {
            guards.push(controller.try_acquire().unwrap());
        }

        // Next acquire should fail
        assert!(controller.try_acquire().is_none());
        assert_eq!(controller.rejected_count(), 1);

        // Drop one guard
        guards.pop();

        // Should be able to acquire again
        assert!(controller.try_acquire().is_some());
    }
}