use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Policy that an `AdaptiveSampler` follows when deciding whether an event
/// should be recorded.
#[derive(Clone, Copy, Debug)]
pub enum SamplingStrategy {
    /// Keep roughly one event out of every `rate`, at a constant rate.
    Fixed {
        /// The "N" in "one in N"; a value of `1` keeps every event.
        rate: u32,
    },
    /// Start at `min_rate` and periodically move the rate between `min_rate`
    /// and `max_rate`, depending on how many samples were taken since the
    /// previous adjustment.
    Dynamic {
        /// Lowest "one in N" value, i.e. the most aggressive sampling; also the
        /// initial rate.
        min_rate: u32,
        /// Highest "one in N" value the sampler backs off to.
        max_rate: u32,
        /// Number of taken samples per adjustment window above which the rate
        /// is doubled; below half of it the rate is halved.
        target_throughput: u64,
    },
    /// Take a sample only once `min_interval` has elapsed since the previously
    /// taken one.
    TimeBased {
        /// Minimum spacing between samples, in nanoseconds.
        min_interval: u64,
    },
}
/// Decides, event by event, whether the event should be recorded, following a
/// [`SamplingStrategy`].
///
/// Every method takes `&self` and all mutable state lives in atomics or a
/// mutex, so one sampler can be shared by reference between threads.
pub struct AdaptiveSampler {
    /// Strategy chosen at construction; it never changes afterwards.
    strategy: SamplingStrategy,
    /// Current "one in N" rate used by the fixed and dynamic strategies.
    /// Kept at `1` or above so a random draw from `1..=rate` is always valid.
    current_rate: AtomicU32,
    /// Events accepted since construction or the last dynamic adjustment.
    samples_taken: AtomicU64,
    /// Events rejected since construction or the last dynamic adjustment.
    samples_dropped: AtomicU64,
    /// When the dynamic rate was last re-evaluated.
    last_adjustment: parking_lot::Mutex<Instant>,
    /// Set by the dynamic strategy when the rate climbed above `2 * min_rate`.
    overloaded: AtomicBool,
    /// Reference point for `last_sample_nanos`.
    created_at: Instant,
    /// Nanoseconds after `created_at` at which the time-based strategy last
    /// accepted a sample, or `NO_SAMPLE_YET` before the first one.
    last_sample_nanos: AtomicU64,
}

impl AdaptiveSampler {
    /// Minimum time between two re-evaluations of the dynamic rate.
    const ADJUSTMENT_INTERVAL: Duration = Duration::from_secs(1);

    /// Sentinel stored in `last_sample_nanos` until a first sample is taken.
    const NO_SAMPLE_YET: u64 = u64::MAX;

    /// Creates a sampler for `strategy`.
    ///
    /// The initial rate is `rate` for `Fixed`, `min_rate` for `Dynamic` and `1`
    /// for `TimeBased`. A configured rate of `0` is treated as `1` (sample
    /// everything) because "one in zero" has no meaning and would otherwise
    /// make the random draw in the sampling path panic.
    pub fn new(strategy: SamplingStrategy) -> Self {
        let initial_rate = match strategy {
            SamplingStrategy::Fixed { rate } => rate,
            SamplingStrategy::Dynamic { min_rate, .. } => min_rate,
            SamplingStrategy::TimeBased { .. } => 1,
        };
        let now = Instant::now();
        Self {
            strategy,
            current_rate: AtomicU32::new(initial_rate.max(1)),
            samples_taken: AtomicU64::new(0),
            samples_dropped: AtomicU64::new(0),
            last_adjustment: parking_lot::Mutex::new(now),
            overloaded: AtomicBool::new(false),
            created_at: now,
            last_sample_nanos: AtomicU64::new(Self::NO_SAMPLE_YET),
        }
    }

