use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use super::config::{BackpressureStrategyConfig, ResourceLimiterConfig};
/// Kind of resource a limit decision refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResourceType {
    /// Memory usage.
    Memory,
    /// Open connections.
    Connections,
    /// Request throughput.
    RequestRate,
}

impl std::fmt::Display for ResourceType {
    /// Writes the lowercase, snake_case label of the resource.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Memory => "memory",
            Self::Connections => "connections",
            Self::RequestRate => "request_rate",
        };
        f.write_str(label)
    }
}
/// Outcome of a limit check.
#[derive(Debug, Clone)]
pub enum LimitResult {
    /// No limit or backpressure rule was triggered.
    Allowed,
    /// Permitted, but `resource` is at `utilization` and worth flagging.
    AllowedWithWarning {
        resource: ResourceType,
        utilization: f64,
    },
    /// Carries a wait of `delay` attributed to `resource`.
    Delayed {
        resource: ResourceType,
        delay: Duration,
    },
    /// Refused because of `resource`; `current` and `limit` are the values
    /// reported by whoever produced the rejection.
    Rejected {
        resource: ResourceType,
        current: u64,
        limit: u64,
    },
}

impl LimitResult {
    /// `true` for [`Allowed`](Self::Allowed) and
    /// [`AllowedWithWarning`](Self::AllowedWithWarning), `false` otherwise.
    pub fn is_allowed(&self) -> bool {
        match self {
            Self::Allowed | Self::AllowedWithWarning { .. } => true,
            Self::Delayed { .. } | Self::Rejected { .. } => false,
        }
    }

    /// `true` only for [`Delayed`](Self::Delayed).
    pub fn needs_delay(&self) -> bool {
        self.delay().is_some()
    }

    /// `true` only for [`Rejected`](Self::Rejected).
    pub fn is_rejected(&self) -> bool {
        match self {
            Self::Rejected { .. } => true,
            Self::Allowed | Self::AllowedWithWarning { .. } | Self::Delayed { .. } => false,
        }
    }

    /// The wait carried by a [`Delayed`](Self::Delayed) result, `None` for
    /// every other variant.
    pub fn delay(&self) -> Option<Duration> {
        if let Self::Delayed { delay, .. } = self {
            Some(*delay)
        } else {
            None
        }
    }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
None,
Gradual,
Adaptive,
Aggressive,
}
impl From<BackpressureStrategyConfig> for BackpressureStrategy {
fn from(config: BackpressureStrategyConfig) -> Self {
match config {
BackpressureStrategyConfig::None => Self::None,
BackpressureStrategyConfig::Gradual => Self::Gradual,
BackpressureStrategyConfig::Adaptive => Self::Adaptive,
BackpressureStrategyConfig::Aggressive => Self::Aggressive,
}
}
}
/// Token bucket used for request-rate limiting.
///
/// The bucket holds at most `max_tokens` tokens and earns `refill_rate` tokens
/// per second of elapsed wall-clock time. Tokens are credited lazily, whenever
/// the bucket is queried.
struct TokenBucket {
    /// Tokens currently available.
    tokens: AtomicU64,
    /// Capacity of the bucket.
    max_tokens: u64,
    /// Tokens earned per second.
    refill_rate: u64,
    /// Point in time up to which earned tokens have already been credited.
    ///
    /// This is only ever locked exclusively, so a plain mutex is sufficient.
    last_refill: std::sync::Mutex<Instant>,
}

impl TokenBucket {
    /// Creates a bucket that starts full.
    fn new(max_tokens: u64, refill_rate: u64) -> Self {
        Self {
            tokens: AtomicU64::new(max_tokens),
            max_tokens,
            refill_rate,
            last_refill: std::sync::Mutex::new(Instant::now()),
        }
    }

