use crate::error::{CoreError, CoreResult, ErrorContext};
use once_cell::sync::Lazy;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;
use std::time::{Duration, Instant};
/// Strategy used to decide whether a repeated log event may be emitted.
#[derive(Debug, Clone)]
pub enum RateLimitStrategy {
    /// At most `max_events` per `window_duration`. NOTE: the current
    /// implementation prunes by event age, so it behaves like a sliding
    /// window rather than resetting at fixed boundaries.
    FixedWindow {
        max_events: u32,
        window_duration: Duration,
    },
    /// At most `max_events` within any rolling `window_duration`.
    SlidingWindow {
        max_events: u32,
        window_duration: Duration,
    },
    /// Classic token bucket: bursts up to `capacity`, refilled at
    /// `refill_rate` tokens per second.
    TokenBucket {
        capacity: u32,
        refill_rate: f64,
    },
    /// Delay grows by `multiplier` on each suppressed attempt, starting at
    /// `initialdelay` and capped at `maxdelay`.
    ExponentialBackoff {
        initialdelay: Duration,
        maxdelay: Duration,
        multiplier: f64,
    },
    /// Sliding-window variant whose effective limit shrinks once the
    /// estimated load exceeds `load_threshold`.
    Adaptive {
        base_max_events: u32,
        base_window: Duration,
        load_threshold: f64,
    },
}

impl Default for RateLimitStrategy {
    /// Default policy: at most 10 events per rolling 60-second window.
    fn default() -> Self {
        Self::SlidingWindow {
            max_events: 10,
            window_duration: Duration::from_secs(60),
        }
    }
}
/// Coarse classification of a log event; drives priority ordering and the
/// choice of rate-limiting strategy.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EventClass {
    Critical,
    Error,
    Warning,
    Info,
    Debug,
    Trace,
    /// Caller-defined class; treated with the same priority as `Info`.
    Custom(String),
}

impl EventClass {
    /// Numeric priority where 0 is the most important class.
    /// `Custom` classes share the `Info` priority.
    pub fn priority(&self) -> u8 {
        match self {
            Self::Critical => 0,
            Self::Error => 1,
            Self::Warning => 2,
            Self::Info => 3,
            Self::Debug => 4,
            Self::Trace => 5,
            Self::Custom(_) => 3,
        }
    }

    /// Critical events are never rate limited.
    pub fn bypass_rate_limiting(&self) -> bool {
        *self == Self::Critical
    }
}
/// A single log event with a fingerprint used to group "similar" events for
/// per-type rate limiting.
#[derive(Debug, Clone)]
pub struct LogEvent {
    /// Human-readable log message.
    pub message: String,
    /// Severity / routing class of the event.
    pub class: EventClass,
    /// Free-form key/value annotations; never part of the fingerprint.
    pub metadata: HashMap<String, String>,
    /// Creation time, captured when the event is constructed.
    pub timestamp: Instant,
    /// Optional origin label; folded into the fingerprint when set.
    pub source: Option<String>,
    /// Hash of `message` + `class` (+ `source` when set).
    pub fingerprint: u64,
}

impl LogEvent {
    /// Creates an event stamped with the current time and a fingerprint
    /// derived from `message` and `class`.
    pub fn new(message: String, class: EventClass) -> Self {
        let fingerprint = Self::calculate_fingerprint(&message, &class);
        Self {
            message,
            class,
            metadata: HashMap::new(),
            timestamp: Instant::now(),
            source: None,
            fingerprint,
        }
    }

    /// Attaches a source label and refreshes the fingerprint so the same
    /// message from different sources is rate-limited independently.
    ///
    /// The fingerprint is recomputed from scratch (message + class, then
    /// source) so repeated calls stay idempotent. The previous version's
    /// `if let Some(ref source) = self.source` branch was always true —
    /// `source` had just been assigned — so the incoming value is hashed
    /// directly instead; the resulting fingerprint is identical.
    pub fn with_source(mut self, source: String) -> Self {
        let mut hasher = DefaultHasher::new();
        Self::calculate_fingerprint(&self.message, &self.class).hash(&mut hasher);
        source.hash(&mut hasher);
        self.fingerprint = hasher.finish();
        self.source = Some(source);
        self
    }