    /// Returns `true` when the current event should be recorded.
    ///
    /// Every call counts the event as either taken or dropped.
    #[inline]
    pub fn should_sample(&self) -> bool {
        match self.strategy {
            SamplingStrategy::Fixed { .. } => self.should_sample_fixed(),
            SamplingStrategy::Dynamic { .. } => self.should_sample_dynamic(),
            SamplingStrategy::TimeBased { min_interval } => {
                self.should_sample_time_based(min_interval)
            }
        }
    }

    /// Counts the decision in the matching counter and hands it back.
    #[inline]
    fn record(&self, sampled: bool) -> bool {
        let counter = if sampled {
            &self.samples_taken
        } else {
            &self.samples_dropped
        };
        counter.fetch_add(1, Ordering::Relaxed);
        sampled
    }

    /// Accepts the event with probability `1 / current_rate`.
    #[inline]
    fn should_sample_fixed(&self) -> bool {
        let rate = self.current_rate.load(Ordering::Relaxed);
        // `rate <= 1` short-circuits so the generator is never asked for a value
        // from the empty range `1..=0`.
        let sampled = rate <= 1 || fastrand::u32(1..=rate) == 1;
        self.record(sampled)
    }

    /// Re-evaluates the rate if more than `ADJUSTMENT_INTERVAL` has passed
    /// since the last adjustment, then samples at the current rate.
    fn should_sample_dynamic(&self) -> bool {
        {
            // Scope the guard so the lock is released before the random draw.
            let mut last_adjustment = self.last_adjustment.lock();
            let now = Instant::now();
            if now.duration_since(*last_adjustment) > Self::ADJUSTMENT_INTERVAL {
                self.adjust_dynamic_rate();
                *last_adjustment = now;
            }
        }
        self.should_sample_fixed()
    }

    /// Accepts the event only if at least `min_interval_nanos` nanoseconds have
    /// passed since this sampler last accepted one.
    ///
    /// The timestamp is stored in the sampler itself rather than in a
    /// function-level thread-local, so independent samplers used from the same
    /// thread no longer throttle each other, and the interval holds across all
    /// threads sharing this sampler.
    fn should_sample_time_based(&self, min_interval_nanos: u64) -> bool {
        // Clamp below the sentinel before narrowing, so the cast cannot truncate
        // and a real timestamp is never mistaken for "no sample yet".
        let now_nanos = self
            .created_at
            .elapsed()
            .as_nanos()
            .min(u128::from(Self::NO_SAMPLE_YET - 1)) as u64;

        let mut last = self.last_sample_nanos.load(Ordering::Relaxed);
        loop {
            let too_soon = last != Self::NO_SAMPLE_YET
                && now_nanos.saturating_sub(last) < min_interval_nanos;
            if too_soon {
                return self.record(false);
            }
            // Only one of several racing threads may claim this interval; the
            // losers re-check against the winner's timestamp.
            match self.last_sample_nanos.compare_exchange_weak(
                last,
                now_nanos,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return self.record(true),
                Err(observed) => last = observed,
            }
        }
    }

    /// Doubles the rate when more than `target_throughput` samples were taken
    /// in the elapsed window, halves it when fewer than half that many were
    /// taken, and resets both counters for the next window.
    ///
    /// Does nothing for non-dynamic strategies.
    fn adjust_dynamic_rate(&self) {
        if let SamplingStrategy::Dynamic {
            min_rate,
            max_rate,
            target_throughput,
        } = self.strategy
        {
            // `swap` reads and resets in one step, so samples counted while the
            // adjustment runs are carried into the next window instead of lost.
            let taken = self.samples_taken.swap(0, Ordering::Relaxed);
            self.samples_dropped.store(0, Ordering::Relaxed);

            let current_rate = self.current_rate.load(Ordering::Relaxed);
            let proposed = if taken > target_throughput {
                // Saturate: a plain `* 2` overflows for rates above `u32::MAX / 2`.
                current_rate.saturating_mul(2).min(max_rate)
            } else if taken < target_throughput / 2 {
                (current_rate / 2).max(min_rate)
            } else {
                current_rate
            };
            // Never let a zero bound drag the rate to 0 (see `new`).
            let new_rate = proposed.max(1);

            if new_rate != current_rate {
                self.current_rate.store(new_rate, Ordering::Relaxed);
                self.overloaded
                    .store(new_rate > min_rate.saturating_mul(2), Ordering::Relaxed);
            }
        }
    }

