// ugi 0.2.1
//
// Runtime-agnostic Rust request client with HTTP/1.1, HTTP/2, HTTP/3, H2C, WebSocket, SSE, and gRPC support.
// Documentation
//! Token-bucket rate limiter usable on any request or download.
//!
//! # Per-request limit
//! ```no_run
//! # async fn run() -> ugi::Result<()> {
//! use ugi::Client;
//! let client = Client::builder().build()?;
//! // Limit response body reads to 512 KiB/s for this request only.
//! let resp = client.get("https://example.com/big-file")
//!     .rate_limit(512 * 1024)
//!     .await?;
//! # Ok(()) }
//! ```
//!
//! # Shared limit across concurrent requests
//! ```no_run
//! # async fn run() -> ugi::Result<()> {
//! use ugi::{Client, RateLimiter};
//! let client = Client::builder().build()?;
//! let limiter = RateLimiter::new(1024 * 1024); // 1 MiB/s shared budget
//!
//! // Both requests draw from the same bucket.
//! let l1 = limiter.clone();
//! let l2 = limiter.clone();
//! let r1 = client.get("https://example.com/a").rate_limit_shared(l1);
//! let r2 = client.get("https://example.com/b").rate_limit_shared(l2);
//! // drive r1 and r2 concurrently …
//! # Ok(()) }
//! ```

use std::sync::Arc;
use std::time::{Duration, Instant};

use async_io::Timer;
use async_lock::Mutex;

// ─────────────────────────────────────────────────── public handle ────────────

/// Shareable handle to a token-bucket rate limiter.
///
/// The handle wraps the bucket in an `Arc<Mutex<_>>`, so every clone refers
/// to the same bucket and therefore draws from the same budget.  Build one
/// with [`RateLimiter::new`] and hand clones to each download or request that
/// should share the limit.
#[derive(Clone, Debug)]
pub struct RateLimiter(Arc<Mutex<Bucket>>);

impl RateLimiter {
    /// Build a limiter with a budget of `bytes_per_sec` bytes per second.
    ///
    /// Clones of the returned handle all share this one bucket.
    ///
    /// A rate of `0` turns limiting off: [`acquire`](Self::acquire) then
    /// never throttles.
    pub fn new(bytes_per_sec: u64) -> Self {
        let bucket = Bucket::new(bytes_per_sec);
        Self(Arc::new(Mutex::new(bucket)))
    }

    /// Replace the configured rate while the limiter is in use.
    ///
    /// The new value is what subsequent [`acquire`](Self::acquire) calls
    /// see.  Because this needs the bucket lock, it first waits for any
    /// `acquire` that currently holds it.  Pass `0` to lift the limit.
    pub async fn set_rate(&self, bytes_per_sec: u64) {
        let mut bucket = self.0.lock().await;
        bucket.bytes_per_sec = bytes_per_sec;
    }

    /// Wait until `n` bytes of budget are available and consume them.
    ///
    /// The bucket lock stays held for the whole wait, so other callers on
    /// the same limiter (including [`set_rate`](Self::set_rate) and
    /// [`rate`](Self::rate)) queue up behind this call.
    pub async fn acquire(&self, n: usize) {
        let mut bucket = self.0.lock().await;
        bucket.acquire(n).await;
    }

    /// The rate currently configured, in bytes per second (`0` = unlimited).
    pub async fn rate(&self) -> u64 {
        let bucket = self.0.lock().await;
        bucket.bytes_per_sec
    }
}

// ──────────────────────────────────────────────────── inner bucket ────────────

/// Token-bucket state: a budget of bytes that refills over time.
///
/// The bucket holds at most one second's worth of budget (`bytes_per_sec`
/// tokens) and is topped up lazily, based on the time elapsed since the
/// previous refill, whenever [`Bucket::acquire`] runs.
#[derive(Debug)]
pub(crate) struct Bucket {
    /// Refill rate in bytes per second; `0` disables limiting entirely.
    pub(crate) bytes_per_sec: u64,
    /// Budget currently available, in bytes.  Never exceeds `bytes_per_sec`
    /// after a refill.
    tokens: f64,
    /// Instant at which `tokens` was last topped up.
    last_refill: Instant,
}

impl Bucket {
    /// Create a bucket that starts full, i.e. with one second of budget.
    pub(crate) fn new(bytes_per_sec: u64) -> Self {
        Self {
            bytes_per_sec,
            tokens: bytes_per_sec as f64,
            last_refill: Instant::now(),
        }
    }

    /// Wait until `n` bytes of budget have been consumed from the bucket.
    ///
    /// Returns immediately when the rate is `0` (unlimited).
    ///
    /// Budget is taken incrementally: each pass grabs whatever is available
    /// and sleeps for the remainder.  This matters for requests larger than
    /// the bucket capacity (`n > bytes_per_sec`): the bucket can never hold
    /// that many tokens at once, so waiting for the full amount to
    /// accumulate would never finish.  For requests that fit in the bucket
    /// the total wait is the same as waiting for the full amount up front.
    ///
    /// If the returned future is dropped mid-wait, budget already taken by
    /// earlier passes is not handed back.
    pub(crate) async fn acquire(&mut self, n: usize) {
        if self.bytes_per_sec == 0 {
            return;
        }
        // `&mut self` is held for the whole call, so the rate cannot change
        // underneath us; read it once.
        let rate = self.bytes_per_sec as f64;
        let mut remaining = n as f64;
        loop {
            self.refill(rate);
            let granted = self.tokens.min(remaining);
            self.tokens -= granted;
            remaining -= granted;
            if remaining <= 0.0 {
                return;
            }
            // Sleep until the rest is available, but never longer than it
            // takes to fill the bucket (one second): beyond that point extra
            // waiting gains nothing because the bucket is capped.
            let wait = Duration::from_secs_f64(remaining.min(rate) / rate);
            Timer::after(wait).await;
        }
    }

    /// Credit the tokens earned since the last refill, capped at `rate`
    /// (one second's worth), and restart the refill clock.
    fn refill(&mut self, rate: f64) {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.last_refill = now;
        self.tokens = (self.tokens + elapsed * rate).min(rate);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A limiter built with rate `0` reports that rate and never throttles,
    /// whatever the request size.
    #[test]
    fn unlimited_limiter_has_zero_rate() {
        let rl = RateLimiter::new(0);
        async_io::block_on(async {
            assert_eq!(rl.rate().await, 0);
            // A zero rate disables limiting, so even the largest possible
            // request must complete without waiting.
            rl.acquire(usize::MAX).await;
        });
    }

    /// Cloning the handle must share the allocation, not copy the bucket.
    #[test]
    fn clone_points_to_same_arc() {
        let a = RateLimiter::new(1024);
        let b = a.clone();
        assert!(Arc::ptr_eq(&a.0, &b.0));
    }

    /// A rate change made through one handle is observed through its clones.
    #[test]
    fn set_rate_is_visible_through_clones() {
        let a = RateLimiter::new(1024);
        let b = a.clone();
        async_io::block_on(async {
            a.set_rate(2048).await;
            assert_eq!(b.rate().await, 2048);
        });
    }
}