    /// Adds one metadata entry (builder style). Metadata never affects the
    /// fingerprint, so annotated duplicates still group together.
    pub fn with_metadata(mut self, key: String, value: String) -> Self {
        self.metadata.insert(key, value);
        self
    }

    /// Hashes `message` and `class` with `DefaultHasher`.
    /// NOTE: `DefaultHasher` output is not guaranteed stable across Rust
    /// releases — fingerprints must not be persisted or compared across
    /// builds.
    fn calculate_fingerprint(message: &str, class: &EventClass) -> u64 {
        let mut hasher = DefaultHasher::new();
        message.hash(&mut hasher);
        class.hash(&mut hasher);
        hasher.finish()
    }

    /// True when both events share a fingerprint.
    pub fn is_similar(&self, other: &LogEvent) -> bool {
        self.fingerprint == other.fingerprint
    }
}
/// Mutable state for one tracked event fingerprint. Which fields are
/// meaningful depends on `strategy` (e.g. `tokens` is only used by
/// `TokenBucket`; `currentdelay`/`next_allowed_time` only by
/// `ExponentialBackoff`).
#[derive(Debug, Clone)]
struct RateLimiterState {
    strategy: RateLimitStrategy,
    // Timestamps of recently allowed events (window-based strategies).
    event_times: Vec<Instant>,
    // Currently available tokens (token-bucket strategy).
    tokens: f64,
    // Last time the bucket was refilled (token-bucket strategy).
    last_refill: Instant,
    // Earliest instant the next event may pass (exponential backoff).
    next_allowed_time: Instant,
    // Current backoff delay (exponential backoff).
    currentdelay: Duration,
    // Events suppressed since the last emitted summary.
    suppressed_count: u32,
    // When the last suppression summary was produced.
    last_summary_time: Instant,
}
impl RateLimiterState {
    /// Creates fresh state for `strategy`. Token buckets start full so an
    /// initial burst of up to `capacity` events is allowed.
    fn new(strategy: RateLimitStrategy) -> Self {
        let now = Instant::now();
        let tokens = match &strategy {
            RateLimitStrategy::TokenBucket { capacity, .. } => *capacity as f64,
            _ => 0.0,
        };
        Self {
            strategy,
            event_times: Vec::new(),
            tokens,
            last_refill: now,
            next_allowed_time: now,
            currentdelay: Duration::from_secs(0),
            suppressed_count: 0,
            last_summary_time: now,
        }
    }

    /// Applies the configured strategy to `event`, using the event's own
    /// timestamp as "now". Critical-class events always pass.
    fn should_allow(&mut self, event: &LogEvent) -> RateLimitDecision {
        let now = event.timestamp;
        if event.class.bypass_rate_limiting() {
            return RateLimitDecision::Allow;
        }
        match &self.strategy {
            RateLimitStrategy::FixedWindow {
                max_events,
                window_duration,
            } => self.should_allow_fixed_window(*max_events, *window_duration, now),
            RateLimitStrategy::SlidingWindow {
                max_events,
                window_duration,
            } => self.should_allow_sliding_window(*max_events, *window_duration, now),
            RateLimitStrategy::TokenBucket {
                capacity,
                refill_rate,
            } => self.should_allow_token_bucket(*capacity, *refill_rate, now),
            RateLimitStrategy::ExponentialBackoff {
                initialdelay,
                maxdelay,
                multiplier,
            } => self.should_allow_exponential_backoff(*initialdelay, *maxdelay, *multiplier, now),
            RateLimitStrategy::Adaptive {
                base_max_events,
                base_window,
                load_threshold,
            } => self.should_allow_adaptive(*base_max_events, *base_window, *load_threshold, now),
        }
    }