    /// Current "one in N" sampling rate.
    #[inline]
    pub fn current_rate(&self) -> u32 {
        self.current_rate.load(Ordering::Relaxed)
    }

    /// Whether the last dynamic rate change left the rate above `2 * min_rate`.
    #[inline]
    pub fn is_overloaded(&self) -> bool {
        self.overloaded.load(Ordering::Relaxed)
    }

    /// Snapshot of the counters, the rate and the overload flag.
    ///
    /// Each value is read independently, so under concurrent use the snapshot
    /// is not guaranteed to be mutually consistent.
    pub fn stats(&self) -> SamplingStats {
        SamplingStats {
            samples_taken: self.samples_taken.load(Ordering::Relaxed),
            samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
            current_rate: self.current_rate.load(Ordering::Relaxed),
            is_overloaded: self.is_overloaded(),
        }
    }
}
/// Point-in-time copy of a sampler's counters and rate.
#[derive(Debug, Clone)]
pub struct SamplingStats {
    /// Number of events that were accepted.
    pub samples_taken: u64,
    /// Number of events that were rejected.
    pub samples_dropped: u64,
    /// "One in N" sampling rate at the time of the snapshot.
    pub current_rate: u32,
    /// Overload flag at the time of the snapshot.
    pub is_overloaded: bool,
}

impl SamplingStats {
    /// Share of counted events that were accepted, as a percentage in
    /// `0.0..=100.0`.
    ///
    /// Returns `100.0` when no event has been counted yet.
    pub fn sampling_percentage(&self) -> f64 {
        // Widen before adding: both fields are public `u64`s, so their sum can
        // exceed `u64::MAX` and would panic in overflow-checked builds.
        let total = u128::from(self.samples_taken) + u128::from(self.samples_dropped);
        if total == 0 {
            return 100.0;
        }
        (self.samples_taken as f64 / total as f64) * 100.0
    }
}
/// Circuit breaker with three states (closed, open, half-open) whose state and
/// counters are stored in atomics.
pub struct MetricCircuitBreaker {
// Current state, stored as the `u32` discriminant of `CircuitState`.
state: AtomicU32,
// Failures counted in the current state; cleared by a success while closed
// and on every state transition.
failures: AtomicU64,
// Successes counted while half-open; cleared on every state transition.
successes: AtomicU64,
// Time of construction or of the most recent state transition.
last_state_change: parking_lot::Mutex<Instant>,
// Thresholds and timeout supplied at construction.
config: CircuitBreakerConfig,
}
/// Tunables for a `MetricCircuitBreaker`.
#[derive(Clone, Debug)]
pub struct CircuitBreakerConfig {
    /// Failures recorded while closed (without a success in between) that
    /// open the circuit.
    pub failure_threshold: u64,
    /// Successes recorded while half-open that close the circuit again.
    pub success_threshold: u64,
    /// How long the circuit stays open before a call is let through as a probe.
    pub timeout: Duration,
    /// While half-open, calls are allowed only as long as fewer than this many
    /// outcomes have been recorded.
    /// NOTE(review): if this is smaller than `success_threshold`, callers that
    /// gate on `is_allowed` appear unable to ever close the circuit — confirm.
    pub half_open_max_calls: u64,
}