    /// Atomically removes `count` tokens.
    ///
    /// Returns `false` (and removes nothing) when fewer than `count` tokens
    /// are available.
    fn try_consume(&self, count: u64) -> bool {
        self.refill();
        self.tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |available| {
                available.checked_sub(count)
            })
            .is_ok()
    }

    /// Credits the whole tokens earned since the last refill.
    ///
    /// Only the time that accounts for the credited whole tokens is consumed,
    /// so the fractional token already earned carries over to the next call
    /// instead of being discarded (which would make the effective rate drift
    /// below `refill_rate`).
    fn refill(&self) {
        if self.refill_rate == 0 {
            // Nothing is ever earned; skip the lock entirely.
            return;
        }
        let rate = self.refill_rate as f64;

        // A poisoned lock only means another thread panicked while holding
        // it; the stored instant is still usable.
        let mut last = self
            .last_refill
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let now = Instant::now();
        let elapsed = now.saturating_duration_since(*last);

        // Float-to-int `as` casts saturate, so an enormous product is safe.
        let earned = (elapsed.as_secs_f64() * rate) as u64;
        if earned == 0 {
            return;
        }

        let next = if earned >= self.max_tokens {
            // The bucket ends up full; any leftover fraction is irrelevant.
            now
        } else {
            let credited = Duration::from_secs_f64(earned as f64 / rate);
            last.checked_add(credited).map_or(now, |t| t.min(now))
        };
        *last = next;

        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |available| {
                Some(available.saturating_add(earned).min(self.max_tokens))
            });
    }

    /// Number of tokens available after crediting earned tokens.
    fn current(&self) -> u64 {
        self.refill();
        self.tokens.load(Ordering::Relaxed)
    }

    /// Fraction of the capacity that is currently consumed, in `0.0..=1.0`.
    ///
    /// A zero-capacity bucket reports `0.0` instead of the NaN that `0 / 0`
    /// would produce. NaN compared false against every threshold, so callers
    /// already treated such a bucket as "no pressure"; this keeps that outcome
    /// while staying NaN-free.
    fn utilization(&self) -> f64 {
        if self.max_tokens == 0 {
            return 0.0;
        }
        1.0 - (self.current() as f64 / self.max_tokens as f64)
    }

    /// Suggested wait until a token is available.
    ///
    /// Returns `Duration::ZERO` when a token is available now, one refill
    /// interval (`1 / refill_rate` seconds) when the bucket is empty, and
    /// `Duration::MAX` when the bucket is empty and never refills (previously
    /// this case panicked on a division by zero).
    fn delay_for_token(&self) -> Duration {
        if self.current() > 0 {
            Duration::ZERO
        } else if self.refill_rate == 0 {
            Duration::MAX
        } else {
            Duration::from_secs_f64(1.0 / self.refill_rate as f64)
        }
    }
}
/// Point-in-time view of resource usage and the limits it is measured against.
#[derive(Debug, Clone)]
pub struct ResourceSnapshot {
    /// Memory in use, in bytes.
    pub memory_bytes: u64,
    /// Memory limit, in bytes.
    pub memory_limit: u64,
    /// Number of open connections.
    pub connections: usize,
    /// Maximum number of connections.
    pub connection_limit: usize,
    /// Request-rate figure supplied by whoever builds the snapshot.
    /// `ResourceLimiter::snapshot` stores the number of tokens currently left
    /// in its rate bucket here.
    pub request_rate: f64,
    /// Configured requests-per-second limit.
    pub request_rate_limit: u64,
    /// When the snapshot was taken.
    pub timestamp: Instant,
}

impl ResourceSnapshot {
    /// Memory in use as a fraction of the memory limit.
    pub fn memory_utilization(&self) -> f64 {
        Self::ratio(self.memory_bytes as f64, self.memory_limit as f64)
    }

    /// Open connections as a fraction of the connection limit.
    pub fn connection_utilization(&self) -> f64 {
        Self::ratio(self.connections as f64, self.connection_limit as f64)
    }

    /// The larger of the memory and connection utilizations.
    pub fn overall_utilization(&self) -> f64 {
        self.memory_utilization().max(self.connection_utilization())
    }