    /// Drops recorded event times older than `window_duration` before `now`.
    ///
    /// `Instant::duration_since` saturates to zero for instants not earlier
    /// than `now`, so this is safe even when `now - window_duration` would
    /// underflow `Instant` early in the process lifetime. The previous
    /// `checked_sub(..).unwrap_or(Instant::now())` fallback silently
    /// discarded the entire history in that case.
    fn prune_window(&mut self, window_duration: Duration, now: Instant) {
        self.event_times
            .retain(|&time| now.duration_since(time) <= window_duration);
    }

    /// Fixed-window check. NOTE: implemented identically to the sliding
    /// window (events are pruned by age); the variant is retained for API
    /// compatibility.
    fn should_allow_fixed_window(
        &mut self,
        max_events: u32,
        window_duration: Duration,
        now: Instant,
    ) -> RateLimitDecision {
        self.prune_window(window_duration, now);
        if self.event_times.len() < max_events as usize {
            self.event_times.push(now);
            RateLimitDecision::Allow
        } else {
            self.suppressed_count += 1;
            // Retry once the oldest recorded event ages out of the window.
            // (The previous `window_start + window_duration` always equaled
            // `now`, so it never conveyed a real wait.)
            RateLimitDecision::Suppress {
                reason: format!(
                    "Fixed window limit exceeded ({max_events} events in {window_duration:?})"
                ),
                retry_after: self
                    .event_times
                    .first()
                    .map(|&oldest| oldest + window_duration),
            }
        }
    }

    /// Allows at most `max_events` within any rolling `window_duration`.
    fn should_allow_sliding_window(
        &mut self,
        max_events: u32,
        window_duration: Duration,
        now: Instant,
    ) -> RateLimitDecision {
        self.prune_window(window_duration, now);
        if self.event_times.len() < max_events as usize {
            self.event_times.push(now);
            RateLimitDecision::Allow
        } else {
            self.suppressed_count += 1;
            // Events are stored in arrival order, so `first()` is the oldest.
            let retry_after = self
                .event_times
                .first()
                .map(|&oldest| oldest + window_duration);
            RateLimitDecision::Suppress {
                reason: format!(
                    "Sliding window limit exceeded ({max_events} events in {window_duration:?})"
                ),
                retry_after,
            }
        }
    }