impl Default for CircuitBreakerConfig {
    /// 5 failures to open, 3 successes to close, a 30 second open period and
    /// up to 10 half-open calls.
    fn default() -> Self {
        let timeout = Duration::from_secs(30);
        CircuitBreakerConfig {
            timeout,
            half_open_max_calls: 10,
            success_threshold: 3,
            failure_threshold: 5,
        }
    }
}
/// State of a circuit breaker. The explicit discriminants are the values
/// stored in the breaker's atomic `state` field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u32)]
enum CircuitState {
    /// Calls flow normally; failures are being counted.
    Closed = 0,
    /// Calls are rejected until the configured timeout has passed.
    Open = 1,
    /// A limited number of probe calls is allowed to test recovery.
    HalfOpen = 2,
}
impl MetricCircuitBreaker {
    /// Creates a breaker in the closed state with zeroed counters.
    pub fn new(config: CircuitBreakerConfig) -> Self {
        let created_at = Instant::now();
        Self {
            state: AtomicU32::new(CircuitState::Closed as u32),
            failures: AtomicU64::new(0),
            successes: AtomicU64::new(0),
            last_state_change: parking_lot::Mutex::new(created_at),
            config,
        }
    }

    /// Returns whether a call may proceed right now.
    ///
    /// * Closed: always allowed.
    /// * Open: rejected until more than `config.timeout` has passed since the
    ///   last state change; the first call after that moves the breaker to
    ///   half-open and is allowed.
    /// * Half-open: allowed while the number of outcomes recorded so far is
    ///   below `config.half_open_max_calls`.
    #[inline]
    pub fn is_allowed(&self) -> bool {
        match self.get_state() {
            CircuitState::Closed => true,
            CircuitState::Open => {
                // Copy the timestamp out so the lock is not held across the
                // transition below, which locks the same mutex.
                let opened_at = *self.last_state_change.lock();
                let cooled_down = Instant::now().duration_since(opened_at) > self.config.timeout;
                if cooled_down {
                    self.transition_to(CircuitState::HalfOpen);
                }
                cooled_down
            }
            CircuitState::HalfOpen => {
                let recorded_successes = self.successes.load(Ordering::Relaxed);
                let recorded_failures = self.failures.load(Ordering::Relaxed);
                recorded_successes + recorded_failures < self.config.half_open_max_calls
            }
        }
    }

    /// Reports a successful call.
    ///
    /// While closed this clears the failure count; while half-open it counts
    /// towards `config.success_threshold` and closes the circuit once reached.
    /// Ignored while open.
    #[inline]
    pub fn record_success(&self) {
        match self.get_state() {
            CircuitState::Open => {}
            CircuitState::Closed => self.failures.store(0, Ordering::Relaxed),
            CircuitState::HalfOpen => {
                let before = self.successes.fetch_add(1, Ordering::Relaxed);
                if before + 1 >= self.config.success_threshold {
                    self.transition_to(CircuitState::Closed);
                }
            }
        }
    }

    /// Reports a failed call.
    ///
    /// While closed it counts towards `config.failure_threshold` and opens the
    /// circuit once reached; while half-open a single failure re-opens it.
    /// Ignored while open.
    #[inline]
    pub fn record_failure(&self) {
        match self.get_state() {
            CircuitState::Open => {}
            CircuitState::HalfOpen => self.transition_to(CircuitState::Open),
            CircuitState::Closed => {
                let before = self.failures.fetch_add(1, Ordering::Relaxed);
                if before + 1 >= self.config.failure_threshold {
                    self.transition_to(CircuitState::Open);
                }
            }
        }
    }

    /// Decodes the atomically stored discriminant back into a `CircuitState`.
    #[inline]
    fn get_state(&self) -> CircuitState {
        const CLOSED: u32 = CircuitState::Closed as u32;
        const OPEN: u32 = CircuitState::Open as u32;
        const HALF_OPEN: u32 = CircuitState::HalfOpen as u32;

        match self.state.load(Ordering::Relaxed) {
            CLOSED => CircuitState::Closed,
            OPEN => CircuitState::Open,
            HALF_OPEN => CircuitState::HalfOpen,
            // Only `CircuitState` discriminants are ever stored.
            _ => unreachable!(),
        }
    }

