// vflight 0.9.2
//
// Share files over the Veilid distributed network with content-addressable storage
// Documentation
//! Token-bucket bandwidth throttling for chunk transfers.
//!
//! When enabled, both seed and fetch operations are rate-limited to a
//! configurable ceiling in KB/s (1 KB = 1024 bytes). A burst allowance of
//! `BURST_CHUNKS` × `CHUNK_SIZE` bytes (currently 10 chunks) permits the first
//! wave of parallel transfers to proceed without artificial latency;
//! steady-state rate limiting engages after that.

use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::{debug, trace};

use crate::protocol::CHUNK_SIZE;

/// How many chunks' worth of bytes the bucket holds when full.
///
/// Because the bucket starts out full, this many chunks can be transferred
/// back-to-back before any throttling delay is introduced.
const BURST_CHUNKS: usize = 10;

/// Async token-bucket rate limiter that can be shared between tasks.
///
/// Build one with [`Throttler::new`] and hand out clones of an
/// `Arc<Throttler>`. A configured rate of 0 makes [`Throttler::new`] return
/// `None`, so call sites can hold an `Option<Arc<Throttler>>` and bypass the
/// limiter completely when no limit is configured.
pub struct Throttler {
    /// Refill rate, expressed in bytes per second.
    rate: u64,
    /// Bucket capacity in bytes; refilling never pushes the balance above it.
    bucket_size: u64,
    /// Mutable bucket state, guarded by a `tokio::sync::Mutex`.
    state: Mutex<ThrottlerState>,
}

/// Mutable half of a [`Throttler`], only accessed while its mutex is held.
struct ThrottlerState {
    /// Moment at which tokens were last credited for elapsed time.
    last_refill: Instant,
    /// Current token balance, measured in bytes.
    tokens: f64,
}

impl Throttler {
    /// Create a new throttler at the given rate, or `None` if `rate_kb_s` is 0.
    ///
    /// Returning `Option` at construction time means call sites never need to
    /// check the rate at runtime — `None` means unlimited.
    ///
    /// `rate_kb_s` is in KB/s with 1 KB = 1024 bytes. The conversion to bytes
    /// saturates, so an absurdly large `rate_kb_s` clamps to `u64::MAX` bytes/s.
    /// Plain multiplication would panic in debug builds and silently wrap to a
    /// tiny rate in release builds. The bucket starts full, so the first
    /// `BURST_CHUNKS * CHUNK_SIZE` bytes are granted without waiting.
    pub fn new(rate_kb_s: u64) -> Option<Self> {
        if rate_kb_s == 0 {
            return None;
        }
        let rate = rate_kb_s.saturating_mul(1024);
        let bucket_size = (BURST_CHUNKS * CHUNK_SIZE) as u64;
        debug!(
            rate_kb_s,
            rate_bytes_s = rate,
            bucket_size,
            "Throttler created"
        );
        Some(Self {
            rate,
            bucket_size,
            state: Mutex::new(ThrottlerState {
                tokens: bucket_size as f64,
                last_refill: Instant::now(),
            }),
        })
    }

    /// Wait until `bytes` tokens are available, then consume them.
    ///
    /// The internal lock is dropped before sleeping so that parallel tasks
    /// sharing this throttler do not block each other.
    ///
    /// A request larger than the bucket capacity could never be satisfied if it
    /// had to wait for `bytes` tokens, because refills are capped at the
    /// capacity. Such a request instead waits until the bucket is full and then
    /// consumes all `bytes`, leaving the balance negative. Later callers wait
    /// for that debt to be repaid, so the long-run average rate is still
    /// honoured. A zero-byte request returns immediately.
    pub async fn acquire(&self, bytes: usize) {
        if bytes == 0 {
            return;
        }
        let requested = bytes as f64;
        let capacity = self.bucket_size as f64;
        // Balance that must be on hand before the request is granted. Capping
        // it at the bucket capacity guarantees the threshold is reachable.
        let threshold = requested.min(capacity);
        let rate = self.rate as f64;

        loop {
            let wait_duration = {
                let mut state = self.state.lock().await;
                let now = Instant::now();
                let elapsed = now.saturating_duration_since(state.last_refill).as_secs_f64();
                // Credit elapsed time, but never beyond the burst ceiling.
                state.tokens = (state.tokens + elapsed * rate).min(capacity);
                state.last_refill = now;

                if state.tokens >= threshold {
                    // May drive the balance negative for oversized requests (debt).
                    state.tokens -= requested;
                    trace!(tokens_remaining = state.tokens, "Tokens consumed");
                    return;
                }

                // `threshold > tokens` here and `rate > 0`, so this is positive and finite.
                let deficit = threshold - state.tokens;
                Duration::from_secs_f64(deficit / rate)
            };

            trace!(
                wait_ms = wait_duration.as_secs_f64() * 1000.0,
                "Throttle: sleeping"
            );
            tokio::time::sleep(wait_duration).await;
        }
    }