    /// Token bucket: refill by elapsed time, then spend one token per event.
    fn should_allow_token_bucket(
        &mut self,
        capacity: u32,
        refill_rate: f64,
        now: Instant,
    ) -> RateLimitDecision {
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * refill_rate).min(capacity as f64);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            RateLimitDecision::Allow
        } else {
            self.suppressed_count += 1;
            // Guard against a zero/negative refill rate: the division would
            // yield a non-finite value and `Duration::from_secs_f64` panics
            // on non-finite input. Such a bucket never refills, so there is
            // no meaningful retry time.
            let retry_after = if refill_rate > 0.0 {
                Some(now + Duration::from_secs_f64((1.0 - self.tokens) / refill_rate))
            } else {
                None
            };
            RateLimitDecision::Suppress {
                reason: format!("Token bucket empty (refill rate: {refill_rate:.2}/sec)"),
                retry_after,
            }
        }
    }

    /// Exponential backoff: allow once per quiet period, growing the delay on
    /// every suppressed attempt.
    ///
    /// NOTE(review): each suppressed attempt re-arms `next_allowed_time`, so
    /// sustained traffic arriving faster than the current delay is suppressed
    /// indefinitely until a quiet gap occurs — confirm this is intended.
    fn should_allow_exponential_backoff(
        &mut self,
        initialdelay: Duration,
        maxdelay: Duration,
        multiplier: f64,
        now: Instant,
    ) -> RateLimitDecision {
        if now >= self.next_allowed_time {
            // Quiet period elapsed: allow and re-arm with the initial delay.
            self.currentdelay = initialdelay;
            self.next_allowed_time = now + self.currentdelay;
            RateLimitDecision::Allow
        } else {
            self.suppressed_count += 1;
            // Grow the delay (capped at `maxdelay`) and push the gate out.
            self.currentdelay = Duration::from_secs_f64(
                (self.currentdelay.as_secs_f64() * multiplier).min(maxdelay.as_secs_f64()),
            );
            self.next_allowed_time = now + self.currentdelay;
            RateLimitDecision::Suppress {
                reason: format!(
                    "Exponential backoff (current delay: {:?})",
                    self.currentdelay
                ),
                retry_after: Some(self.next_allowed_time),
            }
        }
    }

    /// Sliding window whose limit shrinks linearly once the estimated load
    /// exceeds `load_threshold`; the limit floors at 1 event per window.
    fn should_allow_adaptive(
        &mut self,
        base_max_events: u32,
        base_window: Duration,
        load_threshold: f64,
        now: Instant,
    ) -> RateLimitDecision {
        let current_load = self.estimate_system_load();
        let adjusted_max_events = if current_load > load_threshold {
            (base_max_events as f64 * (2.0 - current_load / load_threshold)).max(1.0) as u32
        } else {
            base_max_events
        };
        self.prune_window(base_window, now);
        if self.event_times.len() < adjusted_max_events as usize {
            self.event_times.push(now);
            RateLimitDecision::Allow
        } else {
            self.suppressed_count += 1;
            RateLimitDecision::Suppress {
                reason: format!(
                    "Adaptive limit exceeded (load: {current_load:.2}, limit: {adjusted_max_events})"
                ),
                retry_after: self.event_times.first().map(|&oldest| oldest + base_window),
            }
        }
    }

    /// Crude load proxy: the fraction of a 100-event budget currently held in
    /// `event_times`, capped at 1.0.
    fn estimate_system_load(&self) -> f64 {
        let recent_events = self.event_times.len();
        (recent_events as f64 / 100.0).min(1.0)
    }

    /// Produces a suppression summary at most once per `summaryinterval`, and
    /// only when something was suppressed. Emitting resets the counter.
    fn shouldlog_summary(&mut self, summaryinterval: Duration) -> Option<SuppressionSummary> {
        let now = Instant::now();
        if self.suppressed_count > 0
            && now.duration_since(self.last_summary_time) >= summaryinterval
        {
            let summary = SuppressionSummary {
                suppressed_count: self.suppressed_count,
                time_period: now.duration_since(self.last_summary_time),
                strategy: format!("{:?}", self.strategy),
            };
            self.suppressed_count = 0;
            self.last_summary_time = now;
            Some(summary)
        } else {
            None
        }
    }
}
/// Outcome of a rate-limiting check.
#[derive(Debug, Clone)]
pub enum RateLimitDecision {
    /// Emit the event.
    Allow,
    /// Drop the event.
    Suppress {
        /// Human-readable explanation of which limit was hit.
        reason: String,
        /// Earliest instant a similar event may be allowed again, when the
        /// strategy can estimate one.
        retry_after: Option<Instant>,
    },
}
/// Periodic report of how many events one limiter suppressed, and over what
/// span, so that suppression itself is still observable.
#[derive(Debug, Clone)]
pub struct SuppressionSummary {
    /// Events dropped since the previous summary.
    pub suppressed_count: u32,
    /// Span the counter covers.
    pub time_period: Duration,
    /// Debug rendering of the strategy that did the suppressing.
    pub strategy: String,
}

