use async_trait::async_trait;
use coreon_core::{Exchange, Processor, Result};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::Mutex;
struct Window {
start: Instant,
count: u32,
}
pub struct Throttle {
max: u32,
period: Duration,
window: Mutex<Window>,
}
impl Throttle {
pub fn new(max: u32, period: Duration) -> Arc<Self> {
Arc::new(Self {
max,
period,
window: Mutex::new(Window {
start: Instant::now(),
count: 0,
}),
})
}
}
#[async_trait]
impl Processor for Throttle {
async fn process(&self, _exchange: &mut Exchange) -> Result<()> {
loop {
let sleep_for = {
let mut w = self.window.lock().await;
let now = Instant::now();
if now.duration_since(w.start) >= self.period {
w.start = now;
w.count = 0;
}
if w.count < self.max {
w.count += 1;
return Ok(());
}
self.period.saturating_sub(now.duration_since(w.start))
};
tokio::time::sleep(sleep_for).await;
}
}
}