    /// Switches to `new_state`, clears both counters and stamps the change time.
    fn transition_to(&self, new_state: CircuitState) {
        let raw_state = new_state as u32;
        self.state.store(raw_state, Ordering::Relaxed);
        self.failures.store(0, Ordering::Relaxed);
        self.successes.store(0, Ordering::Relaxed);
        let changed_at = Instant::now();
        *self.last_state_change.lock() = changed_at;
    }
}
/// Limits the number of operations that may be in flight at once.
///
/// A slot is taken with [`BackpressureController::try_acquire`] and given back
/// when the returned [`BackpressureGuard`] is dropped.
pub struct BackpressureController {
    /// Maximum number of slots that may be held at the same time.
    max_pending: usize,
    /// Number of slots currently held; never exceeds `max_pending`.
    pending: AtomicU64,
    /// Number of `try_acquire` calls that were turned away.
    rejected: AtomicU64,
}

impl BackpressureController {
    /// Creates a controller that admits at most `max_pending` concurrent
    /// holders. With `max_pending == 0` every acquisition is rejected.
    pub fn new(max_pending: usize) -> Self {
        Self {
            max_pending,
            pending: AtomicU64::new(0),
            rejected: AtomicU64::new(0),
        }
    }

    /// Tries to take a slot.
    ///
    /// Returns a guard that releases the slot on drop, or `None` (and counts a
    /// rejection) when `max_pending` slots are already held.
    #[inline]
    #[must_use = "the slot is released as soon as the returned guard is dropped"]
    pub fn try_acquire(&self) -> Option<BackpressureGuard<'_>> {
        // `usize` is at most 64 bits wide, so this widening never truncates.
        let limit = self.max_pending as u64;

        // Reserve with a compare-and-swap loop instead of add-then-undo: the
        // counter never overshoots the limit, so a slot freed concurrently
        // cannot be hidden by another caller's temporary increment.
        let reserved = self
            .pending
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |held| {
                if held < limit {
                    Some(held + 1)
                } else {
                    None
                }
            });

        match reserved {
            Ok(_) => Some(BackpressureGuard { controller: self }),
            Err(_) => {
                self.rejected.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    /// Number of slots currently held.
    #[inline]
    pub fn pending_count(&self) -> u64 {
        self.pending.load(Ordering::Relaxed)
    }

    /// Total number of rejected acquisitions so far.
    #[inline]
    pub fn rejected_count(&self) -> u64 {
        self.rejected.load(Ordering::Relaxed)
    }
}

/// Holds one slot of a [`BackpressureController`]; dropping it frees the slot.
pub struct BackpressureGuard<'a> {
    /// Controller whose `pending` count this guard contributes to.
    controller: &'a BackpressureController,
}

impl<'a> Drop for BackpressureGuard<'a> {
    /// Gives the slot back to the controller.
    #[inline]
    fn drop(&mut self) {
        self.controller.pending.fetch_sub(1, Ordering::Relaxed);
    }
}
/// Small dependency-free pseudo-random number helper backed by a per-thread
/// SplitMix64 generator. Not suitable for cryptographic use.
pub mod fastrand {
    use std::cell::Cell;
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::ops::RangeInclusive;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Increment applied to the generator state on every draw.
    const GOLDEN_GAMMA: u64 = 0x9e37_79b9_7f4a_7c15;

    /// Builds the initial state for the calling thread's generator.
    ///
    /// `RandomState` supplies randomly keyed hashers, so threads that start
    /// within the same clock tick still get different seeds. The previous seed
    /// mixed in the address of a promoted `()` constant, which is identical for
    /// every thread and therefore added no per-thread entropy.
    fn seed() -> u64 {
        let mut hasher = RandomState::new().build_hasher();
        if let Ok(since_epoch) = SystemTime::now().duration_since(UNIX_EPOCH) {
            hasher.write_u128(since_epoch.as_nanos());
        }
        hasher.finish()
    }

    /// Advances the calling thread's generator and returns 64 mixed bits.
    fn next_u64() -> u64 {
        thread_local! {
            static STATE: Cell<u64> = Cell::new(seed());
        }
        STATE.with(|state| {
            let mut z = state.get().wrapping_add(GOLDEN_GAMMA);
            state.set(z);
            z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
            z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
            z ^ (z >> 31)
        })
    }

