1use std::sync::atomic::{AtomicU64, AtomicU32, Ordering};
15use std::time::{Duration, Instant};
16
/// Lock-free meter that counts events in the current wall-clock second,
/// minute and hour, and keeps a lifetime total.
///
/// Every counter is an atomic updated through `&self`, so one meter can be
/// shared between threads. Window roll-over is best-effort: an event recorded
/// by one thread while another thread is resetting a window can be dropped
/// from that window's counter. The lifetime total is never affected by this.
///
/// The struct is aligned to 64 bytes (presumably one cache line, to avoid
/// false sharing — confirm for the target platform).
#[repr(align(64))]
pub struct RateMeter {
    /// Lifetime number of recorded events; cleared only by [`RateMeter::reset`].
    total_events: AtomicU64,
    /// Events recorded in the second identified by `last_second`.
    current_second_events: AtomicU64,
    /// Events recorded in the minute identified by `last_minute`.
    current_minute_events: AtomicU64,
    /// Events recorded in the hour identified by `last_hour`.
    current_hour_events: AtomicU64,
    /// Unix second the per-second counter currently belongs to.
    last_second: AtomicU64,
    /// Unix minute (`seconds / 60`) the per-minute counter belongs to.
    last_minute: AtomicU64,
    /// Unix hour (`seconds / 3600`) the per-hour counter belongs to.
    last_hour: AtomicU64,
    /// Configured window length in nanoseconds; only used for
    /// [`RateStats::window_fill`].
    window_ns: u64,
    /// Creation time, used for [`RateMeter::age`] and the average rate.
    created_at: Instant,
}

/// Point-in-time snapshot produced by [`RateMeter::stats`].
#[derive(Debug, Clone)]
pub struct RateStats {
    /// Lifetime number of recorded events.
    pub total_events: u64,
    /// Events recorded so far in the current second.
    pub per_second: f64,
    /// Events recorded so far in the current minute.
    pub per_minute: f64,
    /// Events recorded so far in the current hour.
    pub per_hour: f64,
    /// `total_events` divided by the meter's age in seconds (0.0 at age zero).
    pub average_rate: f64,
    /// Time elapsed since the meter was created.
    pub age: Duration,
    /// Percentage (0.0 to 100.0) of the configured window that has elapsed
    /// since the meter was created; 100.0 for a zero-length window.
    pub window_fill: f64,
}

impl RateMeter {
    /// Seconds per minute window.
    const SECS_PER_MINUTE: u64 = 60;
    /// Seconds per hour window.
    const SECS_PER_HOUR: u64 = 3_600;
    /// Nanoseconds per second, as a float for ratio computations.
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;

    /// Creates a meter with a one-second configured window.
    #[inline]
    pub fn new() -> Self {
        Self::with_window(Duration::from_secs(1))
    }

    /// Creates a meter with a custom configured window.
    ///
    /// `window` only influences [`RateStats::window_fill`]; the per-second,
    /// per-minute and per-hour counters always use their fixed windows.
    #[inline]
    pub fn with_window(window: Duration) -> Self {
        Self {
            total_events: AtomicU64::new(0),
            current_second_events: AtomicU64::new(0),
            current_minute_events: AtomicU64::new(0),
            current_hour_events: AtomicU64::new(0),
            last_second: AtomicU64::new(0),
            last_minute: AtomicU64::new(0),
            last_hour: AtomicU64::new(0),
            // Saturate rather than silently truncate windows that do not fit
            // into 64 bits of nanoseconds (longer than roughly 584 years).
            window_ns: window.as_nanos().min(u128::from(u64::MAX)) as u64,
            created_at: Instant::now(),
        }
    }

    /// Records a single event.
    #[inline]
    pub fn tick(&self) {
        self.tick_n(1);
    }

    /// Records `n` events at once; `n == 0` is a no-op.
    #[inline]
    pub fn tick_n(&self, n: u32) {
        if n == 0 {
            return;
        }

        self.total_events.fetch_add(u64::from(n), Ordering::Relaxed);

        let now = self.get_unix_timestamp();
        self.update_windows(now, n);
    }

    /// Returns the number of events recorded so far in the current second.
    ///
    /// Rolls stale windows over as a side effect.
    #[inline]
    pub fn rate(&self) -> f64 {
        self.refreshed(&self.current_second_events)
    }