impl std::fmt::Display for SuppressionSummary {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let SuppressionSummary {
            suppressed_count,
            time_period,
            strategy,
        } = self;
        write!(
            f,
            "Suppressed {} events over {:?} using {}",
            suppressed_count, time_period, strategy
        )
    }
}
/// Thread-safe rate limiter that keeps independent limiter state per event
/// fingerprint, plus aggregate statistics.
pub struct SmartRateLimiter {
    // Per-fingerprint limiter state, keyed by `LogEvent::fingerprint`.
    limiters: RwLock<HashMap<u64, RateLimiterState>>,
    config: RateLimiterConfig,
    stats: RwLock<RateLimiterStats>,
}
/// Configuration for `SmartRateLimiter`.
#[derive(Debug, Clone)]
pub struct RateLimiterConfig {
    /// Strategy used when an event class has no entry in `class_strategies`.
    pub default_strategy: RateLimitStrategy,
    /// Per-class strategy overrides.
    pub class_strategies: HashMap<EventClass, RateLimitStrategy>,
    /// Minimum interval between suppression summaries per event type.
    pub summary_interval: Duration,
    /// Upper bound on distinct event fingerprints tracked at once.
    pub max_tracked_events: usize,
    /// NOTE(review): never read by the limiter logic in this file — the
    /// adaptive strategy is chosen per class, not gated by this flag.
    /// Confirm whether it is consumed elsewhere or is dead configuration.
    pub enable_adaptive: bool,
}
impl Default for RateLimiterConfig {
    /// Opinionated defaults: critical events effectively unlimited, errors
    /// and warnings on sliding windows, info/debug on token buckets, trace on
    /// exponential backoff.
    fn default() -> Self {
        let class_strategies = HashMap::from([
            (
                EventClass::Critical,
                RateLimitStrategy::SlidingWindow {
                    max_events: u32::MAX,
                    window_duration: Duration::from_secs(1),
                },
            ),
            (
                EventClass::Error,
                RateLimitStrategy::SlidingWindow {
                    max_events: 50,
                    window_duration: Duration::from_secs(60),
                },
            ),
            (
                EventClass::Warning,
                RateLimitStrategy::SlidingWindow {
                    max_events: 20,
                    window_duration: Duration::from_secs(60),
                },
            ),
            (
                EventClass::Info,
                RateLimitStrategy::TokenBucket {
                    capacity: 10,
                    refill_rate: 0.5,
                },
            ),
            (
                EventClass::Debug,
                RateLimitStrategy::TokenBucket {
                    capacity: 5,
                    refill_rate: 0.1,
                },
            ),
            (
                EventClass::Trace,
                RateLimitStrategy::ExponentialBackoff {
                    initialdelay: Duration::from_secs(1),
                    maxdelay: Duration::from_secs(300),
                    multiplier: 2.0,
                },
            ),
        ]);
        Self {
            default_strategy: RateLimitStrategy::default(),
            class_strategies,
            summary_interval: Duration::from_secs(300),
            max_tracked_events: 10000,
            enable_adaptive: true,
        }
    }
}
/// Aggregate counters maintained by `SmartRateLimiter`.
#[derive(Debug, Clone, Default)]
pub struct RateLimiterStats {
    /// Events submitted to `should_allow`, allowed or not.
    pub total_events: u64,
    /// Events that passed rate limiting.
    pub allowed_events: u64,
    /// Events that were suppressed.
    pub suppressed_events: u64,
    /// Submission counts broken down by event class.
    pub events_by_class: HashMap<EventClass, u64>,
    /// Number of distinct fingerprints currently tracked.
    pub tracked_event_types: usize,
}
impl SmartRateLimiter {
pub fn new(config: RateLimiterConfig) -> Self {
Self {
limiters: RwLock::new(HashMap::new()),
config,
stats: RwLock::new(RateLimiterStats::default()),
}
}
pub fn should_allow(&self, event: &LogEvent) -> CoreResult<RateLimitDecision> {
{
let mut stats = self.stats.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to acquire stats write lock"))
})?;
stats.total_events += 1;
*stats
.events_by_class
.entry(event.class.clone())
.or_insert(0) += 1;
}
let decision = {
let mut limiters = self.limiters.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new(
"Failed to acquire limiters write lock",
))
})?;
if limiters.len() >= self.config.max_tracked_events
&& !limiters.contains_key(&event.fingerprint)
{
if let Some((&oldest_key, _)) = limiters.iter().next() {
limiters.remove(&oldest_key);
}
}
let limiter = limiters.entry(event.fingerprint).or_insert_with(|| {
let strategy = self
.config
.class_strategies
.get(&event.class)
.cloned()
.unwrap_or_else(|| self.config.default_strategy.clone());
RateLimiterState::new(strategy)
});
limiter.should_allow(event)
};
{
let mut stats = self.stats.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to acquire stats write lock"))
})?;
match &decision {
RateLimitDecision::Allow => stats.allowed_events += 1,
RateLimitDecision::Suppress { .. } => stats.suppressed_events += 1,
}
stats.tracked_event_types = {
let limiters = self.limiters.read().map_err(|_| {
CoreError::ComputationError(ErrorContext::new(
"Failed to acquire limiters read lock",
))
})?;
limiters.len()
};
}
Ok(decision)
}
pub fn get_suppression_summaries(&self) -> CoreResult<Vec<(u64, SuppressionSummary)>> {
let mut summaries = Vec::new();
let mut limiters = self.limiters.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to acquire limiters write lock"))
})?;
for (&fingerprint, limiter) in limiters.iter_mut() {
if let Some(summary) = limiter.shouldlog_summary(self.config.summary_interval) {
summaries.push((fingerprint, summary));
}
}
Ok(summaries)
}
pub fn get_stats(&self) -> CoreResult<RateLimiterStats> {
let stats = self.stats.read().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to acquire stats read lock"))
})?;
Ok(stats.clone())
}
pub fn clear(&self) -> CoreResult<()> {
let mut limiters = self.limiters.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to acquire limiters write lock"))
})?;
limiters.clear();
let mut stats = self.stats.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to acquire stats write lock"))
})?;
*stats = RateLimiterStats::default();
Ok(())
}
pub fn update_config(&mut self, config: RateLimiterConfig) {
self.config = config;
}
pub const fn get_config(&self) -> &RateLimiterConfig {
&self.config
}
}
impl Default for SmartRateLimiter {
fn default() -> Self {
Self::new(RateLimiterConfig::default())
}
}
// Process-wide limiter instance, lazily constructed on first use.
static GLOBAL_RATE_LIMITER: Lazy<SmartRateLimiter> = Lazy::new(SmartRateLimiter::default);
/// Returns the process-wide shared rate limiter.
#[allow(dead_code)]
pub fn global_rate_limiter() -> &'static SmartRateLimiter {
    &GLOBAL_RATE_LIMITER
}
pub mod utils {
    //! Convenience event constructors and a simple gate over the global
    //! limiter.
    use super::*;