    /// Returns a pseudo-random value in the inclusive `range`.
    ///
    /// Values are reduced with a modulo, so ranges whose size does not divide
    /// 2^32 carry a very small bias.
    ///
    /// # Panics
    ///
    /// Panics if the range is empty (`start > end`).
    #[inline]
    pub fn u32(range: RangeInclusive<u32>) -> u32 {
        let (start, end) = (*range.start(), *range.end());
        assert!(start <= end, "empty range: {}..={}", start, end);
        if start == end {
            return start;
        }
        // Compute the span in 64 bits: for `0..=u32::MAX` it is 2^32, which
        // does not fit in a `u32` and previously overflowed.
        let span = u64::from(end - start) + 1;
        let offset = (next_u64() >> 32) % span;
        // `offset <= end - start`, so both the cast and the addition are in range.
        start + offset as u32
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Interval used by the time-based sampler tests: 5 ms in nanoseconds.
    const FIVE_MS_IN_NANOS: u64 = 5_000_000;

    /// A 1-in-10 sampler should accept roughly a tenth of 1000 events.
    #[test]
    fn test_fixed_sampling() {
        let sampler = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });
        let sampled = (0..1000).filter(|_| sampler.should_sample()).count();
        assert!(sampled > 50 && sampled < 150);
    }

    /// Walks the breaker through closed -> open -> half-open -> closed.
    #[test]
    fn test_circuit_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout: Duration::from_millis(100),
            half_open_max_calls: 5,
        };
        let breaker = MetricCircuitBreaker::new(config);
        assert!(breaker.is_allowed());

        // Three failures reach the threshold and open the circuit.
        (0..3).for_each(|_| breaker.record_failure());
        assert!(!breaker.is_allowed());

        // After the timeout the next call is let through as a probe.
        std::thread::sleep(Duration::from_millis(150));
        assert!(breaker.is_allowed());

        // Two successful probes close the circuit again.
        breaker.record_success();
        breaker.record_success();
        assert!(breaker.is_allowed());
    }

    /// The controller admits five holders, rejects the sixth, and admits again
    /// once a slot is released.
    #[test]
    fn test_backpressure() {
        let controller = BackpressureController::new(5);
        let mut guards: Vec<_> = (0..5)
            .map(|_| controller.try_acquire().unwrap())
            .collect();

        assert!(controller.try_acquire().is_none());
        assert_eq!(controller.rejected_count(), 1);

        drop(guards.pop());
        assert!(controller.try_acquire().is_some());
    }

    /// A second sample inside the interval is dropped; one after it is taken.
    #[test]
    fn test_time_based_sampling_interval() {
        let sampler = AdaptiveSampler::new(SamplingStrategy::TimeBased {
            min_interval: FIVE_MS_IN_NANOS,
        });
        assert!(sampler.should_sample());
        assert!(!sampler.should_sample());

        std::thread::sleep(Duration::from_millis(6));
        assert!(sampler.should_sample());
    }

    /// `sampling_percentage` is 100% with no data and 50% for one taken plus
    /// one dropped sample.
    #[test]
    fn test_sampling_stats_percentage() {
        let untouched = AdaptiveSampler::new(SamplingStrategy::Fixed { rate: 10 });
        let empty_stats = untouched.stats();
        assert_eq!(empty_stats.samples_taken, 0);
        assert_eq!(empty_stats.samples_dropped, 0);
        assert!((empty_stats.sampling_percentage() - 100.0).abs() < f64::EPSILON);

        let sampler = AdaptiveSampler::new(SamplingStrategy::TimeBased {
            min_interval: FIVE_MS_IN_NANOS,
        });
        let first = sampler.should_sample();
        assert!(first);
        let second = sampler.should_sample();
        assert!(!second);

        let stats = sampler.stats();
        assert_eq!(stats.samples_taken, 1);
        assert_eq!(stats.samples_dropped, 1);
        assert!((stats.sampling_percentage() - 50.0).abs() < 0.0001);
    }
}