use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Cap, in bytes, used by [`SendThrottle::default_cap`]: 64 MiB.
pub const DEFAULT_CAP_BYTES: u64 = 64 << 20;

/// Fraction of the cap that the in-flight counter must fall back to (or below)
/// before `can_resume` reports `true`.
const RESUME_FRACTION: f64 = 3.0 / 4.0;
/// Byte counter with a cap and a lower resume threshold.
///
/// The counter lives behind an [`Arc`], so cloning a `SendThrottle` yields a
/// handle onto the *same* counter rather than an independent copy; `cap` and
/// `resume_threshold` are plain values copied into each clone.
#[derive(Clone, Debug)]
pub struct SendThrottle {
    /// Shared running total of charged-but-not-yet-acked bytes.
    in_flight: Arc<AtomicU64>,
    /// Upper bound compared against the counter by `can_send`.
    cap: u64,
    /// Level the counter must drop to (inclusive) for `can_resume` to pass.
    resume_threshold: u64,
}
impl SendThrottle {
    /// Creates a throttle with the given `cap` and an in-flight counter of zero.
    ///
    /// The resume threshold is `cap * RESUME_FRACTION`, truncated toward zero.
    pub fn new(cap: u64) -> Self {
        // The product is computed in f64, so caps above 2^53 are rounded
        // before scaling; the float-to-int `as` cast saturates rather than wraps.
        let resume_threshold = (cap as f64 * RESUME_FRACTION) as u64;
        Self {
            in_flight: Arc::new(AtomicU64::new(0)),
            cap,
            resume_threshold,
        }
    }

    /// Creates a throttle whose cap is [`DEFAULT_CAP_BYTES`].
    pub fn default_cap() -> Self {
        Self::new(DEFAULT_CAP_BYTES)
    }

    /// Returns the current in-flight byte count.
    ///
    /// This is a relaxed load: it yields a value the counter held at some
    /// point but establishes no ordering with other memory operations.
    pub fn in_flight(&self) -> u64 {
        self.in_flight.load(Ordering::Relaxed)
    }

    /// Returns `true` while the in-flight count is strictly below the cap.
    pub fn can_send(&self) -> bool {
        self.in_flight.load(Ordering::Acquire) < self.cap
    }

    /// Returns `true` once the in-flight count is at or below the resume
    /// threshold.
    pub fn can_resume(&self) -> bool {
        self.in_flight.load(Ordering::Acquire) <= self.resume_threshold
    }

    /// Adds `bytes` to the in-flight count.
    ///
    /// The addition is a single atomic `fetch_add`; it wraps around on `u64`
    /// overflow. The cap is not checked here, so the count may exceed it.
    pub fn charge(&self, bytes: u64) {
        self.in_flight.fetch_add(bytes, Ordering::AcqRel);
    }

    /// Subtracts `bytes` from the in-flight count, saturating at zero.
    ///
    /// Acknowledging more than is currently in flight leaves the count at zero
    /// instead of underflowing.
    pub fn ack(&self, bytes: u64) {
        // This must be one atomic read-modify-write. A separate load followed
        // by a store would overwrite any `charge`/`ack` performed by another
        // handle in between, silently losing that update.
        //
        // The closure always returns `Some`, so `fetch_update` never yields
        // `Err`; the result is discarded deliberately.
        let _ = self
            .in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    /// Sets the in-flight count back to zero.
    pub fn reset(&self) {
        self.in_flight.store(0, Ordering::Release);
    }

    /// Returns the cap this throttle was created with.
    pub fn cap(&self) -> u64 {
        self.cap
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the counter above the cap and back down through the resume
    /// threshold (75 for a cap of 100).
    #[test]
    fn throttle_charge_and_ack() {
        let throttle = SendThrottle::new(100);
        assert!(throttle.can_send());

        // 80 in flight: still below the cap.
        throttle.charge(80);
        assert!(throttle.can_send());

        // 110 in flight: over the cap.
        throttle.charge(30);
        assert!(!throttle.can_send());

        // 80 in flight: below the cap again, but above the resume threshold.
        throttle.ack(30);
        assert!(!throttle.can_resume());

        // 70 in flight: at or below the resume threshold.
        throttle.ack(10);
        assert!(throttle.can_resume());
    }

    /// `reset` clears whatever was charged.
    #[test]
    fn throttle_reset_zeroes_counter() {
        let throttle = SendThrottle::new(100);
        throttle.charge(99);

        throttle.reset();

        assert_eq!(throttle.in_flight(), 0);
        assert!(throttle.can_send());
    }

    /// Acknowledging more than was charged clamps at zero.
    #[test]
    fn ack_does_not_underflow() {
        let throttle = SendThrottle::new(100);
        throttle.charge(10);

        throttle.ack(50);

        assert_eq!(throttle.in_flight(), 0);
    }

    /// A clone observes charges made through the original handle.
    #[test]
    fn clone_shares_state() {
        let throttle = SendThrottle::new(100);
        let other_handle = throttle.clone();

        throttle.charge(60);

        assert_eq!(other_handle.in_flight(), 60);
    }
}