    /// Builds an `Error`-class event from `message`.
    pub fn error_event(message: String) -> LogEvent {
        LogEvent::new(message, EventClass::Error)
    }

    /// Builds a `Warning`-class event from `message`.
    pub fn warning_event(message: String) -> LogEvent {
        LogEvent::new(message, EventClass::Warning)
    }

    /// Builds an `Info`-class event from `message`.
    pub fn info_event(message: String) -> LogEvent {
        LogEvent::new(message, EventClass::Info)
    }

    /// Builds a `Debug`-class event from `message`.
    pub fn debug_event(message: String) -> LogEvent {
        LogEvent::new(message, EventClass::Debug)
    }

    /// Returns `true` when `event` should actually be logged.
    /// Fails open: a limiter error never silences a log line.
    pub fn shouldlog(event: &LogEvent) -> bool {
        !matches!(
            global_rate_limiter().should_allow(event),
            Ok(RateLimitDecision::Suppress { .. })
        )
    }

    /// Small bucket with a slow refill, for very chatty call sites.
    pub fn high_frequency_strategy() -> RateLimitStrategy {
        RateLimitStrategy::TokenBucket {
            capacity: 5,
            refill_rate: 0.1,
        }
    }

    /// Generous bucket that tolerates short bursts.
    pub fn burst_strategy() -> RateLimitStrategy {
        RateLimitStrategy::TokenBucket {
            capacity: 20,
            refill_rate: 2.0,
        }
    }

