Skip to main content

coreon_eip/
throttle.rs

1//! Throttle — allow at most `max` exchanges per `period`, blocking callers
2//! when the window is full.
3//!
4//! Implementation: fixed-window counter. Simpler than leaky-bucket, adequate
5//! for coarse shaping. If smoother pacing is needed, later add a token-bucket
6//! variant.
7
8use async_trait::async_trait;
9use coreon_core::{Exchange, Processor, Result};
10use std::{
11    sync::Arc,
12    time::{Duration, Instant},
13};
14use tokio::sync::Mutex;
15
16struct Window {
17    start: Instant,
18    count: u32,
19}
20
21pub struct Throttle {
22    max: u32,
23    period: Duration,
24    window: Mutex<Window>,
25}
26
27impl Throttle {
28    pub fn new(max: u32, period: Duration) -> Arc<Self> {
29        Arc::new(Self {
30            max,
31            period,
32            window: Mutex::new(Window {
33                start: Instant::now(),
34                count: 0,
35            }),
36        })
37    }
38}
39
40#[async_trait]
41impl Processor for Throttle {
42    async fn process(&self, _exchange: &mut Exchange) -> Result<()> {
43        loop {
44            // Compute sleep outside the lock to avoid holding it across .await.
45            let sleep_for = {
46                let mut w = self.window.lock().await;
47                let now = Instant::now();
48                if now.duration_since(w.start) >= self.period {
49                    w.start = now;
50                    w.count = 0;
51                }
52                if w.count < self.max {
53                    w.count += 1;
54                    return Ok(());
55                }
56                self.period.saturating_sub(now.duration_since(w.start))
57            };
58            tokio::time::sleep(sleep_for).await;
59        }
60    }
61}