    /// `used / limit`, except that an idle resource is always `0.0`.
    ///
    /// Without the guard an idle resource with a zero limit would evaluate
    /// `0 / 0` and leak NaN into every comparison made by callers. Non-zero
    /// usage against a zero limit still yields positive infinity.
    fn ratio(used: f64, limit: f64) -> f64 {
        if used == 0.0 {
            0.0
        } else {
            used / limit
        }
    }
}
/// Counters describing the decisions recorded by a `ResourceLimiter`.
///
/// All counters start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct LimiterStatistics {
/// Number of checks recorded.
pub total_checks: u64,
/// Checks that ended in `LimitResult::Allowed`.
pub allowed: u64,
/// Checks that ended in `LimitResult::AllowedWithWarning`.
pub warned: u64,
/// Checks that ended in `LimitResult::Delayed`.
pub delayed: u64,
/// Checks that ended in `LimitResult::Rejected`.
pub rejected: u64,
/// Sum of all delays handed out, in microseconds.
pub total_delay_us: u64,
/// Rejections per resource. `ResourceLimiter::check` uses index 0 for
/// memory, 1 for connections and 2 for request rate.
pub rejections_by_type: [u64; 3], }
/// Tracks memory, connection and request-rate usage against configured limits
/// and turns the current usage into a `LimitResult`.
pub struct ResourceLimiter {
// Limits and the backpressure threshold this limiter enforces.
config: ResourceLimiterConfig,
// Runtime strategy, converted from `config.strategy` in `new`.
strategy: BackpressureStrategy,
// Memory currently accounted for, in bytes.
memory_used: AtomicU64,
// Number of connections currently accounted for.
connections: AtomicUsize,
// Token bucket backing the request-rate limit.
rate_bucket: TokenBucket,
// Decision counters, guarded by a lock.
stats: RwLock<LimiterStatistics>,
// Construction time; `uptime` is measured from here.
created_at: Instant,
}
impl ResourceLimiter {
    /// Builds a limiter from `config`.
    ///
    /// The request-rate bucket holds `max_requests_per_second` tokens and is
    /// refilled at that same rate.
    pub fn new(config: ResourceLimiterConfig) -> Self {
        let strategy = config.strategy.into();
        let requests_per_second = config.max_requests_per_second;
        Self {
            strategy,
            memory_used: AtomicU64::new(0),
            connections: AtomicUsize::new(0),
            rate_bucket: TokenBucket::new(requests_per_second, requests_per_second),
            config,
            stats: RwLock::new(LimiterStatistics::default()),
            created_at: Instant::now(),
        }
    }

    /// Builds a limiter from `ResourceLimiterConfig::default()`.
    pub fn default_config() -> Self {
        Self::new(ResourceLimiterConfig::default())
    }

    /// Evaluates all limits and records the outcome in the statistics.
    ///
    /// No rate token is consumed; use [`try_acquire`](Self::try_acquire) to
    /// admit a request.
    pub fn check(&self) -> LimitResult {
        let result = self.evaluate();
        self.record(&result);
        result
    }

    /// Evaluates all limits and, unless the request is rejected, takes one
    /// token from the rate bucket.
    ///
    /// If the limits looked fine but the token could not be taken (another
    /// caller took the last one in between), the result is downgraded to the
    /// rate-limited outcome instead of silently reporting the request as
    /// admitted. A limiter configured with `max_requests_per_second == 0`
    /// keeps its previous behavior: the rate limit never rejects or delays.
    pub fn try_acquire(&self) -> LimitResult {
        let mut result = self.evaluate();
        if !result.is_rejected() {
            let took_token = self.rate_bucket.try_consume(1);
            let rate_limit_enabled = self.config.max_requests_per_second > 0;
            if !took_token && rate_limit_enabled && result.is_allowed() {
                result = self.rate_exhausted();
            }
        }
        self.record(&result);
        result
    }

    /// Runs the memory, connection and rate checks in that order, stopping at
    /// the first rejection, and merges the remaining results.
    ///
    /// Kept free of the statistics lock so concurrent checks do not serialize
    /// on it.
    fn evaluate(&self) -> LimitResult {
        let memory = self.check_memory();
        if memory.is_rejected() {
            return memory;
        }
        let connections = self.check_connections();
        if connections.is_rejected() {
            return connections;
        }
        let rate = self.check_rate();
        if rate.is_rejected() {
            return rate;
        }
        self.combine_results(memory, connections, rate)
    }

