stochastic-routing-extended 1.0.2

SRX (Stochastic Routing eXtended) — a next-generation VPN protocol with stochastic routing, DPI evasion, post-quantum cryptography, and multi-transport channel splitting
Documentation
//! Token bucket rate limiter for traffic shaping.
//!
//! Limits the sending rate to prevent bursts that might trigger deep packet
//! inspection (DPI) or overwhelm a slow transport. Can be applied
//! per-transport or globally.

use std::time::{Duration, Instant};

/// Token bucket rate limiter.
///
/// Tokens accumulate at `rate` tokens/second up to `burst` maximum.
/// Each send consumes tokens equal to the payload size in bytes.
#[derive(Debug)]
pub struct RateLimiter {
    /// Tokens currently available, in bytes (stored as `f64`).
    tokens: f64,
    /// Maximum token capacity (burst size in bytes).
    burst: f64,
    /// Token refill rate (bytes per second).
    rate: f64,
    /// Last time tokens were refilled.
    last_refill: Instant,
}

impl RateLimiter {
    /// Create a new rate limiter that starts with a full bucket.
    ///
    /// - `rate_bytes_per_sec`: steady-state throughput limit. With a rate of
    ///   `0` the bucket is never refilled once the initial burst is spent.
    /// - `burst_bytes`: maximum burst size (also the initial token count).
    pub fn new(rate_bytes_per_sec: u64, burst_bytes: u64) -> Self {
        Self {
            tokens: burst_bytes as f64,
            burst: burst_bytes as f64,
            rate: rate_bytes_per_sec as f64,
            last_refill: Instant::now(),
        }
    }

    /// Refill tokens based on the time elapsed since the previous refill.
    ///
    /// The balance is capped at `burst`, and `last_refill` is moved to "now"
    /// so the same interval is never credited twice.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.rate).min(self.burst);
        self.last_refill = now;
    }

    /// Try to consume `bytes` tokens without waiting.
    ///
    /// Returns `true` and deducts the tokens if enough are available;
    /// returns `false` and leaves the balance untouched if rate-limited.
    pub fn try_consume(&mut self, bytes: usize) -> bool {
        self.refill();
        let cost = bytes as f64;
        if self.tokens >= cost {
            self.tokens -= cost;
            true
        } else {
            false
        }
    }

    /// How long to wait before `bytes` tokens become available.
    ///
    /// Returns `Duration::ZERO` if tokens are already available. Returns
    /// `Duration::MAX` when the wait cannot be expressed as a `Duration`:
    /// the configured rate is `0` (the deficit is never repaid) or the
    /// computed wait overflows. The estimate is `deficit / rate` and does not
    /// take the `burst` cap into account.
    pub fn wait_time(&mut self, bytes: usize) -> Duration {
        self.refill();
        let cost = bytes as f64;
        if self.tokens >= cost {
            return Duration::ZERO;
        }
        let deficit = cost - self.tokens;
        // `deficit / 0.0` is `+inf`, and a huge deficit at a tiny rate can
        // exceed `Duration::MAX`; `Duration::from_secs_f64` panics on both,
        // so use the fallible conversion and saturate instead.
        Duration::try_from_secs_f64(deficit / self.rate).unwrap_or(Duration::MAX)
    }

    /// Consume tokens, waiting if necessary. Call from an async context.
    ///
    /// Sleeps once for [`RateLimiter::wait_time`] and then deducts `bytes`
    /// unconditionally. If `bytes` exceeds the burst size, the refill after
    /// the sleep is capped below the cost, so the balance goes negative and
    /// later calls must wait for that debt to be repaid.
    pub async fn consume(&mut self, bytes: usize) {
        let wait = self.wait_time(bytes);
        if !wait.is_zero() {
            tokio::time::sleep(wait).await;
            // Credit the tokens that accumulated while sleeping.
            self.refill();
        }
        self.tokens -= bytes as f64;
    }

    /// Current available tokens (bytes), truncated to a whole number.
    ///
    /// A negative balance is reported as `0`, because float-to-integer `as`
    /// casts saturate.
    pub fn available(&mut self) -> u64 {
        self.refill();
        self.tokens as u64
    }

    /// Configured rate in bytes per second.
    pub fn rate(&self) -> u64 {
        self.rate as u64
    }

    /// Configured burst size in bytes.
    pub fn burst(&self) -> u64 {
        self.burst as u64
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh limiter starts with a full bucket, so a burst-sized send passes.
    #[test]
    fn initial_burst_allows_immediate_send() {
        let mut limiter = RateLimiter::new(1000, 5000);
        let whole_burst_ok = limiter.try_consume(5000);
        assert!(whole_burst_ok);
        // The bucket is now empty, so even a single byte is refused.
        let extra_byte_ok = limiter.try_consume(1);
        assert!(!extra_byte_ok);
    }

    /// Two half-burst sends fit in the budget; anything further is refused.
    #[test]
    fn try_consume_within_budget() {
        let mut limiter = RateLimiter::new(10_000, 10_000);
        for _ in 0..2 {
            assert!(limiter.try_consume(5000));
        }
        assert!(!limiter.try_consume(1));
    }

    /// No wait is reported while the bucket still covers the request.
    #[test]
    fn wait_time_zero_when_available() {
        let mut limiter = RateLimiter::new(1000, 5000);
        assert!(limiter.wait_time(100).is_zero());
    }

    /// An empty bucket reports a wait proportional to the deficit.
    #[test]
    fn wait_time_nonzero_when_exhausted() {
        let mut limiter = RateLimiter::new(1000, 100);
        limiter.try_consume(100);
        // 100 bytes at 1000 B/s should take about 100 ms.
        let millis = limiter.wait_time(100).as_millis();
        assert!((90..=110).contains(&millis));
    }

    /// Tokens come back as time passes.
    #[test]
    fn tokens_refill_over_time() {
        let mut limiter = RateLimiter::new(1_000_000, 1_000_000);
        // Drain the bucket completely.
        limiter.try_consume(1_000_000);
        // Backdate the last refill by one second instead of sleeping.
        limiter.last_refill = Instant::now() - Duration::from_secs(1);
        let refilled = limiter.available();
        assert!(refilled >= 900_000);
    }

    /// The accessors echo the constructor arguments.
    #[test]
    fn rate_and_burst_accessors() {
        let limiter = RateLimiter::new(5000, 10000);
        assert_eq!((limiter.rate(), limiter.burst()), (5000, 10000));
    }

    /// `consume` sleeps until the deficit has been repaid.
    #[tokio::test]
    async fn consume_waits_then_succeeds() {
        let mut limiter = RateLimiter::new(100_000, 100);
        // Spend the whole burst so the next call has to wait (~1 ms).
        limiter.try_consume(100);
        let started = Instant::now();
        limiter.consume(100).await;
        // Some measurable waiting must have happened.
        assert!(started.elapsed() >= Duration::from_micros(500));
    }

    /// A zero-byte request costs nothing and passes on an empty bucket.
    #[test]
    fn zero_byte_consume_always_succeeds() {
        let mut limiter = RateLimiter::new(1, 1);
        limiter.try_consume(1);
        let free_send_ok = limiter.try_consume(0);
        assert!(free_send_ok);
    }

    /// Idle time never accumulates more than `burst` tokens.
    #[test]
    fn burst_caps_token_accumulation() {
        let mut limiter = RateLimiter::new(1_000_000, 500);
        // Pretend the limiter has been idle for ten seconds.
        limiter.last_refill = Instant::now() - Duration::from_secs(10);
        // The balance stays capped at the burst size.
        assert_eq!(limiter.available(), 500);
    }
}