1use async_trait::async_trait;
9use coreon_core::{Exchange, Processor, Result};
10use std::{
11 sync::Arc,
12 time::{Duration, Instant},
13};
14use tokio::sync::Mutex;
15
16struct Window {
17 start: Instant,
18 count: u32,
19}
20
21pub struct Throttle {
22 max: u32,
23 period: Duration,
24 window: Mutex<Window>,
25}
26
27impl Throttle {
28 pub fn new(max: u32, period: Duration) -> Arc<Self> {
29 Arc::new(Self {
30 max,
31 period,
32 window: Mutex::new(Window {
33 start: Instant::now(),
34 count: 0,
35 }),
36 })
37 }
38}
39
40#[async_trait]
41impl Processor for Throttle {
42 async fn process(&self, _exchange: &mut Exchange) -> Result<()> {
43 loop {
44 let sleep_for = {
46 let mut w = self.window.lock().await;
47 let now = Instant::now();
48 if now.duration_since(w.start) >= self.period {
49 w.start = now;
50 w.count = 0;
51 }
52 if w.count < self.max {
53 w.count += 1;
54 return Ok(());
55 }
56 self.period.saturating_sub(now.duration_since(w.start))
57 };
58 tokio::time::sleep(sleep_for).await;
59 }
60 }
61}