    /// Updates the statistics for one decision, holding the lock only briefly.
    fn record(&self, result: &LimitResult) {
        let mut stats = self.stats.write();
        stats.total_checks += 1;
        match result {
            LimitResult::Allowed => stats.allowed += 1,
            LimitResult::AllowedWithWarning { .. } => stats.warned += 1,
            LimitResult::Delayed { delay, .. } => {
                stats.delayed += 1;
                // Saturate rather than truncate or overflow on huge delays.
                let micros = u64::try_from(delay.as_micros()).unwrap_or(u64::MAX);
                stats.total_delay_us = stats.total_delay_us.saturating_add(micros);
            }
            LimitResult::Rejected { resource, .. } => {
                stats.rejected += 1;
                stats.rejections_by_type[Self::rejection_slot(*resource)] += 1;
            }
        }
    }

    /// Index of `resource` inside `LimiterStatistics::rejections_by_type`.
    fn rejection_slot(resource: ResourceType) -> usize {
        match resource {
            ResourceType::Memory => 0,
            ResourceType::Connections => 1,
            ResourceType::RequestRate => 2,
        }
    }

    /// Rejects when memory utilization reaches 100 %, otherwise applies the
    /// backpressure strategy.
    ///
    /// NOTE(review): with a zero limit and zero usage the ratio is NaN, which
    /// fails every comparison below and ends up allowed — confirm whether a
    /// zero limit is meant to disable the memory check.
    fn check_memory(&self) -> LimitResult {
        let used = self.memory_used.load(Ordering::Relaxed);
        let limit = self.config.max_memory_bytes;
        let utilization = used as f64 / limit as f64;
        if utilization >= 1.0 {
            return LimitResult::Rejected {
                resource: ResourceType::Memory,
                current: used,
                limit,
            };
        }
        self.apply_backpressure(ResourceType::Memory, utilization)
    }

    /// Rejects when the connection count has reached the limit, otherwise
    /// applies the backpressure strategy.
    fn check_connections(&self) -> LimitResult {
        let current = self.connections.load(Ordering::Relaxed);
        let limit = self.config.max_connections;
        if current >= limit {
            return LimitResult::Rejected {
                resource: ResourceType::Connections,
                current: current as u64,
                limit: limit as u64,
            };
        }
        let utilization = current as f64 / limit as f64;
        self.apply_backpressure(ResourceType::Connections, utilization)
    }

    /// Reports the rate-limited outcome when the bucket is empty, otherwise
    /// applies the backpressure strategy to the bucket utilization.
    fn check_rate(&self) -> LimitResult {
        let utilization = self.rate_bucket.utilization();
        if utilization >= 1.0 {
            return self.rate_exhausted();
        }
        self.apply_backpressure(ResourceType::RequestRate, utilization)
    }

    /// Outcome for an empty rate bucket: a rejection under
    /// `BackpressureStrategy::None`, a delay under every other strategy.
    fn rate_exhausted(&self) -> LimitResult {
        if self.strategy == BackpressureStrategy::None {
            LimitResult::Rejected {
                resource: ResourceType::RequestRate,
                current: 0,
                limit: self.config.max_requests_per_second,
            }
        } else {
            LimitResult::Delayed {
                resource: ResourceType::RequestRate,
                delay: self.rate_bucket.delay_for_token(),
            }
        }
    }

    /// Translates a utilization at or above the backpressure threshold into a
    /// warning or delay according to the configured strategy.
    ///
    /// Below the threshold the result is always `Allowed`.
    fn apply_backpressure(&self, resource: ResourceType, utilization: f64) -> LimitResult {
        let threshold = self.config.backpressure_threshold;
        if utilization < threshold {
            return LimitResult::Allowed;
        }
        let warning = LimitResult::AllowedWithWarning {
            resource,
            utilization,
        };
        match self.strategy {
            BackpressureStrategy::None => {
                if utilization >= 1.0 {
                    LimitResult::Rejected {
                        resource,
                        current: 0,
                        limit: 0,
                    }
                } else {
                    LimitResult::Allowed
                }
            }
            BackpressureStrategy::Gradual => {
                let excess = (utilization - threshold) / (1.0 - threshold);
                if excess > 0.0 {
                    warning
                } else {
                    LimitResult::Allowed
                }
            }
            BackpressureStrategy::Adaptive => {
                if utilization > 0.9 {
                    // 1 ms per 0.1 % above 90 %, capped at 100 ms.
                    let delay_ms = ((utilization - 0.9) * 1000.0) as u64;
                    Self::delay_or_warn(resource, utilization, delay_ms, 100)
                } else if utilization > threshold {
                    warning
                } else {
                    LimitResult::Allowed
                }
            }
            BackpressureStrategy::Aggressive => {
                if utilization > 0.8 {
                    // Linear ramp from 0 ms at 80 % to 200 ms at 100 %.
                    let excess = (utilization - 0.8) / 0.2;
                    let delay_ms = (excess * 200.0) as u64;
                    Self::delay_or_warn(resource, utilization, delay_ms, 200)
                } else if utilization > 0.6 {
                    warning
                } else {
                    LimitResult::Allowed
                }
            }
        }
    }

