1use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
6use std::time::{Duration, Instant};
7
/// Controls how an `AdaptiveSampler` decides whether an event is recorded.
#[derive(Debug, Clone, Copy)]
pub enum SamplingStrategy {
    /// Sample roughly 1 out of every `rate` events (random 1-in-N draw).
    Fixed {
        rate: u32,
    },
    /// Start at `min_rate` and re-tune the rate (bounded by `min_rate` and
    /// `max_rate`) roughly once per second so that the number of samples
    /// taken per window tracks `target_throughput`.
    Dynamic {
        min_rate: u32,
        max_rate: u32,
        /// Desired samples per adjustment window (~1 second).
        target_throughput: u64,
    },
    /// Sample at most once per `min_interval` **nanoseconds**, tracked
    /// independently per thread (the last-sample timestamp is thread-local).
    TimeBased {
        min_interval: u64,
    },
}
42
/// Thread-safe sampler that decides, per event, whether the event should be
/// recorded, according to a `SamplingStrategy`.
pub struct AdaptiveSampler {
    strategy: SamplingStrategy,
    /// Current 1-in-N sampling rate; shared by the fixed and dynamic paths.
    current_rate: AtomicU32,
    /// Events accepted. Reset at each adjustment window when using `Dynamic`
    /// (never reset for other strategies).
    samples_taken: AtomicU64,
    /// Events rejected. Reset alongside `samples_taken` for `Dynamic`.
    samples_dropped: AtomicU64,
    /// When the dynamic rate was last recomputed.
    last_adjustment: parking_lot::Mutex<Instant>,
    /// Set when the dynamic rate has been pushed above `min_rate * 2`.
    overloaded: AtomicBool,
}
52
53impl AdaptiveSampler {
54 pub fn new(strategy: SamplingStrategy) -> Self {
56 let initial_rate = match strategy {
57 SamplingStrategy::Fixed { rate } => rate,
58 SamplingStrategy::Dynamic { min_rate, .. } => min_rate,
59 SamplingStrategy::TimeBased { .. } => 1,
60 };
61
62 Self {
63 strategy,
64 current_rate: AtomicU32::new(initial_rate),
65 samples_taken: AtomicU64::new(0),
66 samples_dropped: AtomicU64::new(0),
67 last_adjustment: parking_lot::Mutex::new(Instant::now()),
68 overloaded: AtomicBool::new(false),
69 }
70 }
71
72 #[inline]
74 pub fn should_sample(&self) -> bool {
75 match self.strategy {
76 SamplingStrategy::Fixed { .. } => self.should_sample_fixed(),
77 SamplingStrategy::Dynamic { .. } => self.should_sample_dynamic(),
78 SamplingStrategy::TimeBased { min_interval } => {
79 self.should_sample_time_based(Duration::from_nanos(min_interval))
80 }
81 }
82 }
83
84 #[inline]
85 fn should_sample_fixed(&self) -> bool {
86 let rate = self.current_rate.load(Ordering::Relaxed);
87 if rate == 1 {
88 self.samples_taken.fetch_add(1, Ordering::Relaxed);
89 return true;
90 }
91
92 let should_sample = fastrand::u32(1..=rate) == 1;
94
95 if should_sample {
96 self.samples_taken.fetch_add(1, Ordering::Relaxed);
97 } else {
98 self.samples_dropped.fetch_add(1, Ordering::Relaxed);
99 }
100
101 should_sample
102 }
103
104 fn should_sample_dynamic(&self) -> bool {
105 let mut last_adjustment = self.last_adjustment.lock();
107 let now = Instant::now();
108
109 if now.duration_since(*last_adjustment) > Duration::from_secs(1) {
110 self.adjust_dynamic_rate();
111 *last_adjustment = now;
112 }
113 drop(last_adjustment);
114
115 self.should_sample_fixed()
116 }
117
118 fn should_sample_time_based(&self, min_interval: Duration) -> bool {
119 thread_local! {
120 static LAST_SAMPLE: std::cell::RefCell<Option<Instant>> = const { std::cell::RefCell::new(None) };
121 }
122
123 LAST_SAMPLE.with(|last| {
124 let mut last = last.borrow_mut();
125 let now = Instant::now();
126
127 match *last {
128 Some(last_time) if now.duration_since(last_time) < min_interval => {
129 self.samples_dropped.fetch_add(1, Ordering::Relaxed);
130 false
131 }
132 _ => {
133 *last = Some(now);
134 self.samples_taken.fetch_add(1, Ordering::Relaxed);
135 true
136 }
137 }
138 })
139 }
140
141 fn adjust_dynamic_rate(&self) {
142 if let SamplingStrategy::Dynamic {
143 min_rate,
144 max_rate,
145 target_throughput,
146 } = self.strategy
147 {
148 let taken = self.samples_taken.load(Ordering::Relaxed);
149 let current_rate = self.current_rate.load(Ordering::Relaxed);
150
151 let new_rate = if taken > target_throughput {
152 (current_rate * 2).min(max_rate)
154 } else if taken < target_throughput / 2 {
155 (current_rate / 2).max(min_rate)
157 } else {
158 current_rate
159 };
160
161 if new_rate != current_rate {
162 self.current_rate.store(new_rate, Ordering::Relaxed);
163 self.overloaded
164 .store(new_rate > min_rate * 2, Ordering::Relaxed);
165 }
166
167 self.samples_taken.store(0, Ordering::Relaxed);
169 self.samples_dropped.store(0, Ordering::Relaxed);
170 }
171 }
172
173 #[inline]
175 pub fn current_rate(&self) -> u32 {
176 self.current_rate.load(Ordering::Relaxed)
177 }
178
179 #[inline]
181 pub fn is_overloaded(&self) -> bool {
182 self.overloaded.load(Ordering::Relaxed)
183 }
184
185 pub fn stats(&self) -> SamplingStats {
187 SamplingStats {
188 samples_taken: self.samples_taken.load(Ordering::Relaxed),
189 samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
190 current_rate: self.current_rate.load(Ordering::Relaxed),
191 is_overloaded: self.is_overloaded(),
192 }
193 }
194}
195
/// Point-in-time snapshot of an `AdaptiveSampler`'s counters.
#[derive(Debug, Clone)]
pub struct SamplingStats {
    pub samples_taken: u64,
    pub samples_dropped: u64,
    pub current_rate: u32,
    pub is_overloaded: bool,
}