    /// Alias for [`RateMeter::rate`].
    #[inline]
    pub fn rate_per_second(&self) -> f64 {
        self.rate()
    }

    /// Returns the number of events recorded so far in the current minute.
    #[inline]
    pub fn rate_per_minute(&self) -> f64 {
        self.refreshed(&self.current_minute_events)
    }

    /// Returns the number of events recorded so far in the current hour.
    #[inline]
    pub fn rate_per_hour(&self) -> f64 {
        self.refreshed(&self.current_hour_events)
    }

    /// Returns the lifetime number of recorded events.
    #[inline]
    pub fn total(&self) -> u64 {
        self.total_events.load(Ordering::Relaxed)
    }

    /// Returns `true` when the current per-second count is strictly above `limit`.
    #[inline]
    pub fn exceeds_rate(&self, limit: f64) -> bool {
        self.rate() > limit
    }

    /// Returns `true` when recording `n` more events would keep the current
    /// per-second count at or below `limit`.
    ///
    /// This is an advisory snapshot: another thread may record events between
    /// this check and a later `tick`. Use [`RateMeter::tick_if_under_limit`] or
    /// [`RateMeter::tick_burst_if_under_limit`] to check and record atomically.
    #[inline]
    pub fn can_allow(&self, n: u32, limit: f64) -> bool {
        let current_rate = self.rate();
        (current_rate + f64::from(n)) <= limit
    }

    /// Records one event if that keeps the per-second count within `limit`.
    ///
    /// Returns `true` when the event was recorded.
    #[inline]
    pub fn tick_if_under_limit(&self, limit: f64) -> bool {
        self.try_acquire(1, limit)
    }

    /// Records `n` events if that keeps the per-second count within `limit`.
    ///
    /// Either all `n` events are recorded (`true`) or none are (`false`).
    #[inline]
    pub fn tick_burst_if_under_limit(&self, n: u32, limit: f64) -> bool {
        self.try_acquire(n, limit)
    }

    /// Clears every counter and re-anchors all windows at the current time.
    ///
    /// The creation time is left untouched, so [`RateMeter::age`] and
    /// [`RateStats::average_rate`] keep measuring from construction.
    #[inline]
    pub fn reset(&self) {
        let now = self.get_unix_timestamp();

        self.total_events.store(0, Ordering::SeqCst);
        self.current_second_events.store(0, Ordering::SeqCst);
        self.current_minute_events.store(0, Ordering::SeqCst);
        self.current_hour_events.store(0, Ordering::SeqCst);
        self.last_second.store(now, Ordering::SeqCst);
        self.last_minute
            .store(now / Self::SECS_PER_MINUTE, Ordering::SeqCst);
        self.last_hour
            .store(now / Self::SECS_PER_HOUR, Ordering::SeqCst);
    }

    /// Takes a snapshot of all counters and derived figures.
    ///
    /// The fields are read one after another, so under concurrent ticking the
    /// snapshot is not guaranteed to be mutually consistent.
    pub fn stats(&self) -> RateStats {
        let now = self.get_unix_timestamp();
        self.update_windows(now, 0);

        let total_events = self.total();
        let per_second = self.current_second_events.load(Ordering::Relaxed) as f64;
        let per_minute = self.current_minute_events.load(Ordering::Relaxed) as f64;
        let per_hour = self.current_hour_events.load(Ordering::Relaxed) as f64;

        let age = self.created_at.elapsed();
        let age_secs = age.as_secs_f64();
        let average_rate = if age_secs > 0.0 {
            total_events as f64 / age_secs
        } else {
            0.0
        };

        let window_fill = if self.window_ns > 0 {
            let window_seconds = self.window_ns as f64 / Self::NANOS_PER_SEC;
            (age_secs.min(window_seconds) / window_seconds * 100.0).min(100.0)
        } else {
            // A zero-length window is always considered full.
            100.0
        };

        RateStats {
            total_events,
            per_second,
            per_minute,
            per_hour,
            average_rate,
            age,
            window_fill,
        }
    }

