1use std::sync::atomic::{AtomicU32, AtomicU64, AtomicBool, Ordering};
6use std::time::{Duration, Instant};
7
/// How an [`AdaptiveSampler`] decides which events to keep.
#[derive(Clone, Copy, Debug)]
pub enum SamplingStrategy {
    /// Keep roughly one event out of every `rate`.
    Fixed {
        /// Sampling divisor; `1` keeps every event.
        rate: u32,
    },
    /// Like `Fixed`, but the divisor is re-evaluated when more than one second
    /// has passed since the previous adjustment.
    Dynamic {
        /// Starting divisor and lower bound when the rate is halved.
        min_rate: u32,
        /// Upper bound when the rate is doubled.
        max_rate: u32,
        /// Samples taken per adjustment window above which the divisor doubles;
        /// below half of this value the divisor is halved.
        target_throughput: u64,
    },
    /// Keep an event only if enough time has passed since the last kept one.
    TimeBased {
        /// Minimum gap between kept events, in nanoseconds.
        min_interval: u64,
    },
}
42
43pub struct AdaptiveSampler {
45 strategy: SamplingStrategy,
46 current_rate: AtomicU32,
47 samples_taken: AtomicU64,
48 samples_dropped: AtomicU64,
49 last_adjustment: parking_lot::Mutex<Instant>,
50 overloaded: AtomicBool,
51}
52
53impl AdaptiveSampler {
54 pub fn new(strategy: SamplingStrategy) -> Self {
56 let initial_rate = match strategy {
57 SamplingStrategy::Fixed { rate } => rate,
58 SamplingStrategy::Dynamic { min_rate, .. } => min_rate,
59 SamplingStrategy::TimeBased { .. } => 1,
60 };
61
62 Self {
63 strategy,
64 current_rate: AtomicU32::new(initial_rate),
65 samples_taken: AtomicU64::new(0),
66 samples_dropped: AtomicU64::new(0),
67 last_adjustment: parking_lot::Mutex::new(Instant::now()),
68 overloaded: AtomicBool::new(false),
69 }
70 }
71
72 #[inline]
74 pub fn should_sample(&self) -> bool {
75 match self.strategy {
76 SamplingStrategy::Fixed { .. } => {
77 self.should_sample_fixed()
78 }
79 SamplingStrategy::Dynamic { .. } => {
80 self.should_sample_dynamic()
81 }
82 SamplingStrategy::TimeBased { min_interval } => {
83 self.should_sample_time_based(Duration::from_nanos(min_interval))
84 }
85 }
86 }
87
88 #[inline]
89 fn should_sample_fixed(&self) -> bool {
90 let rate = self.current_rate.load(Ordering::Relaxed);
91 if rate == 1 {
92 self.samples_taken.fetch_add(1, Ordering::Relaxed);
93 return true;
94 }
95
96 let should_sample = fastrand::u32(1..=rate) == 1;
98
99 if should_sample {
100 self.samples_taken.fetch_add(1, Ordering::Relaxed);
101 } else {
102 self.samples_dropped.fetch_add(1, Ordering::Relaxed);
103 }
104
105 should_sample
106 }
107
108 fn should_sample_dynamic(&self) -> bool {
109 let mut last_adjustment = self.last_adjustment.lock();
111 let now = Instant::now();
112
113 if now.duration_since(*last_adjustment) > Duration::from_secs(1) {
114 self.adjust_dynamic_rate();
115 *last_adjustment = now;
116 }
117 drop(last_adjustment);
118
119 self.should_sample_fixed()
120 }
121
122 fn should_sample_time_based(&self, min_interval: Duration) -> bool {
123 thread_local! {
124 static LAST_SAMPLE: std::cell::RefCell<Option<Instant>> = const { std::cell::RefCell::new(None) };
125 }
126
127 LAST_SAMPLE.with(|last| {
128 let mut last = last.borrow_mut();
129 let now = Instant::now();
130
131 match *last {
132 Some(last_time) if now.duration_since(last_time) < min_interval => {
133 self.samples_dropped.fetch_add(1, Ordering::Relaxed);
134 false
135 }
136 _ => {
137 *last = Some(now);
138 self.samples_taken.fetch_add(1, Ordering::Relaxed);
139 true
140 }
141 }
142 })
143 }
144
145 fn adjust_dynamic_rate(&self) {
146 if let SamplingStrategy::Dynamic { min_rate, max_rate, target_throughput } = self.strategy {
147 let taken = self.samples_taken.load(Ordering::Relaxed);
148 let current_rate = self.current_rate.load(Ordering::Relaxed);
149
150 let new_rate = if taken > target_throughput {
151 (current_rate * 2).min(max_rate)
153 } else if taken < target_throughput / 2 {
154 (current_rate / 2).max(min_rate)
156 } else {
157 current_rate
158 };
159
160 if new_rate != current_rate {
161 self.current_rate.store(new_rate, Ordering::Relaxed);
162 self.overloaded.store(new_rate > min_rate * 2, Ordering::Relaxed);
163 }
164
165 self.samples_taken.store(0, Ordering::Relaxed);
167 self.samples_dropped.store(0, Ordering::Relaxed);
168 }
169 }
170
171 #[inline]
173 pub fn current_rate(&self) -> u32 {
174 self.current_rate.load(Ordering::Relaxed)
175 }
176
177 #[inline]
179 pub fn is_overloaded(&self) -> bool {
180 self.overloaded.load(Ordering::Relaxed)
181 }
182
183 pub fn stats(&self) -> SamplingStats {
185 SamplingStats {
186 samples_taken: self.samples_taken.load(Ordering::Relaxed),
187 samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
188 current_rate: self.current_rate.load(Ordering::Relaxed),
189 is_overloaded: self.is_overloaded(),
190 }
191 }
192}
193
/// Point-in-time view of a sampler's counters.
#[derive(Debug, Clone)]
pub struct SamplingStats {
    /// Number of events that were kept.
    pub samples_taken: u64,
    /// Number of events that were discarded.
    pub samples_dropped: u64,
    /// Sampling divisor in effect when the snapshot was made.
    pub current_rate: u32,
    /// Overload flag in effect when the snapshot was made.
    pub is_overloaded: bool,
}