    /// Rate in KB/s, for display in banners.
    ///
    /// This is derived from the stored byte rate. It therefore differs from the
    /// value passed to [`Throttler::new`] only if that value saturated.
    pub fn rate_kb_s(&self) -> u64 {
        self.rate / 1024
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Number of bytes in a completely full bucket.
    const FULL_BUCKET: usize = BURST_CHUNKS * CHUNK_SIZE;

    #[test]
    fn test_new_returns_none_when_rate_is_zero() {
        let maybe_throttler = Throttler::new(0);
        assert!(maybe_throttler.is_none());
    }

    #[test]
    fn test_new_returns_some_when_rate_is_nonzero() {
        let throttler = Throttler::new(100).unwrap();
        assert_eq!(throttler.rate_kb_s(), 100);
    }

    #[tokio::test]
    async fn test_acquire_immediate_when_tokens_available() {
        // Just 1 KB/s, yet the bucket is full from the start.
        let throttler = Throttler::new(1).unwrap();
        let began = Instant::now();
        throttler.acquire(1).await;
        let waited = began.elapsed();
        assert!(waited < Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_acquire_sleeps_when_exhausted() {
        // 10 KB/s -> 10_240 bytes per second.
        let throttler = Throttler::new(10).unwrap();
        // Emptying the initially full bucket costs nothing.
        throttler.acquire(FULL_BUCKET).await;
        // 2_048 bytes at 10_240 bytes/s should take roughly 200ms.
        let began = Instant::now();
        throttler.acquire(2048).await;
        assert!(
            began.elapsed() >= Duration::from_millis(150),
            "Expected >= 150ms sleep, got {:?}",
            began.elapsed()
        );
    }

    #[tokio::test]
    async fn test_acquire_partial_refill() {
        // 1024 KB/s -> 1_048_576 bytes per second.
        let throttler = Throttler::new(1024).unwrap();
        throttler.acquire(FULL_BUCKET).await;
        // 100ms of idle time refills about 104_857 bytes ...
        tokio::time::sleep(Duration::from_millis(100)).await;
        // ... which comfortably covers a 50_000-byte request without waiting.
        let began = Instant::now();
        throttler.acquire(50_000).await;
        assert!(
            began.elapsed() < Duration::from_millis(50),
            "Expected immediate acquire after refill, got {:?}",
            began.elapsed()
        );
    }

    /// Sustained rate test — verifies throughput is actually capped over time.
    /// Run with: `cargo test test_rate_limiting_sustained -- --ignored`
    #[test]
    #[ignore]
    fn test_rate_limiting_sustained() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            // 10 KB/s -> 10_240 bytes per second.
            let throttler = Throttler::new(10).unwrap();
            throttler.acquire(FULL_BUCKET).await;
            // Five requests of one second's worth each -> about 5 seconds overall.
            let began = Instant::now();
            for _round in 0..5 {
                throttler.acquire(10 * 1024).await;
            }
            let elapsed = began.elapsed();
            assert!(
                elapsed >= Duration::from_secs(4),
                "Expected >= 4s for 50KB at 10KB/s, got {:?}",
                elapsed
            );
        });
    }
}