impl SamplingStats {
    /// Share of observed events that were actually sampled, in percent.
    /// An empty window (no events observed) reports 100%.
    pub fn sampling_percentage(&self) -> f64 {
        match self.samples_taken + self.samples_dropped {
            0 => 100.0,
            total => (self.samples_taken as f64 / total as f64) * 100.0,
        }
    }
}
216
/// Lock-free-ish circuit breaker for metric emission: tracks a state machine
/// (`Closed`/`Open`/`HalfOpen`) in an atomic plus success/failure counters.
pub struct MetricCircuitBreaker {
    /// Encoded `CircuitState` discriminant (0 = Closed, 1 = Open, 2 = HalfOpen).
    state: AtomicU32,
    /// Consecutive failures in `Closed`, or probe failures in `HalfOpen`.
    failures: AtomicU64,
    /// Probe successes while `HalfOpen`; unused in `Closed`.
    successes: AtomicU64,
    /// When the state last changed; drives the `Open` → `HalfOpen` timeout.
    last_state_change: parking_lot::Mutex<Instant>,
    config: CircuitBreakerConfig,
}
225
/// Tuning knobs for `MetricCircuitBreaker`.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failures (while closed) that trip the breaker open.
    pub failure_threshold: u64,
    /// Successes (while half-open) needed to close the breaker again.
    pub success_threshold: u64,
    /// How long the breaker stays open before admitting half-open probes.
    pub timeout: Duration,
    /// Maximum probe calls admitted while half-open.
    pub half_open_max_calls: u64,
}

