1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16
/// Atomic event counter that keeps a running total alongside the number of
/// events seen in the current second, minute and hour.
///
/// All counters are atomics, which is why the recording methods only need
/// `&self`. The type is aligned to 64 bytes, presumably so that a meter does
/// not share a cache line with neighbouring data (TODO confirm intent).
#[repr(align(64))]
pub struct RateMeter {
    /// Events recorded since construction or since the last reset.
    total_events: AtomicU64,

    /// Events counted in the second identified by `last_second`.
    current_second_events: AtomicU32,

    /// Events counted in the minute identified by `last_minute`.
    current_minute_events: AtomicU32,

    /// Events counted in the hour identified by `last_hour`.
    current_hour_events: AtomicU32,

    /// Index, in whole seconds, of the second currently being counted.
    last_second: AtomicU64,

    /// Index of the minute currently being counted (seconds divided by 60).
    last_minute: AtomicU64,

    /// Index of the hour currently being counted (seconds divided by 3600).
    last_hour: AtomicU64,

    /// Length of the configured window, in nanoseconds.
    window_ns: u64,

    /// Moment at which the meter was constructed.
    created_at: Instant,
}
42
/// Point-in-time snapshot of a meter's counters.
#[derive(Debug, Clone)]
pub struct RateStats {
    /// Running total of recorded events.
    pub total_events: u64,

    /// Events counted in the current one-second window.
    pub per_second: f64,

    /// Events counted in the current one-minute window.
    pub per_minute: f64,

    /// Events counted in the current one-hour window.
    pub per_hour: f64,

    /// Total events divided by the meter's age in seconds.
    pub average_rate: f64,

    /// Time elapsed since the meter was constructed.
    pub age: Duration,

