use parking_lot::RwLock;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex, Notify, Semaphore};
#[derive(Debug)]
pub struct TokenBucket {
tokens: AtomicU64,
capacity: u64,
refill_rate_bits: AtomicU64,
created: Instant,
last_refill: AtomicU64,
stats: TokenBucketStats,
}
impl TokenBucket {
pub fn new(capacity: u64, refill_rate: f64) -> Self {
let now = Instant::now();
Self {
tokens: AtomicU64::new(capacity),
capacity,
refill_rate_bits: AtomicU64::new(refill_rate.to_bits()),
created: now,
last_refill: AtomicU64::new(0),
stats: TokenBucketStats::new(),
}
}
pub fn try_acquire(&self, count: u64) -> bool {
self.refill();
loop {
let current = self.tokens.load(Ordering::Acquire);
if current < count {
self.stats.rejected.fetch_add(1, Ordering::Relaxed);
return false;
}
if self
.tokens
.compare_exchange_weak(
current,
current - count,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
self.stats.acquired.fetch_add(count, Ordering::Relaxed);
return true;
}
std::hint::spin_loop();
}
}
pub async fn acquire(&self, count: u64) {
while !self.try_acquire(count) {
let tokens_needed = count.saturating_sub(self.tokens.load(Ordering::Relaxed));
let rate = self.refill_rate();
let wait_secs = tokens_needed as f64 / rate;
let wait_duration = Duration::from_secs_f64(wait_secs.max(0.001));
tokio::time::sleep(wait_duration).await;
}
}
fn refill_rate(&self) -> f64 {
f64::from_bits(self.refill_rate_bits.load(Ordering::Relaxed))
}
pub fn update_rate(&self, new_rate: f64) {
self.refill_rate_bits
.store(new_rate.to_bits(), Ordering::Relaxed);
}
fn refill(&self) {
let now = self.created.elapsed().as_nanos() as u64;
let last = self.last_refill.load(Ordering::Acquire);
if now <= last {
return;
}
let rate = self.refill_rate();
let elapsed_secs = (now - last) as f64 / 1_000_000_000.0;
let new_tokens = (elapsed_secs * rate) as u64;
if new_tokens == 0 {
return;
}
if self
.last_refill
.compare_exchange(last, now, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
loop {
let current = self.tokens.load(Ordering::Acquire);
let new_total = (current + new_tokens).min(self.capacity);
if self
.tokens
.compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
break;
}
}
}
}
pub fn available(&self) -> u64 {
self.refill();
self.tokens.load(Ordering::Relaxed)
}
pub fn stats(&self) -> TokenBucketStatsSnapshot {
TokenBucketStatsSnapshot {
acquired: self.stats.acquired.load(Ordering::Relaxed),
rejected: self.stats.rejected.load(Ordering::Relaxed),
available: self.available(),
capacity: self.capacity,
}
}
}
#[derive(Debug)]
struct TokenBucketStats {
acquired: AtomicU64,
rejected: AtomicU64,
}
impl TokenBucketStats {
fn new() -> Self {
Self {
acquired: AtomicU64::new(0),
rejected: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct TokenBucketStatsSnapshot {
pub acquired: u64,
pub rejected: u64,
pub available: u64,
pub capacity: u64,
}
#[derive(Debug)]
pub struct CreditFlowControl {
credits: AtomicU64,
max_credits: u64,
credit_notify: Notify,
stats: CreditStats,
}
impl CreditFlowControl {
pub fn new(initial_credits: u64, max_credits: u64) -> Self {
Self {
credits: AtomicU64::new(initial_credits),
max_credits,
credit_notify: Notify::new(),
stats: CreditStats::new(),
}
}
pub fn try_consume(&self, count: u64) -> bool {
loop {
let current = self.credits.load(Ordering::Acquire);
if current < count {
self.stats.blocked.fetch_add(1, Ordering::Relaxed);
return false;
}
if self
.credits
.compare_exchange_weak(
current,
current - count,
Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
self.stats.consumed.fetch_add(count, Ordering::Relaxed);
return true;
}
}
}
pub async fn consume(&self, count: u64) {
while !self.try_consume(count) {
self.credit_notify.notified().await;
}
}
pub fn grant(&self, count: u64) {
loop {
let current = self.credits.load(Ordering::Acquire);
let new_total = (current + count).min(self.max_credits);
if self
.credits
.compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
self.stats.granted.fetch_add(count, Ordering::Relaxed);
self.credit_notify.notify_waiters();
break;
}
}
}
pub fn available(&self) -> u64 {
self.credits.load(Ordering::Relaxed)
}
pub fn stats(&self) -> CreditStatsSnapshot {
CreditStatsSnapshot {
consumed: self.stats.consumed.load(Ordering::Relaxed),
granted: self.stats.granted.load(Ordering::Relaxed),
blocked: self.stats.blocked.load(Ordering::Relaxed),
available: self.available(),
}
}
}
#[derive(Debug)]
struct CreditStats {
consumed: AtomicU64,
granted: AtomicU64,
blocked: AtomicU64,
}
impl CreditStats {
fn new() -> Self {
Self {
consumed: AtomicU64::new(0),
granted: AtomicU64::new(0),
blocked: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct CreditStatsSnapshot {
pub consumed: u64,
pub granted: u64,
pub blocked: u64,
pub available: u64,
}
#[derive(Debug)]
pub struct AdaptiveRateLimiter {
rate: AtomicU64,
min_rate: u64,
max_rate: u64,
target_latency_us: u64,
additive_increase: u64,
multiplicative_decrease: f64,
bucket: TokenBucket,
latency_samples: RwLock<LatencySamples>,
stats: AdaptiveStats,
}
#[derive(Debug)]
struct LatencySamples {
samples: Vec<u64>,
index: usize,
filled: bool,
}
impl LatencySamples {
fn new(size: usize) -> Self {
Self {
samples: vec![0; size],
index: 0,
filled: false,
}
}
fn add(&mut self, latency_us: u64) {
self.samples[self.index] = latency_us;
self.index = (self.index + 1) % self.samples.len();
if self.index == 0 {
self.filled = true;
}
}
fn percentile(&self, p: f64) -> u64 {
let count = if self.filled {
self.samples.len()
} else {
self.index
};
if count == 0 {
return 0;
}
let mut sorted: Vec<u64> = self.samples[..count].to_vec();
sorted.sort_unstable();
let idx = ((count as f64 * p) as usize).min(count - 1);
sorted[idx]
}
}
impl AdaptiveRateLimiter {
pub fn new(config: AdaptiveRateLimiterConfig) -> Self {
let bucket = TokenBucket::new(config.initial_rate, config.initial_rate as f64);
Self {
rate: AtomicU64::new(config.initial_rate),
min_rate: config.min_rate,
max_rate: config.max_rate,
target_latency_us: config.target_latency_us,
additive_increase: config.additive_increase,
multiplicative_decrease: config.multiplicative_decrease,
bucket,
latency_samples: RwLock::new(LatencySamples::new(100)),
stats: AdaptiveStats::new(),
}
}
pub fn try_acquire(&self) -> bool {
self.bucket.try_acquire(1)
}
pub async fn acquire(&self) {
self.bucket.acquire(1).await
}
pub fn record_latency(&self, latency: Duration) {
let latency_us = latency.as_micros() as u64;
{
let mut samples = self.latency_samples.write();
samples.add(latency_us);
}
let p99 = {
let samples = self.latency_samples.read();
samples.percentile(0.99)
};
let current_rate = self.rate.load(Ordering::Relaxed);
if p99 > self.target_latency_us {
let new_rate =
((current_rate as f64 * self.multiplicative_decrease) as u64).max(self.min_rate);
self.rate.store(new_rate, Ordering::Relaxed);
self.bucket.update_rate(new_rate as f64);
self.stats.decreases.fetch_add(1, Ordering::Relaxed);
} else if p99 < self.target_latency_us / 2 {
let new_rate = (current_rate + self.additive_increase).min(self.max_rate);
self.rate.store(new_rate, Ordering::Relaxed);
self.bucket.update_rate(new_rate as f64);
self.stats.increases.fetch_add(1, Ordering::Relaxed);
}
}
pub fn current_rate(&self) -> u64 {
self.rate.load(Ordering::Relaxed)
}
pub fn stats(&self) -> AdaptiveStatsSnapshot {
let samples = self.latency_samples.read();
AdaptiveStatsSnapshot {
current_rate: self.current_rate(),
increases: self.stats.increases.load(Ordering::Relaxed),
decreases: self.stats.decreases.load(Ordering::Relaxed),
p50_latency_us: samples.percentile(0.5),
p99_latency_us: samples.percentile(0.99),
}
}
}
#[derive(Debug, Clone)]
pub struct AdaptiveRateLimiterConfig {
pub initial_rate: u64,
pub min_rate: u64,
pub max_rate: u64,
pub target_latency_us: u64,
pub additive_increase: u64,
pub multiplicative_decrease: f64,
}
impl Default for AdaptiveRateLimiterConfig {
fn default() -> Self {
Self {
initial_rate: 10000,
min_rate: 100,
max_rate: 1000000,
target_latency_us: 10000, additive_increase: 100,
multiplicative_decrease: 0.5,
}
}
}
#[derive(Debug)]
struct AdaptiveStats {
increases: AtomicU64,
decreases: AtomicU64,
}
impl AdaptiveStats {
fn new() -> Self {
Self {
increases: AtomicU64::new(0),
decreases: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct AdaptiveStatsSnapshot {
pub current_rate: u64,
pub increases: u64,
pub decreases: u64,
pub p50_latency_us: u64,
pub p99_latency_us: u64,
}
#[derive(Debug)]
pub struct CircuitBreaker {
state: RwLock<CircuitState>,
config: CircuitBreakerConfig,
stats: CircuitBreakerStats,
window_start: parking_lot::Mutex<Instant>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
Closed,
Open { opened_at: Instant },
HalfOpen,
}
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
pub failure_threshold: u32,
pub failure_window: Duration,
pub recovery_timeout: Duration,
pub success_threshold: u32,
}
impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self {
failure_threshold: 5,
failure_window: Duration::from_secs(60),
recovery_timeout: Duration::from_secs(30),
success_threshold: 3,
}
}
}
impl CircuitBreaker {
pub fn new(config: CircuitBreakerConfig) -> Self {
Self {
state: RwLock::new(CircuitState::Closed),
config,
stats: CircuitBreakerStats::new(),
window_start: parking_lot::Mutex::new(Instant::now()),
}
}
pub fn allow(&self) -> bool {
{
let state = *self.state.read();
match state {
CircuitState::Closed => {
self.stats.allowed.fetch_add(1, Ordering::Relaxed);
return true;
}
CircuitState::HalfOpen => {
self.stats.allowed.fetch_add(1, Ordering::Relaxed);
return true;
}
CircuitState::Open { opened_at } => {
if opened_at.elapsed() <= self.config.recovery_timeout {
self.stats.rejected.fetch_add(1, Ordering::Relaxed);
return false;
}
}
}
}
let mut state = self.state.write();
match *state {
CircuitState::Open { opened_at }
if opened_at.elapsed() > self.config.recovery_timeout =>
{
*state = CircuitState::HalfOpen;
self.stats.allowed.fetch_add(1, Ordering::Relaxed);
true
}
CircuitState::Open { .. } => {
self.stats.rejected.fetch_add(1, Ordering::Relaxed);
false
}
_ => {
self.stats.allowed.fetch_add(1, Ordering::Relaxed);
true
}
}
}
pub fn record_success(&self) {
let mut state = self.state.write();
self.stats.successes.fetch_add(1, Ordering::Relaxed);
match *state {
CircuitState::HalfOpen => {
let successes = self
.stats
.half_open_successes
.fetch_add(1, Ordering::Relaxed)
+ 1;
if successes >= self.config.success_threshold as u64 {
*state = CircuitState::Closed;
self.stats.half_open_successes.store(0, Ordering::Relaxed);
self.stats.failures_in_window.store(0, Ordering::Relaxed);
}
}
CircuitState::Closed => {
self.stats.failures_in_window.store(0, Ordering::Relaxed);
}
_ => {}
}
}
pub fn record_failure(&self) {
let mut state = self.state.write();
self.stats.failures.fetch_add(1, Ordering::Relaxed);
match *state {
CircuitState::Closed => {
let mut ws = self.window_start.lock();
if ws.elapsed() > self.config.failure_window {
self.stats.failures_in_window.store(1, Ordering::Relaxed);
*ws = Instant::now();
} else {
let failures = self
.stats
.failures_in_window
.fetch_add(1, Ordering::Relaxed)
+ 1;
if failures >= self.config.failure_threshold as u64 {
*state = CircuitState::Open {
opened_at: Instant::now(),
};
self.stats.opens.fetch_add(1, Ordering::Relaxed);
}
}
}
CircuitState::HalfOpen => {
*state = CircuitState::Open {
opened_at: Instant::now(),
};
self.stats.half_open_successes.store(0, Ordering::Relaxed);
self.stats.opens.fetch_add(1, Ordering::Relaxed);
}
_ => {}
}
}
pub fn state(&self) -> CircuitState {
*self.state.read()
}
pub fn stats(&self) -> CircuitBreakerStatsSnapshot {
CircuitBreakerStatsSnapshot {
state: self.state(),
allowed: self.stats.allowed.load(Ordering::Relaxed),
rejected: self.stats.rejected.load(Ordering::Relaxed),
successes: self.stats.successes.load(Ordering::Relaxed),
failures: self.stats.failures.load(Ordering::Relaxed),
opens: self.stats.opens.load(Ordering::Relaxed),
}
}
}
#[derive(Debug)]
struct CircuitBreakerStats {
allowed: AtomicU64,
rejected: AtomicU64,
successes: AtomicU64,
failures: AtomicU64,
failures_in_window: AtomicU64,
half_open_successes: AtomicU64,
opens: AtomicU64,
}
impl CircuitBreakerStats {
fn new() -> Self {
Self {
allowed: AtomicU64::new(0),
rejected: AtomicU64::new(0),
successes: AtomicU64::new(0),
failures: AtomicU64::new(0),
failures_in_window: AtomicU64::new(0),
half_open_successes: AtomicU64::new(0),
opens: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct CircuitBreakerStatsSnapshot {
pub state: CircuitState,
pub allowed: u64,
pub rejected: u64,
pub successes: u64,
pub failures: u64,
pub opens: u64,
}
pub struct BackpressureChannel<T> {
tx: mpsc::Sender<T>,
rx: Mutex<mpsc::Receiver<T>>,
permits: Arc<Semaphore>,
capacity: usize,
stats: ChannelStats,
}
impl<T> BackpressureChannel<T> {
pub fn new(capacity: usize) -> Self {
let (tx, rx) = mpsc::channel(capacity);
Self {
tx,
rx: Mutex::new(rx),
permits: Arc::new(Semaphore::new(0)), capacity,
stats: ChannelStats::new(),
}
}
pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
self.stats.sent.fetch_add(1, Ordering::Relaxed);
let result = self.tx.send(value).await;
if result.is_ok() {
self.permits.add_permits(1);
}
result
}
pub fn try_send(&self, value: T) -> Result<(), mpsc::error::TrySendError<T>> {
let result = self.tx.try_send(value);
match &result {
Ok(()) => {
self.stats.sent.fetch_add(1, Ordering::Relaxed);
self.permits.add_permits(1);
}
Err(mpsc::error::TrySendError::Full(_)) => {
self.stats.blocked.fetch_add(1, Ordering::Relaxed);
}
_ => {}
}
result
}
pub async fn recv(&self) -> Option<T> {
let permit = self.permits.acquire().await.ok()?;
let mut rx = self.rx.lock().await;
let result = rx.recv().await;
if result.is_some() {
permit.forget();
self.stats.received.fetch_add(1, Ordering::Relaxed);
}
result
}
pub fn len(&self) -> usize {
self.permits.available_permits()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn stats(&self) -> ChannelStatsSnapshot {
ChannelStatsSnapshot {
sent: self.stats.sent.load(Ordering::Relaxed),
received: self.stats.received.load(Ordering::Relaxed),
blocked: self.stats.blocked.load(Ordering::Relaxed),
current_len: self.len(),
capacity: self.capacity,
}
}
}
struct ChannelStats {
sent: AtomicU64,
received: AtomicU64,
blocked: AtomicU64,
}
impl ChannelStats {
fn new() -> Self {
Self {
sent: AtomicU64::new(0),
received: AtomicU64::new(0),
blocked: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct ChannelStatsSnapshot {
pub sent: u64,
pub received: u64,
pub blocked: u64,
pub current_len: usize,
pub capacity: usize,
}
pub struct WindowedRateTracker {
buckets: RwLock<Vec<AtomicU64>>,
bucket_duration: Duration,
num_buckets: usize,
current_bucket: AtomicUsize,
last_rotation: RwLock<Instant>,
}
impl WindowedRateTracker {
pub fn new(window_duration: Duration, num_buckets: usize) -> Self {
let buckets: Vec<AtomicU64> = (0..num_buckets).map(|_| AtomicU64::new(0)).collect();
Self {
buckets: RwLock::new(buckets),
bucket_duration: window_duration / num_buckets as u32,
num_buckets,
current_bucket: AtomicUsize::new(0),
last_rotation: RwLock::new(Instant::now()),
}
}
pub fn record(&self, count: u64) {
self.maybe_rotate();
let buckets = self.buckets.read();
let idx = self.current_bucket.load(Ordering::Relaxed);
buckets[idx].fetch_add(count, Ordering::Relaxed);
}
pub fn rate(&self) -> f64 {
self.maybe_rotate();
let buckets = self.buckets.read();
let total: u64 = buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum();
let window_secs = self.bucket_duration.as_secs_f64() * self.num_buckets as f64;
total as f64 / window_secs
}
pub fn total(&self) -> u64 {
self.maybe_rotate();
let buckets = self.buckets.read();
buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
}
fn maybe_rotate(&self) {
let now = Instant::now();
let elapsed = {
let last = self.last_rotation.read();
now.duration_since(*last)
};
if elapsed < self.bucket_duration {
return;
}
let buckets_to_rotate =
(elapsed.as_secs_f64() / self.bucket_duration.as_secs_f64()) as usize;
if buckets_to_rotate == 0 {
return;
}
{
let mut last = self.last_rotation.write();
*last = now;
}
let buckets = self.buckets.read();
for _ in 0..buckets_to_rotate.min(self.num_buckets) {
let next = (self.current_bucket.load(Ordering::Relaxed) + 1) % self.num_buckets;
buckets[next].store(0, Ordering::Relaxed);
self.current_bucket.store(next, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_token_bucket_basic() {
let bucket = TokenBucket::new(10, 10.0);
assert!(bucket.try_acquire(5));
assert!(bucket.try_acquire(5));
assert!(!bucket.try_acquire(1));
let stats = bucket.stats();
assert_eq!(stats.acquired, 10);
assert_eq!(stats.rejected, 1);
}
#[tokio::test]
async fn test_token_bucket_refill() {
let bucket = TokenBucket::new(1000, 100.0);
bucket.try_acquire(1000);
assert!(!bucket.try_acquire(1));
tokio::time::sleep(Duration::from_millis(50)).await;
let available = bucket.available();
assert!(
available >= 4,
"Expected at least 4 tokens, got {}",
available
);
}
#[test]
fn test_credit_flow_control() {
let flow = CreditFlowControl::new(10, 100);
assert!(flow.try_consume(5));
assert!(flow.try_consume(5));
assert!(!flow.try_consume(1));
assert_eq!(flow.available(), 0);
flow.grant(5);
assert_eq!(flow.available(), 5);
assert!(flow.try_consume(5));
}
#[test]
fn test_adaptive_rate_limiter() {
let config = AdaptiveRateLimiterConfig {
initial_rate: 1000,
min_rate: 100,
max_rate: 10000,
target_latency_us: 10000,
additive_increase: 100,
multiplicative_decrease: 0.5,
};
let limiter = AdaptiveRateLimiter::new(config);
assert_eq!(limiter.current_rate(), 1000);
for _ in 0..100 {
limiter.record_latency(Duration::from_millis(20));
}
assert!(limiter.current_rate() < 1000);
}
#[test]
fn test_circuit_breaker_closed() {
let breaker = CircuitBreaker::new(CircuitBreakerConfig {
failure_threshold: 3,
..Default::default()
});
assert!(breaker.allow());
breaker.record_success();
assert_eq!(breaker.state(), CircuitState::Closed);
}
#[test]
fn test_circuit_breaker_opens() {
let breaker = CircuitBreaker::new(CircuitBreakerConfig {
failure_threshold: 3,
..Default::default()
});
for _ in 0..3 {
assert!(breaker.allow());
breaker.record_failure();
}
match breaker.state() {
CircuitState::Open { .. } => {}
_ => panic!("Expected open state"),
}
assert!(!breaker.allow());
}
#[tokio::test]
async fn test_backpressure_channel() {
let channel = BackpressureChannel::new(3);
channel.send(1).await.unwrap();
channel.send(2).await.unwrap();
channel.send(3).await.unwrap();
assert_eq!(channel.len(), 3);
assert!(channel.try_send(4).is_err());
assert_eq!(channel.recv().await, Some(1));
assert_eq!(channel.len(), 2);
channel.send(4).await.unwrap();
}
#[test]
fn test_windowed_rate_tracker() {
let tracker = WindowedRateTracker::new(Duration::from_secs(1), 10);
tracker.record(100);
tracker.record(100);
assert_eq!(tracker.total(), 200);
assert!(tracker.rate() > 0.0);
}
}