impl SamplingStats {
    /// Returns the share of events that were kept, as a percentage in `0.0..=100.0`.
    ///
    /// Reports `100.0` when no events have been recorded at all.
    pub fn sampling_percentage(&self) -> f64 {
        // The fields are public, so the two counters may be arbitrary; widen to
        // u128 so their sum cannot overflow.
        let total = u128::from(self.samples_taken) + u128::from(self.samples_dropped);
        if total == 0 {
            100.0
        } else {
            (self.samples_taken as f64 / total as f64) * 100.0
        }
    }
}
214
215pub struct MetricCircuitBreaker {
217 state: AtomicU32,
218 failures: AtomicU64,
219 successes: AtomicU64,
220 last_state_change: parking_lot::Mutex<Instant>,
221 config: CircuitBreakerConfig,
222}
223
/// Tunables for [`MetricCircuitBreaker`].
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failures counted while closed that trip the breaker open.
    pub failure_threshold: u64,
    /// Successes counted while half-open that close the breaker again.
    pub success_threshold: u64,
    /// How long the breaker stays open before a probe call is let through.
    pub timeout: Duration,
    /// Upper bound on calls recorded while half-open before further calls are refused.
    pub half_open_max_calls: u64,
}

impl Default for CircuitBreakerConfig {
    /// Trips after 5 failures, closes after 3 half-open successes, stays open
    /// for 30 seconds and allows up to 10 half-open calls.
    fn default() -> Self {
        let timeout = Duration::from_secs(30);
        CircuitBreakerConfig {
            timeout,
            failure_threshold: 5,
            success_threshold: 3,
            half_open_max_calls: 10,
        }
    }
}
242
/// Breaker states; the explicit discriminants are the values stored in the
/// breaker's `AtomicU32` state word.
#[repr(u32)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CircuitState {
    /// Calls are allowed.
    Closed = 0,
    /// Calls are refused until the timeout elapses.
    Open = 1,
    /// A limited number of calls is allowed to test recovery.
    HalfOpen = 2,
}
250
251impl MetricCircuitBreaker {
252 pub fn new(config: CircuitBreakerConfig) -> Self {
254 Self {
255 state: AtomicU32::new(CircuitState::Closed as u32),
256 failures: AtomicU64::new(0),
257 successes: AtomicU64::new(0),
258 last_state_change: parking_lot::Mutex::new(Instant::now()),
259 config,
260 }
261 }
262
263 #[inline]
265 pub fn is_allowed(&self) -> bool {
266 let state = self.get_state();
267
268 match state {
269 CircuitState::Closed => true,
270 CircuitState::Open => {
271 let last_change = *self.last_state_change.lock();
273 if Instant::now().duration_since(last_change) > self.config.timeout {
274 self.transition_to(CircuitState::HalfOpen);
275 true
276 } else {
277 false
278 }
279 }
280 CircuitState::HalfOpen => {
281 let calls = self.successes.load(Ordering::Relaxed) +
283 self.failures.load(Ordering::Relaxed);
284 calls < self.config.half_open_max_calls
285 }
286 }
287 }
288
289 #[inline]
291 pub fn record_success(&self) {
292 let state = self.get_state();
293
294 match state {
295 CircuitState::Closed => {
296 self.failures.store(0, Ordering::Relaxed);
297 }
298 CircuitState::HalfOpen => {
299 let successes = self.successes.fetch_add(1, Ordering::Relaxed) + 1;
300 if successes >= self.config.success_threshold {
301 self.transition_to(CircuitState::Closed);
302 }
303 }
304 CircuitState::Open => {} }
306 }
307
308 #[inline]
310 pub fn record_failure(&self) {
311 let state = self.get_state();
312
313 match state {
314 CircuitState::Closed => {
315 let failures = self.failures.fetch_add(1, Ordering::Relaxed) + 1;
316 if failures >= self.config.failure_threshold {
317 self.transition_to(CircuitState::Open);
318 }
319 }
320 CircuitState::HalfOpen => {
321 self.transition_to(CircuitState::Open);
322 }
323 CircuitState::Open => {} }
325 }
326
327 #[inline]
328 fn get_state(&self) -> CircuitState {
329 match self.state.load(Ordering::Relaxed) {
330 0 => CircuitState::Closed,
331 1 => CircuitState::Open,
332 2 => CircuitState::HalfOpen,
333 _ => unreachable!(),
334 }
335 }
336
337 fn transition_to(&self, new_state: CircuitState) {
338 self.state.store(new_state as u32, Ordering::Relaxed);
339 self.failures.store(0, Ordering::Relaxed);
340 self.successes.store(0, Ordering::Relaxed);
341 *self.last_state_change.lock() = Instant::now();
342 }
343}
344
/// Caps the number of operations that may be outstanding at once.
pub struct BackpressureController {
    /// Maximum number of guards that may be alive at the same time.
    max_pending: usize,
    /// Number of guards currently alive.
    pending: AtomicU64,
    /// Number of `try_acquire` calls that were refused.
    rejected: AtomicU64,
}