    /// Builds a delay of `delay_ms` capped at `cap_ms`, or a warning when the
    /// delay truncates to zero.
    ///
    /// `combine_results` ignores zero-length delays, so without the fallback
    /// a resource just above the delay cutoff would be reported as plain
    /// `Allowed` while a less utilized one gets a warning.
    fn delay_or_warn(
        resource: ResourceType,
        utilization: f64,
        delay_ms: u64,
        cap_ms: u64,
    ) -> LimitResult {
        match delay_ms.min(cap_ms) {
            0 => LimitResult::AllowedWithWarning {
                resource,
                utilization,
            },
            millis => LimitResult::Delayed {
                resource,
                delay: Duration::from_millis(millis),
            },
        }
    }

    /// Merges the three per-resource results by severity: the first
    /// rejection, else the longest non-zero delay (first one wins ties), else
    /// the first warning, else `Allowed`.
    fn combine_results(
        &self,
        memory: LimitResult,
        connections: LimitResult,
        rate: LimitResult,
    ) -> LimitResult {
        let results = [memory, connections, rate];

        if let Some(rejected) = results.iter().find(|result| result.is_rejected()) {
            return rejected.clone();
        }

        let mut longest: Option<(ResourceType, Duration)> = None;
        for result in &results {
            if let LimitResult::Delayed { resource, delay } = result {
                let longest_so_far = longest.map_or(Duration::ZERO, |(_, seen)| seen);
                if *delay > longest_so_far {
                    longest = Some((*resource, *delay));
                }
            }
        }
        if let Some((resource, delay)) = longest {
            return LimitResult::Delayed { resource, delay };
        }

        results
            .iter()
            .find(|result| matches!(result, LimitResult::AllowedWithWarning { .. }))
            .cloned()
            .unwrap_or(LimitResult::Allowed)
    }

    /// Overwrites the tracked memory usage with `bytes`.
    pub fn set_memory_used(&self, bytes: u64) {
        self.memory_used.store(bytes, Ordering::Relaxed);
    }