impl Default for CircuitBreakerConfig {
    /// Conservative defaults: trip after 5 failures, close again after 3
    /// successes, wait 30 seconds before probing, allow 10 probe calls.
    fn default() -> Self {
        CircuitBreakerConfig {
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(30),
            half_open_max_calls: 10,
        }
    }
}
244
/// Circuit-breaker states. The explicit `u32` discriminants are the on-wire
/// encoding stored in `MetricCircuitBreaker::state` and decoded by
/// `get_state`; do not renumber them.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    /// Normal operation; calls flow through.
    Closed = 0,
    /// Tripped; calls are rejected until the timeout elapses.
    Open = 1,
    /// Probing; a limited number of calls are admitted to test recovery.
    HalfOpen = 2,
}
252
impl MetricCircuitBreaker {
    /// Creates a breaker in the `Closed` state with zeroed counters.
    pub fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            state: AtomicU32::new(CircuitState::Closed as u32),
            failures: AtomicU64::new(0),
            successes: AtomicU64::new(0),
            last_state_change: parking_lot::Mutex::new(Instant::now()),
            config,
        }
    }

    /// Returns whether a call may proceed right now.
    ///
    /// - `Closed`: always allowed.
    /// - `Open`: rejected until `config.timeout` has elapsed since the last
    ///   state change; once elapsed the breaker moves to `HalfOpen` and this
    ///   call is admitted as the first probe.
    /// - `HalfOpen`: allowed while fewer than `half_open_max_calls` probe
    ///   outcomes (successes + failures) have been recorded.
    ///
    /// NOTE(review): the timeout check and the transition are not one atomic
    /// step, so several racing callers can all observe the expired timeout
    /// and each invoke `transition_to(HalfOpen)`; the repeated transition
    /// only re-resets the counters and timestamp, which appears benign.
    #[inline]
    pub fn is_allowed(&self) -> bool {
        let state = self.get_state();

        match state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                // Copy the timestamp out so the mutex is held only briefly.
                let last_change = *self.last_state_change.lock();
                if Instant::now().duration_since(last_change) > self.config.timeout {
                    self.transition_to(CircuitState::HalfOpen);
                    true
                } else {
                    false
                }
            }
            CircuitState::HalfOpen => {
                // Total probe outcomes recorded so far in this half-open phase.
                let calls =
                    self.successes.load(Ordering::Relaxed) + self.failures.load(Ordering::Relaxed);
                calls < self.config.half_open_max_calls
            }
        }
    }

    /// Records a successful call.
    ///
    /// - `Closed`: clears the consecutive-failure streak.
    /// - `HalfOpen`: counts the probe success; once `success_threshold`
    ///   successes accumulate, the breaker closes again.
    /// - `Open`: ignored (no calls should be flowing, but late results can
    ///   still arrive).
    #[inline]
    pub fn record_success(&self) {
        let state = self.get_state();

        match state {
            CircuitState::Closed => {
                self.failures.store(0, Ordering::Relaxed);
            }
            CircuitState::HalfOpen => {
                // fetch_add returns the previous value; +1 gives this call's count.
                let successes = self.successes.fetch_add(1, Ordering::Relaxed) + 1;
                if successes >= self.config.success_threshold {
                    self.transition_to(CircuitState::Closed);
                }
            }
            CircuitState::Open => {}
        }
    }

    /// Records a failed call.
    ///
    /// - `Closed`: counts the failure; at `failure_threshold` the breaker
    ///   trips open.
    /// - `HalfOpen`: a single probe failure re-opens the breaker immediately.
    /// - `Open`: ignored.
    #[inline]
    pub fn record_failure(&self) {
        let state = self.get_state();

        match state {
            CircuitState::Closed => {
                let failures = self.failures.fetch_add(1, Ordering::Relaxed) + 1;
                if failures >= self.config.failure_threshold {
                    self.transition_to(CircuitState::Open);
                }
            }
            CircuitState::HalfOpen => {
                self.transition_to(CircuitState::Open);
            }
            CircuitState::Open => {}
        }
    }

    /// Decodes the atomic state word back into a `CircuitState`.
    #[inline]
    fn get_state(&self) -> CircuitState {
        match self.state.load(Ordering::Relaxed) {
            0 => CircuitState::Closed,
            1 => CircuitState::Open,
            2 => CircuitState::HalfOpen,
            // Only valid discriminants are ever stored (see `new` and
            // `transition_to`), so any other value is a logic bug.
            _ => unreachable!(),
        }
    }

    /// Moves to `new_state`, zeroing both counters and stamping the change
    /// time. NOTE(review): the four writes are individually atomic but not
    /// a single transaction; concurrent readers may briefly observe the new
    /// state with stale counters.
    fn transition_to(&self, new_state: CircuitState) {
        self.state.store(new_state as u32, Ordering::Relaxed);
        self.failures.store(0, Ordering::Relaxed);
        self.successes.store(0, Ordering::Relaxed);
        *self.last_state_change.lock() = Instant::now();
    }
}
346
/// Caps the number of in-flight operations; acquisitions beyond the cap are
/// rejected (and counted) rather than queued.
pub struct BackpressureController {
    max_pending: usize,
    pending: AtomicU64,
    rejected: AtomicU64,
}