impl BackpressureController {
    /// Creates a controller that admits at most `max_pending` concurrent guards.
    pub fn new(max_pending: usize) -> Self {
        Self {
            max_pending,
            pending: AtomicU64::new(0),
            rejected: AtomicU64::new(0),
        }
    }

    /// Tries to reserve a slot.
    ///
    /// Returns a guard that frees the slot when dropped, or `None` (and bumps
    /// the rejection counter) when `max_pending` slots are already taken.
    #[inline]
    pub fn try_acquire(&self) -> Option<BackpressureGuard<'_>> {
        let limit = u64::try_from(self.max_pending).unwrap_or(u64::MAX);

        // Reserve with a compare-and-swap loop instead of add-then-undo: the
        // counter never rises above `limit`, so a refused caller cannot make
        // other callers see a full controller while capacity is actually free.
        let reserved = self
            .pending
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |held| {
                if held < limit {
                    Some(held + 1)
                } else {
                    None
                }
            });

        match reserved {
            Ok(_) => Some(BackpressureGuard { controller: self }),
            Err(_) => {
                self.rejected.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    /// Returns the number of slots currently held.
    #[inline]
    pub fn pending_count(&self) -> u64 {
        self.pending.load(Ordering::Relaxed)
    }

    /// Returns how many acquisition attempts have been refused so far.
    #[inline]
    pub fn rejected_count(&self) -> u64 {
        self.rejected.load(Ordering::Relaxed)
    }
}

/// Slot reservation handed out by [`BackpressureController::try_acquire`];
/// dropping it frees the slot.
#[must_use = "the slot is released as soon as the guard is dropped"]
pub struct BackpressureGuard<'a> {
    /// Controller whose pending count is decremented on drop.
    controller: &'a BackpressureController,
}

