1use crate::error::{CoreError, CoreResult, ErrorContext};
7use once_cell::sync::Lazy;
8use std::collections::hash_map::DefaultHasher;
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11use std::sync::RwLock;
12use std::time::{Duration, Instant};
13
14#[derive(Debug, Clone)]
16pub enum RateLimitStrategy {
17 FixedWindow {
19 max_events: u32,
20 window_duration: Duration,
21 },
22 SlidingWindow {
24 max_events: u32,
25 window_duration: Duration,
26 },
27 TokenBucket {
29 capacity: u32,
30 refill_rate: f64, },
32 ExponentialBackoff {
34 initialdelay: Duration,
35 maxdelay: Duration,
36 multiplier: f64,
37 },
38 Adaptive {
40 base_max_events: u32,
41 base_window: Duration,
42 load_threshold: f64,
43 },
44}
45
46impl Default for RateLimitStrategy {
47 fn default() -> Self {
48 RateLimitStrategy::SlidingWindow {
49 max_events: 10,
50 window_duration: Duration::from_secs(60),
51 }
52 }
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
57pub enum EventClass {
58 Critical,
60 Error,
62 Warning,
64 Info,
66 Debug,
68 Trace,
70 Custom(String),
72}
73
74impl EventClass {
75 pub fn priority(&self) -> u8 {
77 match self {
78 EventClass::Critical => 0,
79 EventClass::Error => 1,
80 EventClass::Warning => 2,
81 EventClass::Info => 3,
82 EventClass::Debug => 4,
83 EventClass::Trace => 5,
84 EventClass::Custom(_) => 3, }
86 }
87
88 pub fn bypass_rate_limiting(&self) -> bool {
90 matches!(self, EventClass::Critical)
91 }
92}
93
94#[derive(Debug, Clone)]
96pub struct LogEvent {
97 pub message: String,
99 pub class: EventClass,
101 pub metadata: HashMap<String, String>,
103 pub timestamp: Instant,
105 pub source: Option<String>,
107 pub fingerprint: u64,
109}
110
111impl LogEvent {
112 pub fn new(message: String, class: EventClass) -> Self {
114 let fingerprint = Self::calculate_fingerprint(&message, &class);
115 Self {
116 message,
117 class,
118 metadata: HashMap::new(),
119 timestamp: Instant::now(),
120 source: None,
121 fingerprint,
122 }
123 }
124
125 pub fn with_source(mut self, source: String) -> Self {
127 self.source = Some(source);
128 self.fingerprint = Self::calculate_fingerprint(&self.message, &self.class);
130 if let Some(ref source) = self.source {
131 let mut hasher = DefaultHasher::new();
132 self.fingerprint.hash(&mut hasher);
133 source.hash(&mut hasher);
134 self.fingerprint = hasher.finish();
135 }
136 self
137 }
138
139 pub fn with_metadata(mut self, key: String, value: String) -> Self {
141 self.metadata.insert(key, value);
142 self
143 }
144
145 fn calculate_fingerprint(message: &str, class: &EventClass) -> u64 {
147 let mut hasher = DefaultHasher::new();
148 message.hash(&mut hasher);
149 class.hash(&mut hasher);
150 hasher.finish()
151 }
152
153 pub fn is_similar(&self, other: &LogEvent) -> bool {
155 self.fingerprint == other.fingerprint
156 }
157}
158
159#[derive(Debug, Clone)]
161struct RateLimiterState {
162 strategy: RateLimitStrategy,
164 event_times: Vec<Instant>,
166 tokens: f64,
168 last_refill: Instant,
170 next_allowed_time: Instant,
172 currentdelay: Duration,
174 suppressed_count: u32,
176 last_summary_time: Instant,
178}
179
180impl RateLimiterState {
181 fn new(strategy: RateLimitStrategy) -> Self {
183 let now = Instant::now();
184 let tokens = match &strategy {
185 RateLimitStrategy::TokenBucket { capacity, .. } => *capacity as f64,
186 _ => 0.0,
187 };
188
189 Self {
190 strategy,
191 event_times: Vec::new(),
192 tokens,
193 last_refill: now,
194 next_allowed_time: now,
195 currentdelay: Duration::from_secs(0),
196 suppressed_count: 0,
197 last_summary_time: now,
198 }
199 }
200
201 fn should_allow(&mut self, event: &LogEvent) -> RateLimitDecision {
203 let now = event.timestamp;
204
205 if event.class.bypass_rate_limiting() {
207 return RateLimitDecision::Allow;
208 }
209
210 match &self.strategy {
211 RateLimitStrategy::FixedWindow {
212 max_events,
213 window_duration,
214 } => self.should_allow_fixed_window(*max_events, *window_duration, now),
215 RateLimitStrategy::SlidingWindow {
216 max_events,
217 window_duration,
218 } => self.should_allow_sliding_window(*max_events, *window_duration, now),
219 RateLimitStrategy::TokenBucket {
220 capacity,
221 refill_rate,
222 } => self.should_allow_token_bucket(*capacity, *refill_rate, now),
223 RateLimitStrategy::ExponentialBackoff {
224 initialdelay,
225 maxdelay,
226 multiplier,
227 } => self.should_allow_exponential_backoff(*initialdelay, *maxdelay, *multiplier, now),
228 RateLimitStrategy::Adaptive {
229 base_max_events,
230 base_window,
231 load_threshold,
232 } => self.should_allow_adaptive(*base_max_events, *base_window, *load_threshold, now),
233 }
234 }
235
236 fn should_allow_fixed_window(
237 &mut self,
238 max_events: u32,
239 window_duration: Duration,
240 now: Instant,
241 ) -> RateLimitDecision {
242 let window_start = now.checked_sub(window_duration).unwrap_or(Instant::now());
244 self.event_times.retain(|&time| time >= window_start);
245
246 if self.event_times.len() < max_events as usize {
247 self.event_times.push(now);
248 RateLimitDecision::Allow
249 } else {
250 self.suppressed_count += 1;
251 RateLimitDecision::Suppress {
252 reason: format!(
253 "Fixed window limit exceeded ({max_events} events in {window_duration:?})"
254 ),
255 retry_after: Some(window_start + window_duration),
256 }
257 }
258 }
259
260 fn should_allow_sliding_window(
261 &mut self,
262 max_events: u32,
263 window_duration: Duration,
264 now: Instant,
265 ) -> RateLimitDecision {
266 let window_start = now.checked_sub(window_duration).unwrap_or(Instant::now());
268 self.event_times.retain(|&time| time >= window_start);
269
270 if self.event_times.len() < max_events as usize {
271 self.event_times.push(now);
272 RateLimitDecision::Allow
273 } else {
274 self.suppressed_count += 1;
275 let retry_after = self
277 .event_times
278 .first()
279 .map(|&oldest| oldest + window_duration);
280 RateLimitDecision::Suppress {
281 reason: format!(
282 "Sliding window limit exceeded ({max_events} events in {window_duration:?})"
283 ),
284 retry_after,
285 }
286 }
287 }
288
289 fn should_allow_token_bucket(
290 &mut self,
291 capacity: u32,
292 refill_rate: f64,
293 now: Instant,
294 ) -> RateLimitDecision {
295 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
297 self.tokens = (self.tokens + elapsed * refill_rate).min(capacity as f64);
298 self.last_refill = now;
299
300 if self.tokens >= 1.0 {
301 self.tokens -= 1.0;
302 RateLimitDecision::Allow
303 } else {
304 self.suppressed_count += 1;
305 let time_to_token = Duration::from_secs_f64((1.0 - self.tokens) / refill_rate);
307 RateLimitDecision::Suppress {
308 reason: format!("Token bucket empty (refill rate: {refill_rate:.2}/sec)"),
309 retry_after: Some(now + time_to_token),
310 }
311 }
312 }
313
314 fn should_allow_exponential_backoff(
315 &mut self,
316 initialdelay: Duration,
317 maxdelay: Duration,
318 multiplier: f64,
319 now: Instant,
320 ) -> RateLimitDecision {
321 if now >= self.next_allowed_time {
322 self.currentdelay = initialdelay;
324 self.next_allowed_time = now + self.currentdelay;
325 RateLimitDecision::Allow
326 } else {
327 self.suppressed_count += 1;
328 self.currentdelay = Duration::from_secs_f64(
330 (self.currentdelay.as_secs_f64() * multiplier).min(maxdelay.as_secs_f64()),
331 );
332 self.next_allowed_time = now + self.currentdelay;
333
334 RateLimitDecision::Suppress {
335 reason: format!(
336 "Exponential backoff (current delay: {:?})",
337 self.currentdelay
338 ),
339 retry_after: Some(self.next_allowed_time),
340 }
341 }
342 }
343
344 fn should_allow_adaptive(
345 &mut self,
346 base_max_events: u32,
347 base_window: Duration,
348 load_threshold: f64,
349 now: Instant,
350 ) -> RateLimitDecision {
351 let current_load = self.estimate_system_load();
354
355 let adjusted_max_events = if current_load > load_threshold {
356 (base_max_events as f64 * (2.0 - current_load / load_threshold)).max(1.0) as u32
358 } else {
359 base_max_events
360 };
361
362 let window_start = now.checked_sub(base_window).unwrap_or(Instant::now());
364 self.event_times.retain(|&time| time >= window_start);
365
366 if self.event_times.len() < adjusted_max_events as usize {
367 self.event_times.push(now);
368 RateLimitDecision::Allow
369 } else {
370 self.suppressed_count += 1;
371 RateLimitDecision::Suppress {
372 reason: format!(
373 "Adaptive limit exceeded (load: {current_load:.2}, limit: {adjusted_max_events})"
374 ),
375 retry_after: self.event_times.first().map(|&oldest| oldest + base_window),
376 }
377 }
378 }
379
380 fn estimate_system_load(&self) -> f64 {
382 let recent_events = self.event_times.len();
385 (recent_events as f64 / 100.0).min(1.0)
386 }
387
388 fn shouldlog_summary(&mut self, summaryinterval: Duration) -> Option<SuppressionSummary> {
390 let now = Instant::now();
391 if self.suppressed_count > 0
392 && now.duration_since(self.last_summary_time) >= summaryinterval
393 {
394 let summary = SuppressionSummary {
395 suppressed_count: self.suppressed_count,
396 time_period: now.duration_since(self.last_summary_time),
397 strategy: format!("{:?}", self.strategy),
398 };
399
400 self.suppressed_count = 0;
401 self.last_summary_time = now;
402 Some(summary)
403 } else {
404 None
405 }
406 }
407}
408
409#[derive(Debug, Clone)]
411pub enum RateLimitDecision {
412 Allow,
414 Suppress {
416 reason: String,
418 retry_after: Option<Instant>,
420 },
421}
422
423#[derive(Debug, Clone)]
425pub struct SuppressionSummary {
426 pub suppressed_count: u32,
428 pub time_period: Duration,
430 pub strategy: String,
432}
433
434impl std::fmt::Display for SuppressionSummary {
435 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436 write!(
437 f,
438 "Suppressed {} events over {:?} using {}",
439 self.suppressed_count, self.time_period, self.strategy
440 )
441 }
442}
443
444pub struct SmartRateLimiter {
446 limiters: RwLock<HashMap<u64, RateLimiterState>>,
448 config: RateLimiterConfig,
450 stats: RwLock<RateLimiterStats>,
452}
453
454#[derive(Debug, Clone)]
456pub struct RateLimiterConfig {
457 pub default_strategy: RateLimitStrategy,
459 pub class_strategies: HashMap<EventClass, RateLimitStrategy>,
461 pub summary_interval: Duration,
463 pub max_tracked_events: usize,
465 pub enable_adaptive: bool,
467}
468
469impl Default for RateLimiterConfig {
470 fn default() -> Self {
471 let mut class_strategies = HashMap::new();
472
473 class_strategies.insert(
475 EventClass::Critical,
476 RateLimitStrategy::SlidingWindow {
477 max_events: u32::MAX,
478 window_duration: Duration::from_secs(1),
479 },
480 );
481
482 class_strategies.insert(
484 EventClass::Error,
485 RateLimitStrategy::SlidingWindow {
486 max_events: 50,
487 window_duration: Duration::from_secs(60),
488 },
489 );
490
491 class_strategies.insert(
493 EventClass::Warning,
494 RateLimitStrategy::SlidingWindow {
495 max_events: 20,
496 window_duration: Duration::from_secs(60),
497 },
498 );
499
500 class_strategies.insert(
502 EventClass::Info,
503 RateLimitStrategy::TokenBucket {
504 capacity: 10,
505 refill_rate: 0.5, },
507 );
508
509 class_strategies.insert(
511 EventClass::Debug,
512 RateLimitStrategy::TokenBucket {
513 capacity: 5,
514 refill_rate: 0.1, },
516 );
517
518 class_strategies.insert(
520 EventClass::Trace,
521 RateLimitStrategy::ExponentialBackoff {
522 initialdelay: Duration::from_secs(1),
523 maxdelay: Duration::from_secs(300), multiplier: 2.0,
525 },
526 );
527
528 Self {
529 default_strategy: RateLimitStrategy::default(),
530 class_strategies,
531 summary_interval: Duration::from_secs(300), max_tracked_events: 10000,
533 enable_adaptive: true,
534 }
535 }
536}
537
538#[derive(Debug, Clone, Default)]
540pub struct RateLimiterStats {
541 pub total_events: u64,
543 pub allowed_events: u64,
545 pub suppressed_events: u64,
547 pub events_by_class: HashMap<EventClass, u64>,
549 pub tracked_event_types: usize,
551}
552
553impl SmartRateLimiter {
554 pub fn new(config: RateLimiterConfig) -> Self {
556 Self {
557 limiters: RwLock::new(HashMap::new()),
558 config,
559 stats: RwLock::new(RateLimiterStats::default()),
560 }
561 }
562
563 pub fn should_allow(&self, event: &LogEvent) -> CoreResult<RateLimitDecision> {
565 {
567 let mut stats = self.stats.write().map_err(|_| {
568 CoreError::ComputationError(ErrorContext::new("Failed to acquire stats write lock"))
569 })?;
570 stats.total_events += 1;
571 *stats
572 .events_by_class
573 .entry(event.class.clone())
574 .or_insert(0) += 1;
575 }
576
577 let decision = {
579 let mut limiters = self.limiters.write().map_err(|_| {
580 CoreError::ComputationError(ErrorContext::new(
581 "Failed to acquire limiters write lock",
582 ))
583 })?;
584
585 if limiters.len() >= self.config.max_tracked_events
587 && !limiters.contains_key(&event.fingerprint)
588 {
589 if let Some((&oldest_key, _)) = limiters.iter().next() {
591 limiters.remove(&oldest_key);
592 }
593 }
594
595 let limiter = limiters.entry(event.fingerprint).or_insert_with(|| {
596 let strategy = self
597 .config
598 .class_strategies
599 .get(&event.class)
600 .cloned()
601 .unwrap_or_else(|| self.config.default_strategy.clone());
602 RateLimiterState::new(strategy)
603 });
604
605 limiter.should_allow(event)
606 };
607
608 {
610 let mut stats = self.stats.write().map_err(|_| {
611 CoreError::ComputationError(ErrorContext::new("Failed to acquire stats write lock"))
612 })?;
613
614 match &decision {
615 RateLimitDecision::Allow => stats.allowed_events += 1,
616 RateLimitDecision::Suppress { .. } => stats.suppressed_events += 1,
617 }
618
619 stats.tracked_event_types = {
620 let limiters = self.limiters.read().map_err(|_| {
621 CoreError::ComputationError(ErrorContext::new(
622 "Failed to acquire limiters read lock",
623 ))
624 })?;
625 limiters.len()
626 };
627 }
628
629 Ok(decision)
630 }
631
632 pub fn get_suppression_summaries(&self) -> CoreResult<Vec<(u64, SuppressionSummary)>> {
634 let mut summaries = Vec::new();
635
636 let mut limiters = self.limiters.write().map_err(|_| {
637 CoreError::ComputationError(ErrorContext::new("Failed to acquire limiters write lock"))
638 })?;
639
640 for (&fingerprint, limiter) in limiters.iter_mut() {
641 if let Some(summary) = limiter.shouldlog_summary(self.config.summary_interval) {
642 summaries.push((fingerprint, summary));
643 }
644 }
645
646 Ok(summaries)
647 }
648
649 pub fn get_stats(&self) -> CoreResult<RateLimiterStats> {
651 let stats = self.stats.read().map_err(|_| {
652 CoreError::ComputationError(ErrorContext::new("Failed to acquire stats read lock"))
653 })?;
654 Ok(stats.clone())
655 }
656
657 pub fn clear(&self) -> CoreResult<()> {
659 let mut limiters = self.limiters.write().map_err(|_| {
660 CoreError::ComputationError(ErrorContext::new("Failed to acquire limiters write lock"))
661 })?;
662 limiters.clear();
663
664 let mut stats = self.stats.write().map_err(|_| {
665 CoreError::ComputationError(ErrorContext::new("Failed to acquire stats write lock"))
666 })?;
667 *stats = RateLimiterStats::default();
668
669 Ok(())
670 }
671
672 pub fn update_config(&mut self, config: RateLimiterConfig) {
674 self.config = config;
675 }
676
677 pub const fn get_config(&self) -> &RateLimiterConfig {
679 &self.config
680 }
681}
682
683impl Default for SmartRateLimiter {
684 fn default() -> Self {
685 Self::new(RateLimiterConfig::default())
686 }
687}
688
689static GLOBAL_RATE_LIMITER: Lazy<SmartRateLimiter> = Lazy::new(SmartRateLimiter::default);
691
692#[allow(dead_code)]
694pub fn global_rate_limiter() -> &'static SmartRateLimiter {
695 &GLOBAL_RATE_LIMITER
696}
697
698pub mod utils {
700 use super::*;
701
702 pub fn error_event(message: String) -> LogEvent {
704 LogEvent::new(message, EventClass::Error)
705 }
706
707 pub fn warning_event(message: String) -> LogEvent {
709 LogEvent::new(message, EventClass::Warning)
710 }
711
712 pub fn info_event(message: String) -> LogEvent {
714 LogEvent::new(message, EventClass::Info)
715 }
716
717 pub fn debug_event(message: String) -> LogEvent {
719 LogEvent::new(message, EventClass::Debug)
720 }
721
722 pub fn shouldlog(event: &LogEvent) -> bool {
724 match global_rate_limiter().should_allow(event) {
725 Ok(RateLimitDecision::Allow) => true,
726 Ok(RateLimitDecision::Suppress { .. }) => false,
727 Err(_) => true, }
729 }
730
731 pub fn high_frequency_strategy() -> RateLimitStrategy {
733 RateLimitStrategy::TokenBucket {
734 capacity: 5,
735 refill_rate: 0.1, }
737 }
738
739 pub fn burst_strategy() -> RateLimitStrategy {
741 RateLimitStrategy::TokenBucket {
742 capacity: 20,
743 refill_rate: 2.0, }
745 }
746
747 pub fn periodic_strategy(period: Duration) -> RateLimitStrategy {
749 RateLimitStrategy::FixedWindow {
750 max_events: 1,
751 window_duration: period,
752 }
753 }
754}
755
756#[cfg(test)]
757mod tests {
758 use super::*;
759 use std::thread;
760
761 #[test]
762 fn test_event_classification() {
763 let event = LogEvent::new("Test message".to_string(), EventClass::Error);
764 assert_eq!(event.class, EventClass::Error);
765 assert_eq!(event.class.priority(), 1);
766 assert!(!event.class.bypass_rate_limiting());
767
768 let critical_event = LogEvent::new("Critical".to_string(), EventClass::Critical);
769 assert!(critical_event.class.bypass_rate_limiting());
770 }
771
772 #[test]
773 fn test_sliding_window_rate_limiting() {
774 let mut state = RateLimiterState::new(RateLimitStrategy::SlidingWindow {
775 max_events: 3,
776 window_duration: Duration::from_millis(100),
777 });
778
779 let event = LogEvent::new("Test".to_string(), EventClass::Info);
780
781 for _ in 0..3 {
783 match state.should_allow(&event) {
784 RateLimitDecision::Allow => {}
785 RateLimitDecision::Suppress { .. } => panic!("Should not suppress yet"),
786 }
787 }
788
789 match state.should_allow(&event) {
791 RateLimitDecision::Allow => panic!("Should suppress"),
792 RateLimitDecision::Suppress { .. } => {}
793 }
794
795 thread::sleep(Duration::from_millis(110));
797 let new_event = LogEvent::new("Test".to_string(), EventClass::Info);
798 match state.should_allow(&new_event) {
799 RateLimitDecision::Allow => {}
800 RateLimitDecision::Suppress { .. } => panic!("Should allow after window"),
801 }
802 }
803
804 #[test]
805 fn test_token_bucket() {
806 let mut state = RateLimiterState::new(RateLimitStrategy::TokenBucket {
807 capacity: 2,
808 refill_rate: 10.0, });
810
811 let event = LogEvent::new("Test".to_string(), EventClass::Info);
812
813 for _ in 0..2 {
815 match state.should_allow(&event) {
816 RateLimitDecision::Allow => {}
817 RateLimitDecision::Suppress { .. } => panic!("Should not suppress yet"),
818 }
819 }
820
821 match state.should_allow(&event) {
823 RateLimitDecision::Allow => panic!("Should suppress when bucket empty"),
824 RateLimitDecision::Suppress { .. } => {}
825 }
826 }
827
828 #[test]
829 fn test_smart_rate_limiter() {
830 let limiter = SmartRateLimiter::default();
831
832 let error_event = LogEvent::new("Error message".to_string(), EventClass::Error);
833 let debug_event = LogEvent::new("Debug message".to_string(), EventClass::Debug);
834
835 let error_decision = limiter
837 .should_allow(&error_event)
838 .expect("Operation failed");
839 assert!(matches!(error_decision, RateLimitDecision::Allow));
840
841 let debug_decision = limiter
843 .should_allow(&debug_event)
844 .expect("Operation failed");
845 assert!(matches!(debug_decision, RateLimitDecision::Allow)); let stats = limiter.get_stats().expect("Operation failed");
849 assert_eq!(stats.total_events, 2);
850 assert_eq!(stats.allowed_events, 2);
851 }
852
853 #[test]
854 fn test_event_fingerprinting() {
855 let event1 = LogEvent::new("Same message".to_string(), EventClass::Info);
856 let event2 = LogEvent::new("Same message".to_string(), EventClass::Info);
857 let event3 = LogEvent::new("Different message".to_string(), EventClass::Info);
858
859 assert!(event1.is_similar(&event2));
860 assert!(!event1.is_similar(&event3));
861 }
862
863 #[test]
864 fn test_suppression_summary() {
865 let summary = SuppressionSummary {
866 suppressed_count: 10,
867 time_period: Duration::from_secs(60),
868 strategy: "TokenBucket".to_string(),
869 };
870
871 let display_str = format!("{summary}");
872 assert!(display_str.contains("10 events"));
873 assert!(display_str.contains("TokenBucket"));
874 }
875
876 #[test]
877 fn test_critical_events_bypass() {
878 let limiter = SmartRateLimiter::default();
879
880 let critical_event = LogEvent::new("Critical error".to_string(), EventClass::Critical);
882
883 for _ in 0..1000 {
885 let decision = limiter
886 .should_allow(&critical_event)
887 .expect("Operation failed");
888 assert!(matches!(decision, RateLimitDecision::Allow));
889 }
890 }
891
892 #[test]
893 fn test_utils_functions() {
894 let error_event = utils::error_event("Test error".to_string());
895 assert_eq!(error_event.class, EventClass::Error);
896
897 let warning_event = utils::warning_event("Test warning".to_string());
898 assert_eq!(warning_event.class, EventClass::Warning);
899
900 let info_event = utils::info_event("Test info".to_string());
902 assert!(utils::shouldlog(&info_event)); }
904}