    /// At most one event per `period`.
    pub fn periodic_strategy(period: Duration) -> RateLimitStrategy {
        RateLimitStrategy::FixedWindow {
            max_events: 1,
            window_duration: period,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Shared predicate: true when the decision is `Allow`.
    fn is_allowed(decision: &RateLimitDecision) -> bool {
        matches!(decision, RateLimitDecision::Allow)
    }

    #[test]
    fn test_event_classification() {
        let err = LogEvent::new("Test message".to_string(), EventClass::Error);
        assert_eq!(err.class, EventClass::Error);
        assert_eq!(err.class.priority(), 1);
        assert!(!err.class.bypass_rate_limiting());

        let crit = LogEvent::new("Critical".to_string(), EventClass::Critical);
        assert!(crit.class.bypass_rate_limiting());
    }

    #[test]
    fn test_sliding_window_rate_limiting() {
        let mut state = RateLimiterState::new(RateLimitStrategy::SlidingWindow {
            max_events: 3,
            window_duration: Duration::from_millis(100),
        });
        let event = LogEvent::new("Test".to_string(), EventClass::Info);

        // The first three events fill the window.
        for _ in 0..3 {
            assert!(
                is_allowed(&state.should_allow(&event)),
                "Should not suppress yet"
            );
        }
        // The fourth exceeds the limit.
        assert!(!is_allowed(&state.should_allow(&event)), "Should suppress");

        // Once the window has elapsed, a fresh event passes again.
        thread::sleep(Duration::from_millis(110));
        let fresh = LogEvent::new("Test".to_string(), EventClass::Info);
        assert!(
            is_allowed(&state.should_allow(&fresh)),
            "Should allow after window"
        );
    }

    #[test]
    fn test_token_bucket() {
        let mut state = RateLimiterState::new(RateLimitStrategy::TokenBucket {
            capacity: 2,
            refill_rate: 10.0,
        });
        let event = LogEvent::new("Test".to_string(), EventClass::Info);

        // The bucket starts full with `capacity` tokens.
        for _ in 0..2 {
            assert!(
                is_allowed(&state.should_allow(&event)),
                "Should not suppress yet"
            );
        }
        assert!(
            !is_allowed(&state.should_allow(&event)),
            "Should suppress when bucket empty"
        );
    }

    #[test]
    fn test_smart_rate_limiter() {
        let limiter = SmartRateLimiter::default();
        let error_event = LogEvent::new("Error message".to_string(), EventClass::Error);
        let debug_event = LogEvent::new("Debug message".to_string(), EventClass::Debug);

        let error_decision = limiter.should_allow(&error_event).expect("Operation failed");
        assert!(is_allowed(&error_decision));

        let debug_decision = limiter.should_allow(&debug_event).expect("Operation failed");
        assert!(is_allowed(&debug_decision));

        let stats = limiter.get_stats().expect("Operation failed");
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.allowed_events, 2);
    }

    #[test]
    fn test_event_fingerprinting() {
        let first = LogEvent::new("Same message".to_string(), EventClass::Info);
        let second = LogEvent::new("Same message".to_string(), EventClass::Info);
        let third = LogEvent::new("Different message".to_string(), EventClass::Info);
        assert!(first.is_similar(&second));
        assert!(!first.is_similar(&third));
    }

    #[test]
    fn test_suppression_summary() {
        let summary = SuppressionSummary {
            suppressed_count: 10,
            time_period: Duration::from_secs(60),
            strategy: "TokenBucket".to_string(),
        };
        let rendered = summary.to_string();
        assert!(rendered.contains("10 events"));
        assert!(rendered.contains("TokenBucket"));
    }

    #[test]
    fn test_critical_events_bypass() {
        let limiter = SmartRateLimiter::default();
        let critical = LogEvent::new("Critical error".to_string(), EventClass::Critical);
        // Critical events are never suppressed, no matter the volume.
        for _ in 0..1000 {
            let decision = limiter.should_allow(&critical).expect("Operation failed");
            assert!(is_allowed(&decision));
        }
    }

    #[test]
    fn test_utils_functions() {
        assert_eq!(
            utils::error_event("Test error".to_string()).class,
            EventClass::Error
        );
        assert_eq!(
            utils::warning_event("Test warning".to_string()).class,
            EventClass::Warning
        );
        let info = utils::info_event("Test info".to_string());
        assert!(utils::shouldlog(&info));
    }
}