impl<'a> Drop for BackpressureGuard<'a> {
    /// Returns the slot to the controller.
    #[inline]
    fn drop(&mut self) {
        self.controller.pending.fetch_sub(1, Ordering::Relaxed);
    }
}
400
/// Small in-file pseudo-random helper backed by a per-thread xorshift32 state.
pub mod fastrand {
    use std::cell::Cell;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// Returns a pseudo-random value in `range` (both ends inclusive).
    ///
    /// The value is reduced with a modulo, so it is only approximately uniform.
    /// Not suitable for anything security-sensitive.
    ///
    /// # Panics
    ///
    /// Panics if `range` is empty (`start > end`).
    #[inline]
    pub fn u32(range: std::ops::RangeInclusive<u32>) -> u32 {
        let (start, end) = (*range.start(), *range.end());
        assert!(
            start <= end,
            "fastrand::u32: empty range {}..={}",
            start,
            end
        );
        if start == end {
            return start;
        }

        thread_local! {
            // Seeded from the thread id; `| 1` guarantees a non-zero seed, which
            // xorshift needs because a zero state would stay zero forever.
            static RNG: Cell<u32> = Cell::new({
                let mut hasher = DefaultHasher::new();
                std::thread::current().id().hash(&mut hasher);
                hasher.finish() as u32 | 1
            });
        }

        // Width of the range computed in u64: `0..=u32::MAX` has 2^32 values,
        // which does not fit in a u32.
        let span = u64::from(end - start) + 1;

        RNG.with(|rng| {
            let mut x = rng.get();
            x ^= x << 13;
            x ^= x >> 17;
            x ^= x << 5;
            rng.set(x);
            // The remainder is at most `end - start`, so it fits in a u32 and
            // the addition cannot overflow.
            start + (u64::from(x) % span) as u32
        })
    }
}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// A 1-in-10 fixed sampler should keep roughly 100 of 1000 events.
    #[test]
    fn test_fixed_sampling() {
        let sampler = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });

        let kept = (0..1000).filter(|_| sampler.should_sample()).count();

        // Wide tolerance around the expected 100 to keep the test stable.
        assert!(kept > 50 && kept < 150);
    }

    /// Walks the breaker through closed -> open -> half-open -> closed.
    #[test]
    fn test_circuit_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout: Duration::from_millis(100),
            half_open_max_calls: 5,
        };
        let breaker = MetricCircuitBreaker::new(config);

        // Starts closed.
        assert!(breaker.is_allowed());

        // Three failures reach the threshold and open the breaker.
        (0..3).for_each(|_| breaker.record_failure());
        assert!(!breaker.is_allowed());

        // Once the timeout has passed, the next check lets a probe through.
        std::thread::sleep(Duration::from_millis(150));
        assert!(breaker.is_allowed());

        // Two successes meet the success threshold and close the breaker.
        breaker.record_success();
        breaker.record_success();
        assert!(breaker.is_allowed());
    }

    /// Fills the controller, checks rejection, then frees one slot.
    #[test]
    fn test_backpressure() {
        let controller = BackpressureController::new(5);

        // Take every available slot.
        let mut guards: Vec<_> = (0..5)
            .map(|_| controller.try_acquire().unwrap())
            .collect();

        // A sixth request is refused and counted.
        assert!(controller.try_acquire().is_none());
        assert_eq!(controller.rejected_count(), 1);

        // Dropping one guard frees a slot for the next request.
        drop(guards.pop());
        assert!(controller.try_acquire().is_some());
    }
}