    /// Share of the configured window that has elapsed, as a percentage.
    pub window_fill: f64,
}
61
62impl RateMeter {
63 #[inline]
65 pub fn new() -> Self {
66 Self::with_window(Duration::from_secs(1))
67 }
68
69 #[inline]
71 pub fn with_window(window: Duration) -> Self {
72 Self {
73 total_events: AtomicU64::new(0),
74 current_second_events: AtomicU32::new(0),
75 current_minute_events: AtomicU32::new(0),
76 current_hour_events: AtomicU32::new(0),
77 last_second: AtomicU64::new(0),
78 last_minute: AtomicU64::new(0),
79 last_hour: AtomicU64::new(0),
80 window_ns: window.as_nanos() as u64,
81 created_at: Instant::now(),
82 }
83 }
84
85 #[inline(always)]
92 pub fn tick(&self) {
93 self.tick_n(1);
94 }
95
96 #[inline(always)]
98 pub fn tick_n(&self, n: u32) {
99 if n == 0 {
100 return;
101 }
102
103 self.total_events.fetch_add(n as u64, Ordering::Relaxed);
105
106 let now = self.get_unix_timestamp();
108 self.update_windows(now, n);
109 }
110
111 #[inline]
113 pub fn rate(&self) -> f64 {
114 let now = self.get_unix_timestamp();
115 self.update_windows(now, 0);
116
117 let events = self.current_second_events.load(Ordering::Relaxed);
118 events as f64
119 }
120
121 #[inline]
123 pub fn rate_per_second(&self) -> f64 {
124 self.rate()
125 }
126
127 #[inline]
129 pub fn rate_per_minute(&self) -> f64 {
130 let now = self.get_unix_timestamp();
131 self.update_windows(now, 0);
132
133 let events = self.current_minute_events.load(Ordering::Relaxed);
134 events as f64
135 }
136
137 #[inline]
139 pub fn rate_per_hour(&self) -> f64 {
140 let now = self.get_unix_timestamp();
141 self.update_windows(now, 0);
142
143 let events = self.current_hour_events.load(Ordering::Relaxed);
144 events as f64
145 }
146
147 #[inline(always)]
149 pub fn total(&self) -> u64 {
150 self.total_events.load(Ordering::Relaxed)
151 }
152
153 #[inline]
155 pub fn exceeds_rate(&self, limit: f64) -> bool {
156 self.rate() > limit
157 }
158
159 #[inline]
161 pub fn can_allow(&self, n: u32, limit: f64) -> bool {
162 let current_rate = self.rate();
163 (current_rate + n as f64) <= limit
164 }
165
166 #[inline]
168 pub fn tick_if_under_limit(&self, limit: f64) -> bool {
169 if self.can_allow(1, limit) {
170 self.tick();
171 true
172 } else {
173 false
174 }
175 }
176
177 #[inline]
179 pub fn tick_burst_if_under_limit(&self, n: u32, limit: f64) -> bool {
180 if self.can_allow(n, limit) {
181 self.tick_n(n);
182 true
183 } else {
184 false
185 }
186 }
187
188 #[inline]
190 pub fn reset(&self) {
191 let now = self.get_unix_timestamp();
192
193 self.total_events.store(0, Ordering::SeqCst);
194 self.current_second_events.store(0, Ordering::SeqCst);
195 self.current_minute_events.store(0, Ordering::SeqCst);
196 self.current_hour_events.store(0, Ordering::SeqCst);
197 self.last_second.store(now, Ordering::SeqCst);
198 self.last_minute.store(now / 60, Ordering::SeqCst);
199 self.last_hour.store(now / 3600, Ordering::SeqCst);
200 }
201
202 pub fn stats(&self) -> RateStats {
204 let now = self.get_unix_timestamp();
205 self.update_windows(now, 0);
206
207 let total_events = self.total();
208 let per_second = self.current_second_events.load(Ordering::Relaxed) as f64;
209 let per_minute = self.current_minute_events.load(Ordering::Relaxed) as f64;
210 let per_hour = self.current_hour_events.load(Ordering::Relaxed) as f64;
211
212 let age = self.created_at.elapsed();
213 let average_rate = if age.as_secs_f64() > 0.0 {
214 total_events as f64 / age.as_secs_f64()
215 } else {
216 0.0
217 };
218
219 let window_fill = if self.window_ns > 0 {
221 let window_seconds = self.window_ns as f64 / 1_000_000_000.0;
222 let elapsed_in_window = age.as_secs_f64().min(window_seconds);
223 (elapsed_in_window / window_seconds * 100.0).min(100.0)
224 } else {
225 100.0
226 };
227
228 RateStats {
229 total_events,
230 per_second,
231 per_minute,
232 per_hour,
233 average_rate,
234 age,
235 window_fill,
236 }
237 }
238
239 #[inline]
241 pub fn age(&self) -> Duration {
242 self.created_at.elapsed()
243 }
244
245 #[inline]
247 pub fn is_empty(&self) -> bool {
248 self.total() == 0
249 }
250
251 #[inline(always)]
254 fn get_unix_timestamp(&self) -> u64 {
255 self.created_at.elapsed().as_secs()
256 + std::time::SystemTime::now()
257 .duration_since(std::time::UNIX_EPOCH)
258 .unwrap_or_default()
259 .as_secs()
260 }
261
262 #[inline]
263 fn update_windows(&self, now: u64, new_events: u32) {
264 let current_second = now;
266 let last_second = self.last_second.load(Ordering::Relaxed);
267
268 if current_second != last_second {
269 if self
271 .last_second
272 .compare_exchange(
273 last_second,
274 current_second,
275 Ordering::Relaxed,
276 Ordering::Relaxed,
277 )
278 .is_ok()
279 {
280 self.current_second_events
281 .store(new_events, Ordering::Relaxed);
282 } else {
283 self.current_second_events
285 .fetch_add(new_events, Ordering::Relaxed);
286 }
287 } else if new_events > 0 {
288 self.current_second_events
290 .fetch_add(new_events, Ordering::Relaxed);
291 }
292
293 let current_minute = now / 60;
295 let last_minute = self.last_minute.load(Ordering::Relaxed);
296
297 if current_minute != last_minute {
298 if self
299 .last_minute
300 .compare_exchange(
301 last_minute,
302 current_minute,
303 Ordering::Relaxed,
304 Ordering::Relaxed,
305 )
306 .is_ok()
307 {
308 self.current_minute_events
309 .store(new_events, Ordering::Relaxed);
310 } else {
311 self.current_minute_events
312 .fetch_add(new_events, Ordering::Relaxed);
313 }
314 } else if new_events > 0 {
315 self.current_minute_events
316 .fetch_add(new_events, Ordering::Relaxed);
317 }
318
319 let current_hour = now / 3600;
321 let last_hour = self.last_hour.load(Ordering::Relaxed);
322
323 if current_hour != last_hour {
324 if self
325 .last_hour
326 .compare_exchange(
327 last_hour,
328 current_hour,
329 Ordering::Relaxed,
330 Ordering::Relaxed,
331 )
332 .is_ok()
333 {
334 self.current_hour_events
335 .store(new_events, Ordering::Relaxed);
336 } else {
337 self.current_hour_events
338 .fetch_add(new_events, Ordering::Relaxed);
339 }
340 } else if new_events > 0 {
341 self.current_hour_events
342 .fetch_add(new_events, Ordering::Relaxed);
343 }
344 }
345}
346
347impl Default for RateMeter {
348 #[inline]
349 fn default() -> Self {
350 Self::new()
351 }
352}
353
354impl std::fmt::Display for RateMeter {
355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356 write!(f, "RateMeter({:.1}/s, {} total)", self.rate(), self.total())
357 }
358}
359
360impl std::fmt::Debug for RateMeter {
361 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362 let stats = self.stats();
363 f.debug_struct("RateMeter")
364 .field("total_events", &stats.total_events)
365 .field("per_second", &stats.per_second)
366 .field("per_minute", &stats.per_minute)
367 .field("average_rate", &stats.average_rate)
368 .field("age", &stats.age)
369 .finish()
370 }
371}
372
373unsafe impl Send for RateMeter {}
375unsafe impl Sync for RateMeter {}
376
377pub mod specialized {
379 use super::*;
380
381 #[repr(align(64))]
383 pub struct ApiRateLimiter {
384 meter: RateMeter,
385 limit: AtomicU32, }
387
388 impl ApiRateLimiter {
389 #[inline]
391 pub fn new(requests_per_second: u32) -> Self {
392 Self {
393 meter: RateMeter::new(),
394 limit: AtomicU32::new(requests_per_second),
395 }
396 }
397
398 #[inline]
400 pub fn try_request(&self) -> bool {
401 let limit = self.limit.load(Ordering::Relaxed) as f64;
402 self.meter.tick_if_under_limit(limit)
403 }
404
405 #[inline]
407 pub fn try_requests(&self, n: u32) -> bool {
408 let limit = self.limit.load(Ordering::Relaxed) as f64;
409 self.meter.tick_burst_if_under_limit(n, limit)
410 }
411
412 #[inline]
414 pub fn set_limit(&self, requests_per_second: u32) {
415 self.limit.store(requests_per_second, Ordering::Relaxed);
416 }
417
418 #[inline]
420 pub fn get_limit(&self) -> u32 {
421 self.limit.load(Ordering::Relaxed)
422 }
423
424 #[inline]
426 pub fn current_rate(&self) -> f64 {
427 self.meter.rate()
428 }
429
430 #[inline]
432 pub fn total_requests(&self) -> u64 {
433 self.meter.total()
434 }
435
436 #[inline]
438 pub fn is_over_limit(&self) -> bool {
439 let limit = self.limit.load(Ordering::Relaxed) as f64;
440 self.meter.rate() > limit
441 }
442
443 #[inline]
445 pub fn reset(&self) {
446 self.meter.reset();
447 }
448 }
449
450 impl Default for ApiRateLimiter {
451 fn default() -> Self {
452 Self::new(1000)
453 } }
455
456 #[repr(align(64))]
458 pub struct ThroughputMeter {
459 meter: RateMeter,
460 }
461
462 impl ThroughputMeter {
463 #[inline]
465 pub fn new() -> Self {
466 Self {
467 meter: RateMeter::new(),
468 }
469 }
470
471 #[inline(always)]
473 pub fn record_bytes(&self, bytes: u64) {
474 self.meter.tick_n(bytes as u32);
475 }
476
477 #[inline]
479 pub fn bytes_per_second(&self) -> f64 {
480 self.meter.rate()
481 }
482
483 #[inline]
485 pub fn kb_per_second(&self) -> f64 {
486 self.meter.rate() / 1024.0
487 }
488
489 #[inline]
491 pub fn mb_per_second(&self) -> f64 {
492 self.meter.rate() / (1024.0 * 1024.0)
493 }
494
495 #[inline]
497 pub fn gb_per_second(&self) -> f64 {
498 self.meter.rate() / (1024.0 * 1024.0 * 1024.0)
499 }
500
501 #[inline]
503 pub fn total_bytes(&self) -> u64 {
504 self.meter.total()
505 }
506
507 #[inline]
509 pub fn reset(&self) {
510 self.meter.reset();
511 }
512 }
513
514 impl Default for ThroughputMeter {
515 fn default() -> Self {
516 Self::new()
517 }
518 }
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// A fresh meter is empty and ticks accumulate in the total.
    #[test]
    fn test_basic_operations() {
        let meter = RateMeter::new();

        assert!(meter.is_empty());
        assert_eq!(meter.total(), 0);
        assert_eq!(meter.rate(), 0.0);

        meter.tick();
        assert!(!meter.is_empty());
        assert_eq!(meter.total(), 1);

        meter.tick_n(5);
        assert_eq!(meter.total(), 6);
    }

    /// One hundred immediate ticks show up as the per-second rate.
    #[test]
    fn test_rate_calculations() {
        let meter = RateMeter::new();

        (0..100).for_each(|_| meter.tick());

        let observed = meter.rate();
        assert_eq!(observed, 100.0);
        assert_eq!(meter.rate_per_second(), 100.0);
    }

    /// Events recorded back to back land in all three windows.
    #[test]
    fn test_multiple_windows() {
        let meter = RateMeter::new();

        (0..60).for_each(|_| meter.tick());

        let snapshot = meter.stats();
        assert_eq!(snapshot.total_events, 60);
        assert_eq!(snapshot.per_second, 60.0);
        assert_eq!(snapshot.per_minute, 60.0);
        assert_eq!(snapshot.per_hour, 60.0);
    }

    /// Conditional ticks are refused once the limit has been reached.
    #[test]
    fn test_rate_limiting() {
        let meter = RateMeter::new();
        let limit = 10.0;

        assert!(meter.tick_if_under_limit(limit));
        assert!(meter.tick_if_under_limit(limit));

        // Fill the window up to the limit.
        meter.tick_n(8);

        assert!(!meter.tick_if_under_limit(limit));
        assert!(meter.exceeds_rate(9.0));
        assert!(!meter.exceeds_rate(11.0));
    }

    /// A rejected burst leaves the total untouched.
    #[test]
    fn test_burst_rate_limiting() {
        let meter = RateMeter::new();
        let limit = 10.0;

        let first = meter.tick_burst_if_under_limit(5, limit);
        assert!(first);
        assert_eq!(meter.total(), 5);

        let oversized = meter.tick_burst_if_under_limit(10, limit);
        assert!(!oversized);
        assert_eq!(meter.total(), 5);

        let second = meter.tick_burst_if_under_limit(3, limit);
        assert!(second);
        assert_eq!(meter.total(), 8);
    }

    /// `can_allow` accepts bursts that reach the limit exactly.
    #[test]
    fn test_can_allow() {
        let meter = RateMeter::new();
        let limit = 10.0;

        meter.tick_n(5);

        assert!(meter.can_allow(3, limit));
        assert!(!meter.can_allow(6, limit));
        assert!(meter.can_allow(5, limit));
    }

    /// `reset` clears both the total and the current rate.
    #[test]
    fn test_reset() {
        let meter = RateMeter::new();

        meter.tick_n(100);
        assert_eq!(meter.total(), 100);
        assert!(meter.rate() > 0.0);

        meter.reset();

        assert_eq!(meter.total(), 0);
        assert_eq!(meter.rate(), 0.0);
        assert!(meter.is_empty());
    }

    /// The snapshot reflects recorded events and a non-zero age.
    #[test]
    fn test_statistics() {
        let meter = RateMeter::new();

        meter.tick_n(50);

        let snapshot = meter.stats();
        assert_eq!(snapshot.total_events, 50);
        assert_eq!(snapshot.per_second, 50.0);
        assert!(snapshot.average_rate > 0.0);
        assert!(snapshot.age > Duration::from_nanos(0));
        assert!(snapshot.window_fill >= 0.0);
    }

    /// The API limiter admits requests up to its limit and honours limit
    /// changes and resets.
    #[test]
    fn test_api_rate_limiter() {
        let limiter = specialized::ApiRateLimiter::new(10);

        assert!((0..10).all(|_| limiter.try_request()));

        // The eleventh request within the same second must be refused.
        assert!(!limiter.try_request());

        assert_eq!(limiter.current_rate(), 10.0);
        assert_eq!(limiter.total_requests(), 10);
        assert_eq!(limiter.get_limit(), 10);

        limiter.set_limit(20);
        assert_eq!(limiter.get_limit(), 20);
        assert!(!limiter.is_over_limit());

        limiter.reset();
        assert!(limiter.try_requests(5));
        assert_eq!(limiter.total_requests(), 5);

        assert!(!limiter.try_requests(20));
        assert_eq!(limiter.total_requests(), 5);
    }

    /// Byte counts are reported in bytes, KiB and MiB per second.
    #[test]
    fn test_throughput_meter() {
        let meter = specialized::ThroughputMeter::new();
        let one_kib = 1024;
        let one_mib = 1024 * 1024;

        meter.record_bytes(one_kib);
        assert_eq!(meter.bytes_per_second(), 1024.0);
        assert_eq!(meter.kb_per_second(), 1.0);
        assert_eq!(meter.total_bytes(), 1024);

        meter.record_bytes(one_mib);
        assert_eq!(meter.total_bytes(), 1024 + 1024 * 1024);

        let deviation = (meter.mb_per_second() - 1.001).abs();
        assert!(deviation < 0.01);
    }

    /// No tick is lost from the total when many threads record at once.
    #[test]
    fn test_high_concurrency() {
        let meter = Arc::new(RateMeter::new());
        let num_threads = 50;
        let ticks_per_thread = 1000;

        let mut workers = Vec::new();
        for _ in 0..num_threads {
            let shared = Arc::clone(&meter);
            workers.push(thread::spawn(move || {
                for _ in 0..ticks_per_thread {
                    shared.tick();
                }
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }

        let expected_total = num_threads * ticks_per_thread;
        assert_eq!(meter.total(), expected_total);

        let snapshot = meter.stats();
        assert!(snapshot.average_rate > 0.0);
        assert_eq!(snapshot.total_events, expected_total);
    }

    /// With 200 attempts against a limit of 100, between 90 and 120 requests
    /// are accepted.
    #[test]
    fn test_concurrent_rate_limiting() {
        let limiter = Arc::new(specialized::ApiRateLimiter::new(100));
        let num_threads = 20;

        let mut workers = Vec::new();
        for _ in 0..num_threads {
            let shared = Arc::clone(&limiter);
            workers.push(thread::spawn(move || {
                let mut successful = 0;
                for _ in 0..10 {
                    if shared.try_request() {
                        successful += 1;
                    }
                }
                successful
            }));
        }

        let total_successful: i32 = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .sum();

        assert!(total_successful <= 120);
        assert!(total_successful >= 90);
    }

    /// Both formatting traits mention the type name and the counters.
    #[test]
    fn test_display_and_debug() {
        let meter = RateMeter::new();
        meter.tick_n(42);

        let shown = format!("{}", meter);
        assert!(shown.contains("RateMeter"));
        assert!(shown.contains("42 total"));

        let debugged = format!("{:?}", meter);
        assert!(debugged.contains("RateMeter"));
        assert!(debugged.contains("total_events"));
    }

    /// A custom window does not change how events are counted.
    #[test]
    fn test_custom_window() {
        let meter = RateMeter::with_window(Duration::from_secs(5));

        meter.tick_n(10);
        assert_eq!(meter.total(), 10);
        assert_eq!(meter.rate(), 10.0);

        let snapshot = meter.stats();
        assert!(snapshot.window_fill >= 0.0);
    }
}
773
#[cfg(all(test, feature = "bench-tests", not(tarpaulin)))]
#[allow(unused_imports)]
mod benchmarks {
    use super::*;
    use std::time::Instant;

    /// Times ten million single ticks and requires under 400 ns per call.
    #[cfg_attr(not(feature = "bench-tests"), ignore)]
    #[test]
    fn bench_rate_meter_tick() {
        let iterations = 10_000_000;
        let meter = RateMeter::new();

        let started = Instant::now();
        for _ in 0..iterations {
            meter.tick();
        }
        let elapsed = started.elapsed();

        let total_nanos = elapsed.as_nanos();
        println!(
            "RateMeter tick: {:.2} ns/op",
            total_nanos as f64 / iterations as f64
        );

        assert_eq!(meter.total(), iterations);
        assert!(total_nanos / (iterations as u128) < 400);
    }

    /// Times one million batched ticks of 1 to 10 events and requires under
    /// 500 ns per call.
    #[cfg_attr(not(feature = "bench-tests"), ignore)]
    #[test]
    fn bench_rate_meter_tick_n() {
        let iterations = 1_000_000;
        let meter = RateMeter::new();

        let started = Instant::now();
        for step in 0..iterations {
            let batch = (step % 10) + 1;
            meter.tick_n(batch);
        }
        let elapsed = started.elapsed();

        let total_nanos = elapsed.as_nanos();
        println!(
            "RateMeter tick_n: {:.2} ns/op",
            total_nanos as f64 / iterations as f64
        );

        assert!(total_nanos / (iterations as u128) < 500);
    }

    /// Times one million rate reads on a pre-filled meter and requires under
    /// 300 ns per call.
    #[cfg_attr(not(feature = "bench-tests"), ignore)]
    #[test]
    fn bench_rate_calculation() {
        let meter = RateMeter::new();

        // Pre-fill so the reads operate on a non-empty window.
        meter.tick_n(1000);

        let iterations = 1_000_000;
        let started = Instant::now();
        for _ in 0..iterations {
            let _ = meter.rate();
        }
        let elapsed = started.elapsed();

        let total_nanos = elapsed.as_nanos();
        println!(
            "RateMeter rate: {:.2} ns/op",
            total_nanos as f64 / iterations as f64
        );

        assert!(total_nanos / iterations < 300);
    }

    /// Times one million limiter requests and requires under 1000 ns per call.
    #[cfg_attr(not(feature = "bench-tests"), ignore)]
    #[test]
    fn bench_api_rate_limiter() {
        let iterations = 1_000_000;
        let limiter = specialized::ApiRateLimiter::new(1_000_000);

        let started = Instant::now();
        for _ in 0..iterations {
            let _ = limiter.try_request();
        }
        let elapsed = started.elapsed();

        let total_nanos = elapsed.as_nanos();
        println!(
            "ApiRateLimiter try_request: {:.2} ns/op",
            total_nanos as f64 / iterations as f64
        );

        assert!(total_nanos / iterations < 1000);
    }
}