impl BackpressureController {
    /// Creates a controller admitting at most `max_pending` concurrent holders.
    pub fn new(max_pending: usize) -> Self {
        Self {
            max_pending,
            pending: AtomicU64::new(0),
            rejected: AtomicU64::new(0),
        }
    }

    /// Tries to reserve a slot. On success returns an RAII guard that frees
    /// the slot when dropped; at capacity returns `None` and bumps the
    /// rejected counter.
    #[inline]
    pub fn try_acquire(&self) -> Option<BackpressureGuard<'_>> {
        // Optimistically claim a slot, then hand it back if we overshot.
        let prior = self.pending.fetch_add(1, Ordering::Relaxed);
        if prior < self.max_pending as u64 {
            return Some(BackpressureGuard { controller: self });
        }
        self.pending.fetch_sub(1, Ordering::Relaxed);
        self.rejected.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Number of slots currently held.
    #[inline]
    pub fn pending_count(&self) -> u64 {
        self.pending.load(Ordering::Relaxed)
    }

    /// Total acquisitions rejected so far.
    #[inline]
    pub fn rejected_count(&self) -> u64 {
        self.rejected.load(Ordering::Relaxed)
    }
}

/// RAII slot reservation handed out by `BackpressureController::try_acquire`.
pub struct BackpressureGuard<'a> {
    controller: &'a BackpressureController,
}