    /// Returns the time elapsed since the meter was created.
    #[inline]
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Returns `true` when no event has been recorded since creation or the
    /// last [`RateMeter::reset`].
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.total() == 0
    }

    /// Returns the current Unix time in whole seconds.
    ///
    /// Falls back to 0 when the system clock reports a time before the epoch.
    /// The meter's own age must not be added here: doing so makes the clock
    /// advance two seconds per real second and rolls windows over early.
    #[inline]
    fn get_unix_timestamp(&self) -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Rolls stale windows over, then loads `counter` as a float.
    #[inline]
    fn refreshed(&self, counter: &AtomicU64) -> f64 {
        let now = self.get_unix_timestamp();
        self.update_windows(now, 0);
        counter.load(Ordering::Relaxed) as f64
    }

    /// Atomically records `n` events if the per-second count stays within `limit`.
    ///
    /// The check and the increment of the per-second counter happen in one
    /// compare-and-swap loop, so concurrent callers cannot jointly push the
    /// counter past `limit`. The minute, hour and lifetime counters are only
    /// touched once the reservation succeeded.
    fn try_acquire(&self, n: u32, limit: f64) -> bool {
        let now = self.get_unix_timestamp();
        // Roll stale windows first so the reservation targets the current second.
        self.update_windows(now, 0);

        let requested = u64::from(n);
        let reserved = self
            .current_second_events
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                if current as f64 + f64::from(n) <= limit {
                    current.checked_add(requested)
                } else {
                    None
                }
            })
            .is_ok();

        if reserved && requested > 0 {
            self.current_minute_events
                .fetch_add(requested, Ordering::Relaxed);
            self.current_hour_events
                .fetch_add(requested, Ordering::Relaxed);
            self.total_events.fetch_add(requested, Ordering::Relaxed);
        }

        reserved
    }

    /// Adds `new_events` to the second, minute and hour windows that contain
    /// `now` (Unix seconds), resetting any window that has rolled over.
    #[inline]
    fn update_windows(&self, now: u64, new_events: u32) {
        let added = u64::from(new_events);

        Self::roll_window(&self.last_second, &self.current_second_events, now, added);
        Self::roll_window(
            &self.last_minute,
            &self.current_minute_events,
            now / Self::SECS_PER_MINUTE,
            added,
        );
        Self::roll_window(
            &self.last_hour,
            &self.current_hour_events,
            now / Self::SECS_PER_HOUR,
            added,
        );
    }

    /// Applies `new_events` to one window.
    ///
    /// `marker` holds the id of the window `counter` belongs to. When it
    /// differs from `window_id`, the thread that wins the compare-exchange
    /// restarts the counter at `new_events`; threads that lose add to it.
    /// An add that lands between the winner's exchange and its store is
    /// overwritten — this is the best-effort part of the roll-over.
    #[inline]
    fn roll_window(marker: &AtomicU64, counter: &AtomicU64, window_id: u64, new_events: u64) {
        let seen = marker.load(Ordering::Relaxed);

        if seen == window_id {
            if new_events > 0 {
                counter.fetch_add(new_events, Ordering::Relaxed);
            }
            return;
        }

        let won = marker
            .compare_exchange(seen, window_id, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();
        if won {
            counter.store(new_events, Ordering::Relaxed);
        } else {
            counter.fetch_add(new_events, Ordering::Relaxed);
        }
    }
}
323
324impl Default for RateMeter {
325 #[inline]
326 fn default() -> Self {
327 Self::new()
328 }
329}
330
331impl std::fmt::Display for RateMeter {
332 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333 write!(f, "RateMeter({:.1}/s, {} total)", self.rate(), self.total())
334 }
335}
336
337impl std::fmt::Debug for RateMeter {
338 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339 let stats = self.stats();
340 f.debug_struct("RateMeter")
341 .field("total_events", &stats.total_events)
342 .field("per_second", &stats.per_second)
343 .field("per_minute", &stats.per_minute)
344 .field("average_rate", &stats.average_rate)
345 .field("age", &stats.age)
346 .finish()
347 }
348}
349
350unsafe impl Send for RateMeter {}
352unsafe impl Sync for RateMeter {}
353
354pub mod specialized {
356 use super::*;
357
358 #[repr(align(64))]
360 pub struct ApiRateLimiter {
361 meter: RateMeter,
362 limit: AtomicU32, }
364
365 impl ApiRateLimiter {
366 #[inline]
368 pub fn new(requests_per_second: u32) -> Self {
369 Self {
370 meter: RateMeter::new(),
371 limit: AtomicU32::new(requests_per_second),
372 }
373 }
374
375 #[inline]
377 pub fn try_request(&self) -> bool {
378 let limit = self.limit.load(Ordering::Relaxed) as f64;
379 self.meter.tick_if_under_limit(limit)
380 }
381
382 #[inline]
384 pub fn try_requests(&self, n: u32) -> bool {
385 let limit = self.limit.load(Ordering::Relaxed) as f64;
386 self.meter.tick_burst_if_under_limit(n, limit)
387 }
388
389 #[inline]
391 pub fn set_limit(&self, requests_per_second: u32) {
392 self.limit.store(requests_per_second, Ordering::Relaxed);
393 }
394
395 #[inline]
397 pub fn get_limit(&self) -> u32 {
398 self.limit.load(Ordering::Relaxed)
399 }
400
401 #[inline]
403 pub fn current_rate(&self) -> f64 {
404 self.meter.rate()
405 }
406
407 #[inline]
409 pub fn total_requests(&self) -> u64 {
410 self.meter.total()
411 }
412
413 #[inline]
415 pub fn is_over_limit(&self) -> bool {
416 let limit = self.limit.load(Ordering::Relaxed) as f64;
417 self.meter.rate() > limit
418 }
419
420 #[inline]
422 pub fn reset(&self) {
423 self.meter.reset();
424 }
425 }
426
427 impl Default for ApiRateLimiter {
428 fn default() -> Self { Self::new(1000) } }
430
431 #[repr(align(64))]
433 pub struct ThroughputMeter {
434 meter: RateMeter,
435 }
436
437 impl ThroughputMeter {
438 #[inline]
440 pub fn new() -> Self {
441 Self {
442 meter: RateMeter::new(),
443 }
444 }
445
446 #[inline(always)]
448 pub fn record_bytes(&self, bytes: u64) {
449 self.meter.tick_n(bytes as u32);
450 }
451
452 #[inline]
454 pub fn bytes_per_second(&self) -> f64 {
455 self.meter.rate()
456 }
457
458 #[inline]
460 pub fn kb_per_second(&self) -> f64 {
461 self.meter.rate() / 1024.0
462 }
463
464 #[inline]
466 pub fn mb_per_second(&self) -> f64 {
467 self.meter.rate() / (1024.0 * 1024.0)
468 }
469
470 #[inline]
472 pub fn gb_per_second(&self) -> f64 {
473 self.meter.rate() / (1024.0 * 1024.0 * 1024.0)
474 }
475
476 #[inline]
478 pub fn total_bytes(&self) -> u64 {
479 self.meter.total()
480 }
481
482 #[inline]
484 pub fn reset(&self) {
485 self.meter.reset();
486 }
487 }
488
489 impl Default for ThroughputMeter {
490 fn default() -> Self { Self::new() }
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// A fresh meter is empty, and ticks accumulate into the lifetime total.
    #[test]
    fn test_basic_operations() {
        let m = RateMeter::new();

        assert!(m.is_empty());
        assert_eq!(m.total(), 0);
        assert_eq!(m.rate(), 0.0);

        m.tick();
        assert!(!m.is_empty());
        assert_eq!(m.total(), 1);

        m.tick_n(5);
        assert_eq!(m.total(), 6);
    }

    /// One hundred single ticks show up as a per-second figure of 100.
    #[test]
    fn test_rate_calculations() {
        let m = RateMeter::new();

        (0..100).for_each(|_| m.tick());

        let observed = m.rate();
        assert_eq!(observed, 100.0);
        assert_eq!(m.rate_per_second(), 100.0);
    }

    /// Events recorded together land in the second, minute and hour windows.
    #[test]
    fn test_multiple_windows() {
        let m = RateMeter::new();

        (0..60).for_each(|_| m.tick());

        let snapshot = m.stats();
        assert_eq!(snapshot.total_events, 60);
        assert_eq!(snapshot.per_second, 60.0);
        assert_eq!(snapshot.per_minute, 60.0);
        assert_eq!(snapshot.per_hour, 60.0);
    }

    /// Single ticks are admitted until the limit is reached.
    #[test]
    fn test_rate_limiting() {
        let m = RateMeter::new();

        assert!(m.tick_if_under_limit(10.0));
        assert!(m.tick_if_under_limit(10.0));

        // Fill the rest of the budget in one go.
        m.tick_n(8);

        assert!(!m.tick_if_under_limit(10.0));
        assert!(m.exceeds_rate(9.0));
        assert!(!m.exceeds_rate(11.0));
    }

    /// A burst is admitted as a whole or not at all.
    #[test]
    fn test_burst_rate_limiting() {
        let m = RateMeter::new();

        assert!(m.tick_burst_if_under_limit(5, 10.0));
        assert_eq!(m.total(), 5);

        // Too large for the remaining budget: nothing is recorded.
        assert!(!m.tick_burst_if_under_limit(10, 10.0));
        assert_eq!(m.total(), 5);

        assert!(m.tick_burst_if_under_limit(3, 10.0));
        assert_eq!(m.total(), 8);
    }

    /// `can_allow` only inspects the budget and records nothing.
    #[test]
    fn test_can_allow() {
        let m = RateMeter::new();

        m.tick_n(5);

        assert!(m.can_allow(3, 10.0));
        assert!(!m.can_allow(6, 10.0));
        assert!(m.can_allow(5, 10.0));
    }

    /// `reset` clears the total and the current rate.
    #[test]
    fn test_reset() {
        let m = RateMeter::new();

        m.tick_n(100);
        assert_eq!(m.total(), 100);
        assert!(m.rate() > 0.0);

        m.reset();
        assert_eq!(m.total(), 0);
        assert_eq!(m.rate(), 0.0);
        assert!(m.is_empty());
    }

    /// The statistics snapshot reflects what was recorded.
    #[test]
    fn test_statistics() {
        let m = RateMeter::new();

        m.tick_n(50);

        let snapshot = m.stats();
        assert_eq!(snapshot.total_events, 50);
        assert_eq!(snapshot.per_second, 50.0);
        assert!(snapshot.average_rate > 0.0);
        assert!(snapshot.age > Duration::from_nanos(0));
        assert!(snapshot.window_fill >= 0.0);
    }

    /// End-to-end walk through the API limiter wrapper.
    #[test]
    fn test_api_rate_limiter() {
        let api = specialized::ApiRateLimiter::new(10);

        // The first ten requests fit into the budget.
        for _ in 0..10 {
            assert!(api.try_request());
        }

        // The eleventh does not.
        assert!(!api.try_request());

        assert_eq!(api.current_rate(), 10.0);
        assert_eq!(api.total_requests(), 10);
        assert_eq!(api.get_limit(), 10);

        api.set_limit(20);
        assert_eq!(api.get_limit(), 20);
        assert!(!api.is_over_limit());

        api.reset();
        assert!(api.try_requests(5));
        assert_eq!(api.total_requests(), 5);

        assert!(!api.try_requests(20));
        assert_eq!(api.total_requests(), 5);
    }

    /// Byte counts and the derived unit conversions.
    #[test]
    fn test_throughput_meter() {
        let tp = specialized::ThroughputMeter::new();

        tp.record_bytes(1024);
        assert_eq!(tp.bytes_per_second(), 1024.0);
        assert_eq!(tp.kb_per_second(), 1.0);
        assert_eq!(tp.total_bytes(), 1024);

        tp.record_bytes(1024 * 1024);
        assert_eq!(tp.total_bytes(), 1024 + 1024 * 1024);
        assert!((tp.mb_per_second() - 1.001).abs() < 0.01);
    }

    /// The lifetime total stays exact under heavy concurrent ticking.
    #[test]
    fn test_high_concurrency() {
        const THREADS: u64 = 50;
        const TICKS_PER_THREAD: u64 = 1000;

        let shared = Arc::new(RateMeter::new());

        let mut workers = Vec::new();
        for _ in 0..THREADS {
            let local = Arc::clone(&shared);
            workers.push(thread::spawn(move || {
                for _ in 0..TICKS_PER_THREAD {
                    local.tick();
                }
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(shared.total(), THREADS * TICKS_PER_THREAD);

        let snapshot = shared.stats();
        assert!(snapshot.average_rate > 0.0);
        assert_eq!(snapshot.total_events, THREADS * TICKS_PER_THREAD);
    }

    /// Twenty threads compete for a budget of one hundred requests.
    #[test]
    fn test_concurrent_rate_limiting() {
        let api = Arc::new(specialized::ApiRateLimiter::new(100));

        let mut workers = Vec::new();
        for _ in 0..20 {
            let local = Arc::clone(&api);
            workers.push(thread::spawn(move || {
                (0..10).filter(|_| local.try_request()).count() as i32
            }));
        }

        let mut total_successful: i32 = 0;
        for worker in workers {
            total_successful += worker.join().unwrap();
        }

        assert!(total_successful <= 100);
        assert!(total_successful >= 95);
    }

    /// Both formatting traits mention the type name and the key figures.
    #[test]
    fn test_display_and_debug() {
        let m = RateMeter::new();
        m.tick_n(42);

        let shown = m.to_string();
        assert!(shown.contains("RateMeter"));
        assert!(shown.contains("42 total"));

        let debugged = format!("{:?}", m);
        assert!(debugged.contains("RateMeter"));
        assert!(debugged.contains("total_events"));
    }

    /// A custom window does not change how events are counted.
    #[test]
    fn test_custom_window() {
        let m = RateMeter::with_window(Duration::from_secs(5));

        m.tick_n(10);
        assert_eq!(m.total(), 10);
        assert_eq!(m.rate(), 10.0);

        let snapshot = m.stats();
        assert!(snapshot.window_fill >= 0.0);
    }
}
747
/// Micro-benchmarks expressed as tests.
///
/// They assert hard wall-clock thresholds, which depend on the machine, the
/// build profile and the system load, so they are `#[ignore]`d by default to
/// keep the regular test run deterministic. Run them explicitly with
/// `cargo test --release -- --ignored`.
#[cfg(test)]
mod benchmarks {
    use super::*;
    use std::time::{Duration, Instant};

    /// Average wall-clock nanoseconds spent per operation.
    fn ns_per_op(elapsed: Duration, iterations: u64) -> f64 {
        elapsed.as_nanos() as f64 / iterations as f64
    }

    #[test]
    #[ignore = "timing-sensitive micro-benchmark; run with --ignored on a release build"]
    fn bench_rate_meter_tick() {
        let meter = RateMeter::new();
        let iterations: u64 = 10_000_000;

        let start = Instant::now();
        for _ in 0..iterations {
            meter.tick();
        }
        let per_op = ns_per_op(start.elapsed(), iterations);

        println!("RateMeter tick: {:.2} ns/op", per_op);

        assert_eq!(meter.total(), iterations);
        assert!(per_op < 400.0);
    }

    #[test]
    #[ignore = "timing-sensitive micro-benchmark; run with --ignored on a release build"]
    fn bench_rate_meter_tick_n() {
        let meter = RateMeter::new();
        let iterations: u32 = 1_000_000;

        let start = Instant::now();
        for i in 0..iterations {
            // Cycle through burst sizes 1..=10.
            meter.tick_n((i % 10) + 1);
        }
        let per_op = ns_per_op(start.elapsed(), u64::from(iterations));

        println!("RateMeter tick_n: {:.2} ns/op", per_op);

        assert!(per_op < 500.0);
    }

    #[test]
    #[ignore = "timing-sensitive micro-benchmark; run with --ignored on a release build"]
    fn bench_rate_calculation() {
        let meter = RateMeter::new();

        // Give the meter something to report.
        meter.tick_n(1000);

        let iterations: u64 = 1_000_000;
        let start = Instant::now();
        for _ in 0..iterations {
            let _ = meter.rate();
        }
        let per_op = ns_per_op(start.elapsed(), iterations);

        println!("RateMeter rate: {:.2} ns/op", per_op);

        assert!(per_op < 300.0);
    }

    #[test]
    #[ignore = "timing-sensitive micro-benchmark; run with --ignored on a release build"]
    fn bench_api_rate_limiter() {
        // The limit is high enough that every request in the loop is admitted.
        let limiter = specialized::ApiRateLimiter::new(1_000_000);
        let iterations: u64 = 1_000_000;

        let start = Instant::now();
        for _ in 0..iterations {
            let _ = limiter.try_request();
        }
        let per_op = ns_per_op(start.elapsed(), iterations);

        println!("ApiRateLimiter try_request: {:.2} ns/op", per_op);

        assert!(per_op < 1000.0);
    }
}