/// What a rate limiter hands back to the caller for delivery.
#[derive(Debug, Clone, PartialEq)]
pub enum DeliveryMsg {
    /// One message passed through unchanged.
    ///
    /// `RateLimiter::admit` returns this when limiting is disabled or when
    /// the current window allows an emission.
    Single(String),
    /// The outcome of flushing messages that were held back.
    ///
    /// `RateLimiter::flush_pending` builds this from its buffer.
    Batch {
        /// The most recently buffered message; the only one that survives.
        last: String,
        /// How many other buffered messages were discarded by the flush
        /// (`last` itself is not counted).
        dropped: usize,
    },
}
/// Time-window rate limiter that lets one message through per window and
/// buffers the rest until they are flushed.
///
/// All timestamps are caller-supplied millisecond values; the limiter never
/// reads a clock itself.
#[derive(Debug)]
pub struct RateLimiter {
    /// Minimum number of milliseconds between two emissions.
    window_ms: u64,
    /// `0` when limiting is disabled (constructed with a rate of `0`),
    /// otherwise `1`. The visible code only ever compares it against `0`.
    max_per_window: usize,
    /// Messages held back by `admit` while the window was still closed,
    /// oldest first.
    buffer: Vec<String>,
    /// Timestamp of the latest emission made while limiting is enabled, or
    /// `None` if nothing has been emitted yet.
    last_emit_ms: Option<u64>,
    /// `now_ms` of the most recent `admit` call. Written by `admit`; nothing
    /// in the code shown here reads it back.
    last_seen_ms: u64,
}
impl RateLimiter {
    /// Creates a limiter that lets one message through every `rate_ms`
    /// milliseconds.
    ///
    /// A `rate_ms` of `0` disables limiting: [`admit`](Self::admit) then
    /// passes every message straight through and nothing is ever buffered.
    pub fn new(rate_ms: u64) -> Self {
        Self {
            window_ms: rate_ms,
            // 0 doubles as the "limiting disabled" marker that `admit` and
            // `flush_pending` check for.
            max_per_window: if rate_ms == 0 { 0 } else { 1 },
            buffer: Vec::new(),
            last_emit_ms: None,
            last_seen_ms: 0,
        }
    }

    /// Returns `true` when an emission is allowed at `now_ms`: either nothing
    /// has been emitted yet, or at least `window_ms` milliseconds have passed
    /// since the last emission.
    ///
    /// The subtraction saturates, so a `now_ms` that lies before the last
    /// emission counts as zero elapsed time instead of wrapping around.
    fn window_elapsed(&self, now_ms: u64) -> bool {
        match self.last_emit_ms {
            Some(last) => now_ms.saturating_sub(last) >= self.window_ms,
            None => true,
        }
    }

    /// Offers `msg` to the limiter at time `now_ms`.
    ///
    /// Returns `Some(DeliveryMsg::Single(msg))` when the message may be
    /// delivered right away (limiting disabled, or the window has elapsed).
    /// Otherwise the message is buffered and `None` is returned; buffered
    /// messages are only released by [`flush_pending`](Self::flush_pending).
    ///
    /// Note that the pass-through path does not look at the buffer: messages
    /// buffered earlier stay pending and can therefore be flushed after a
    /// newer message has already been returned from here.
    pub fn admit(&mut self, msg: String, now_ms: u64) -> Option<DeliveryMsg> {
        self.last_seen_ms = now_ms;
        if self.max_per_window == 0 {
            return Some(DeliveryMsg::Single(msg));
        }
        if self.window_elapsed(now_ms) {
            self.last_emit_ms = Some(now_ms);
            Some(DeliveryMsg::Single(msg))
        } else {
            self.buffer.push(msg);
            None
        }
    }

    /// Releases buffered messages once the window has elapsed at `now_ms`.
    ///
    /// Returns `None` when limiting is disabled, the window is still closed,
    /// or nothing is buffered. Otherwise returns a `DeliveryMsg::Batch`
    /// carrying the most recently buffered message plus the number of older
    /// buffered messages that were discarded, empties the buffer, and starts
    /// a new window at `now_ms`.
    pub fn flush_pending(&mut self, now_ms: u64) -> Option<DeliveryMsg> {
        if self.max_per_window == 0 || !self.window_elapsed(now_ms) {
            return None;
        }
        // An empty buffer bails out here without touching `last_emit_ms`.
        let last = self.buffer.pop()?;
        let dropped = self.buffer.len();
        self.buffer.clear();
        self.last_emit_ms = Some(now_ms);
        Some(DeliveryMsg::Batch { last, dropped })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Window length used by every test that exercises actual limiting.
    const WINDOW_MS: u64 = 2000;

    // The very first message has no previous emission to wait for.
    #[test]
    fn first_message_in_window_is_single() {
        let mut limiter = RateLimiter::new(WINDOW_MS);
        let delivered = limiter.admit(String::from("a"), 0);
        assert_eq!(delivered, Some(DeliveryMsg::Single(String::from("a"))));
    }

    // Anything arriving before the window has elapsed is held back.
    #[test]
    fn subsequent_in_window_buffered() {
        let mut limiter = RateLimiter::new(WINDOW_MS);
        assert!(limiter.admit(String::from("a"), 0).is_some());
        for (text, at_ms) in [("b", 500), ("c", 1500)] {
            assert!(limiter.admit(text.to_string(), at_ms).is_none());
        }
    }

    // Flushing at the window boundary keeps the newest buffered message and
    // reports how many older ones were discarded.
    #[test]
    fn flush_after_window_emits_batch() {
        let mut limiter = RateLimiter::new(WINDOW_MS);
        for (text, at_ms) in [("a", 0), ("b", 500), ("c", 1500)] {
            limiter.admit(text.to_string(), at_ms);
        }
        let expected = DeliveryMsg::Batch {
            last: String::from("c"),
            dropped: 1,
        };
        assert_eq!(limiter.flush_pending(2000), Some(expected));
    }

    #[test]
    fn flush_with_empty_buffer_returns_none() {
        let mut limiter = RateLimiter::new(WINDOW_MS);
        let flushed = limiter.flush_pending(5000);
        assert_eq!(flushed, None);
    }

    #[test]
    fn flush_inside_window_returns_none() {
        let mut limiter = RateLimiter::new(WINDOW_MS);
        limiter.admit(String::from("a"), 0);
        limiter.admit(String::from("b"), 500);
        let flushed = limiter.flush_pending(1500);
        assert_eq!(flushed, None);
    }

    // A rate of zero turns the limiter into a pass-through.
    #[test]
    fn rate_zero_disables_limit() {
        let mut limiter = RateLimiter::new(0);
        for (text, at_ms) in [("a", 0), ("b", 1), ("c", 2)] {
            assert_eq!(
                limiter.admit(text.to_string(), at_ms),
                Some(DeliveryMsg::Single(text.to_string()))
            );
        }
        assert_eq!(limiter.flush_pending(3), None);
    }

    // A flush starts a new window; the next pass-through only happens once
    // that window has elapsed as well.
    #[test]
    fn second_window_emits_single_again() {
        let mut limiter = RateLimiter::new(WINDOW_MS);

        let first = limiter.admit(String::from("a"), 0);
        assert!(matches!(first, Some(DeliveryMsg::Single(_))));

        limiter.admit(String::from("b"), 500);
        let _ = limiter.flush_pending(2000);

        let inside = limiter.admit(String::from("c"), 2100);
        assert!(inside.is_none(), "still in window");

        let after = limiter.admit(String::from("d"), 4001);
        assert!(
            matches!(after, Some(DeliveryMsg::Single(_))),
            "next window should be open"
        );
    }
}