impl<'a> Drop for BackpressureGuard<'a> {
    /// Releases the slot back to the controller.
    #[inline]
    fn drop(&mut self) {
        self.controller.pending.fetch_sub(1, Ordering::Relaxed);
    }
}
402
/// Minimal per-thread xorshift32 RNG, API-compatible with the small subset
/// of the `fastrand` crate this file needs.
pub mod fastrand {
    /// Returns a pseudo-random `u32` in the inclusive `range`.
    ///
    /// Uses a thread-local xorshift32 state seeded from the thread id
    /// (`| 1` guarantees a nonzero seed — xorshift is stuck at 0 forever).
    /// The mapping uses a modulus, so a small modulo bias exists for spans
    /// that do not divide 2^32; acceptable for sampling decisions.
    ///
    /// Edge cases (the original panicked on both):
    /// - A degenerate/inverted range (`start >= end`) returns `start`.
    /// - A range spanning all of `u32` (`0..=u32::MAX`) returns the raw
    ///   draw, since `span + 1` would overflow to 0 and `x % 0` panics.
    #[inline]
    pub fn u32(range: std::ops::RangeInclusive<u32>) -> u32 {
        let start = *range.start();
        let end = *range.end();
        // Covers both the single-value range and an inverted (empty) range,
        // which would otherwise underflow `end - start` below.
        if start >= end {
            return start;
        }

        thread_local! {
            static RNG: std::cell::Cell<u32> = std::cell::Cell::new({
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                std::hash::Hash::hash(&std::thread::current().id(), &mut hasher);
                std::hash::Hasher::finish(&hasher) as u32 | 1
            });
        }

        RNG.with(|rng| {
            // xorshift32 step (Marsaglia): full-period for nonzero state.
            let mut x = rng.get();
            x ^= x << 13;
            x ^= x >> 17;
            x ^= x << 5;
            rng.set(x);

            let span = end - start;
            if span == u32::MAX {
                // Full-range request: `span + 1` would overflow to 0.
                x
            } else {
                start + (x % (span + 1))
            }
        })
    }
}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// A 1-in-10 sampler over 1000 events should accept roughly 100.
    #[test]
    fn test_fixed_sampling() {
        let sampler = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });

        let mut sampled = 0;
        for _ in 0..1000 {
            if sampler.should_sample() {
                sampled += 1;
            }
        }

        // Statistical bound: expectation is ~100; the wide 50..150 window
        // keeps the randomized test from flaking.
        assert!(sampled > 50 && sampled < 150);
    }

    /// Walks the breaker through Closed -> Open -> HalfOpen -> Closed.
    #[test]
    fn test_circuit_breaker() {
        let breaker = MetricCircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout: Duration::from_millis(100),
            half_open_max_calls: 5,
        });

        // Starts closed: calls allowed.
        assert!(breaker.is_allowed());

        // Three failures reach the threshold and trip it open.
        for _ in 0..3 {
            breaker.record_failure();
        }

        assert!(!breaker.is_allowed());

        // Sleep past the 100 ms timeout so the breaker admits half-open probes.
        std::thread::sleep(Duration::from_millis(150));

        assert!(breaker.is_allowed());

        // Two probe successes close it again.
        breaker.record_success();
        breaker.record_success();

        assert!(breaker.is_allowed());
    }

    /// Slots are finite, rejections are counted, and dropping a guard
    /// frees its slot.
    #[test]
    fn test_backpressure() {
        let controller = BackpressureController::new(5);

        let mut guards = Vec::new();

        // Fill every slot.
        for _ in 0..5 {
            guards.push(controller.try_acquire().unwrap());
        }

        // Sixth acquisition is rejected and counted.
        assert!(controller.try_acquire().is_none());
        assert_eq!(controller.rejected_count(), 1);

        // Dropping one guard frees a slot.
        guards.pop();

        assert!(controller.try_acquire().is_some());
    }

    /// Time-based sampling enforces the per-thread minimum interval.
    #[test]
    fn test_time_based_sampling_interval() {
        // 5 ms minimum interval, expressed in nanoseconds.
        let sampler = AdaptiveSampler::new(SamplingStrategy::TimeBased {
            min_interval: 5_000_000,
        });

        // First call on this thread always samples.
        assert!(sampler.should_sample());

        // Immediately after: still inside the interval, dropped.
        assert!(!sampler.should_sample());

        // After the interval elapses, sampling resumes.
        std::thread::sleep(Duration::from_millis(6));
        assert!(sampler.should_sample());
    }

    /// Percentage math: empty window reports 100%, 1 taken / 1 dropped is 50%.
    #[test]
    fn test_sampling_stats_percentage() {
        // Fresh sampler: zero events observed reports 100%.
        let sampler_zero = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });
        let stats_zero = sampler_zero.stats();
        assert_eq!(stats_zero.samples_taken, 0);
        assert_eq!(stats_zero.samples_dropped, 0);
        assert!((stats_zero.sampling_percentage() - 100.0).abs() < f64::EPSILON);

        // Use the deterministic time-based path to get exactly one taken
        // and one dropped sample.
        let sampler = AdaptiveSampler::new(SamplingStrategy::TimeBased {
            min_interval: 5_000_000,
        });

        assert!(sampler.should_sample());
        assert!(!sampler.should_sample());
        let stats = sampler.stats();
        assert_eq!(stats.samples_taken, 1);
        assert_eq!(stats.samples_dropped, 1);
        assert!((stats.sampling_percentage() - 50.0).abs() < 0.0001);
    }
}