    /// Adds `bytes` to the tracked memory usage, saturating at `u64::MAX`.
    pub fn add_memory(&self, bytes: u64) {
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .memory_used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                Some(used.saturating_add(bytes))
            });
    }

    /// Subtracts `bytes` from the tracked memory usage, saturating at zero.
    ///
    /// Releasing more than was added used to wrap around to a huge value,
    /// after which every check was rejected for memory.
    pub fn release_memory(&self, bytes: u64) {
        let _ = self
            .memory_used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                Some(used.saturating_sub(bytes))
            });
    }

    /// Overwrites the tracked connection count with `count`.
    pub fn set_connections(&self, count: usize) {
        self.connections.store(count, Ordering::Relaxed);
    }

    /// Registers one more connection if the limit allows it.
    ///
    /// Returns `false`, leaving the count untouched, when the limit has been
    /// reached. The count is only ever incremented when there is room, so
    /// concurrent readers never observe a value above the limit.
    pub fn add_connection(&self) -> bool {
        let limit = self.config.max_connections;
        self.connections
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |open| {
                (open < limit).then(|| open + 1)
            })
            .is_ok()
    }

    /// Unregisters one connection, saturating at zero.
    ///
    /// An unmatched call used to wrap the counter to `usize::MAX`.
    pub fn remove_connection(&self) {
        let _ = self
            .connections
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |open| {
                Some(open.saturating_sub(1))
            });
    }

    /// Captures the current usage together with the configured limits.
    ///
    /// `request_rate` is filled with the number of tokens currently left in
    /// the rate bucket.
    pub fn snapshot(&self) -> ResourceSnapshot {
        ResourceSnapshot {
            memory_bytes: self.memory_used.load(Ordering::Relaxed),
            memory_limit: self.config.max_memory_bytes,
            connections: self.connections.load(Ordering::Relaxed),
            connection_limit: self.config.max_connections,
            // `current()` credits earned tokens itself; no separate refill is needed.
            request_rate: self.rate_bucket.current() as f64,
            request_rate_limit: self.config.max_requests_per_second,
            timestamp: Instant::now(),
        }
    }

    /// Returns a copy of the statistics gathered so far.
    pub fn statistics(&self) -> LimiterStatistics {
        self.stats.read().clone()
    }

    /// Resets every statistics counter to zero.
    pub fn reset_statistics(&self) {
        *self.stats.write() = LimiterStatistics::default();
    }

    /// The larger of the current memory and connection utilizations.
    pub fn overall_utilization(&self) -> f64 {
        self.snapshot().overall_utilization()
    }

    /// The configuration this limiter was built with.
    pub fn config(&self) -> &ResourceLimiterConfig {
        &self.config
    }

    /// Time elapsed since the limiter was created.
    pub fn uptime(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Whether the overall utilization has reached the backpressure threshold.
    pub fn is_under_pressure(&self) -> bool {
        self.overall_utilization() >= self.config.backpressure_threshold
    }

    /// How far the overall utilization is into the band between the
    /// backpressure threshold and 100 %.
    ///
    /// Returns `0.0` below the threshold and `1.0` at 100 % utilization. When
    /// the threshold leaves no band (threshold at or above `1.0`), reaching
    /// it reports `1.0` instead of dividing by zero.
    pub fn backpressure_level(&self) -> f64 {
        let utilization = self.overall_utilization();
        let threshold = self.config.backpressure_threshold;
        let headroom = 1.0 - threshold;
        if utilization < threshold {
            0.0
        } else if headroom <= 0.0 {
            1.0
        } else {
            (utilization - threshold) / headroom
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 1 MiB of memory, 100 connections, 1000 requests/s, backpressure from
    /// 80 % utilization, adaptive strategy.
    fn make_config() -> ResourceLimiterConfig {
        ResourceLimiterConfig {
            max_memory_bytes: 1024 * 1024,
            max_connections: 100,
            max_requests_per_second: 1000,
            backpressure_threshold: 0.8,
            strategy: BackpressureStrategyConfig::Adaptive,
        }
    }

    #[test]
    fn test_allowed_under_limit() {
        let limiter = ResourceLimiter::new(make_config());
        let result = limiter.check();
        assert!(matches!(result, LimitResult::Allowed));
        assert!(result.is_allowed());
        assert!(!result.is_rejected());
    }

    #[test]
    fn test_memory_limit() {
        let limiter = ResourceLimiter::new(make_config());
        limiter.set_memory_used(1024 * 1024 + 1);
        let result = limiter.check();
        assert!(result.is_rejected());
        assert!(matches!(
            result,
            LimitResult::Rejected {
                resource: ResourceType::Memory,
                current: 1_048_577,
                limit: 1_048_576,
            }
        ));
    }

    #[test]
    fn test_connection_limit() {
        let limiter = ResourceLimiter::new(make_config());
        limiter.set_connections(100);
        let result = limiter.check();
        assert!(result.is_rejected());
        assert!(matches!(
            result,
            LimitResult::Rejected {
                resource: ResourceType::Connections,
                current: 100,
                limit: 100,
            }
        ));
    }

    #[test]
    fn test_add_remove_connection() {
        let limiter = ResourceLimiter::new(make_config());
        for i in 0..100 {
            assert!(limiter.add_connection(), "Failed at connection {}", i);
        }
        // The limit is reached: the next connection is refused and not counted.
        assert!(!limiter.add_connection());
        assert_eq!(limiter.snapshot().connections, 100);
        limiter.remove_connection();
        assert!(limiter.add_connection());
    }

    #[test]
    fn test_memory_tracking() {
        let limiter = ResourceLimiter::new(make_config());
        limiter.add_memory(100);
        assert_eq!(limiter.snapshot().memory_bytes, 100);
        limiter.add_memory(200);
        assert_eq!(limiter.snapshot().memory_bytes, 300);
        limiter.release_memory(100);
        assert_eq!(limiter.snapshot().memory_bytes, 200);
    }

    #[test]
    fn test_backpressure_threshold() {
        let limiter = ResourceLimiter::new(make_config());
        // 50 % utilization: below the 80 % threshold.
        limiter.set_memory_used(512 * 1024);
        assert!(!limiter.is_under_pressure());
        assert_eq!(limiter.backpressure_level(), 0.0);
        // ~88 % utilization: above the threshold.
        limiter.set_memory_used(900 * 1024);
        assert!(limiter.is_under_pressure());
        assert!(limiter.backpressure_level() > 0.0);
        assert!(limiter.backpressure_level() < 1.0);
    }

    #[test]
    fn test_warning_near_limit() {
        let limiter = ResourceLimiter::new(make_config());
        // ~83 % utilization: above the threshold but below the adaptive
        // strategy's 90 % delay cutoff, so the check only warns.
        limiter.set_memory_used(850 * 1024);
        let result = limiter.check();
        assert!(result.is_allowed());
        assert!(matches!(
            result,
            LimitResult::AllowedWithWarning {
                resource: ResourceType::Memory,
                ..
            }
        ));
    }

    #[test]
    fn test_statistics() {
        let limiter = ResourceLimiter::new(make_config());
        limiter.check();
        limiter.check();
        limiter.set_memory_used(1024 * 1024 + 1);
        limiter.check();
        let stats = limiter.statistics();
        assert_eq!(stats.total_checks, 3);
        assert_eq!(stats.allowed, 2);
        assert_eq!(stats.warned, 0);
        assert_eq!(stats.delayed, 0);
        assert_eq!(stats.rejected, 1);
        assert_eq!(stats.rejections_by_type, [1, 0, 0]);

        limiter.reset_statistics();
        assert_eq!(limiter.statistics().total_checks, 0);
    }

    #[test]
    fn test_no_backpressure_strategy() {
        let mut config = make_config();
        config.strategy = BackpressureStrategyConfig::None;
        let limiter = ResourceLimiter::new(config);
        // ~93 % utilization would be delayed by the adaptive strategy, but
        // without backpressure it is simply allowed.
        limiter.set_memory_used(950 * 1024);
        let result = limiter.check();
        assert!(matches!(result, LimitResult::Allowed));
    }

    #[test]
    fn test_snapshot() {
        let limiter = ResourceLimiter::new(make_config());
        limiter.set_memory_used(512 * 1024);
        limiter.set_connections(50);
        let snapshot = limiter.snapshot();
        assert_eq!(snapshot.memory_bytes, 512 * 1024);
        assert_eq!(snapshot.memory_limit, 1024 * 1024);
        assert_eq!(snapshot.connections, 50);
        assert_eq!(snapshot.connection_limit, 100);
        assert_eq!(snapshot.request_rate_limit, 1000);
        assert_eq!(snapshot.memory_utilization(), 0.5);
        assert_eq!(snapshot.connection_utilization(), 0.5);
        assert_eq!(snapshot.overall_utilization(), 0.5);
    }

    #[test]
    fn test_limit_result_methods() {
        let allowed = LimitResult::Allowed;
        assert!(allowed.is_allowed());
        assert!(!allowed.is_rejected());
        assert!(!allowed.needs_delay());
        assert!(allowed.delay().is_none());

        let warned = LimitResult::AllowedWithWarning {
            resource: ResourceType::Connections,
            utilization: 0.85,
        };
        assert!(warned.is_allowed());
        assert!(!warned.is_rejected());
        assert!(!warned.needs_delay());
        assert!(warned.delay().is_none());

        let rejected = LimitResult::Rejected {
            resource: ResourceType::Memory,
            current: 100,
            limit: 100,
        };
        assert!(!rejected.is_allowed());
        assert!(rejected.is_rejected());
        assert!(!rejected.needs_delay());
        assert!(rejected.delay().is_none());

        let delayed = LimitResult::Delayed {
            resource: ResourceType::RequestRate,
            delay: Duration::from_millis(50),
        };
        assert!(!delayed.is_allowed());
        assert!(!delayed.is_rejected());
        assert!(delayed.needs_delay());
        assert_eq!(delayed.delay(), Some(Duration::from_millis(50)));
    }
}