// car-engine 0.13.0
//
// Core runtime engine for Common Agent Runtime.
// Documentation
//! Token bucket rate limiter for tool calls with backpressure support.

use std::collections::HashMap;
use std::time::Instant;
use tokio::sync::Mutex;

/// Per-tool rate limit configuration.
///
/// Describes a budget of `max_calls` calls per `interval_secs` seconds.
/// `RateLimiter::set_limit` uses `max_calls` as the bucket capacity and
/// `max_calls / interval_secs` as the refill rate in tokens per second.
#[derive(Debug, Clone)]
pub struct RateLimit {
    /// Number of calls allowed per interval; also the bucket capacity, i.e.
    /// how many calls may be made back-to-back from a full bucket.
    pub max_calls: u32,
    /// Length of the interval in seconds. It is used as the divisor when the
    /// refill rate is computed, so it is expected to be positive and finite.
    pub interval_secs: f64,
}

/// Token bucket rate limiter for tool calls.
///
/// Each tool can have an independent rate limit. When a tool's bucket is empty,
/// `acquire()` blocks until a token becomes available (backpressure).
///
/// Tools are identified by name; a tool with no entry is not rate limited.
/// Both maps sit behind an async `Mutex`, so every method takes `&self`.
pub struct RateLimiter {
    // Configuration exactly as passed to `set_limit`, keyed by tool name.
    // NOTE(review): written by `set_limit` but not read anywhere in this
    // file's visible code — confirm whether other code depends on it.
    limits: Mutex<HashMap<String, RateLimit>>,
    // Live token state per tool; this is what `acquire`/`try_acquire` consult.
    buckets: Mutex<HashMap<String, TokenBucket>>,
}

/// Token-bucket state for a single tool.
///
/// Holds a fractional token balance that grows at `refill_rate` tokens per
/// second up to `max_tokens`. Each successful call spends one whole token.
struct TokenBucket {
    /// Current balance; fractional because refills accrue continuously.
    tokens: f64,
    /// Capacity: a refill never raises the balance past this value.
    max_tokens: f64,
    /// Tokens added per second. Never negative or NaN when built via `new`;
    /// `0.0` means the bucket never refills.
    refill_rate: f64,
    /// Moment the balance was last brought up to date.
    last_refill: Instant,
}

impl TokenBucket {
    /// Create a bucket that starts full.
    ///
    /// A `refill_rate` that is negative or NaN is stored as `0.0` ("never
    /// refills"). Without this, a negative rate would drain the balance below
    /// zero over time and make the wait estimate negative or NaN.
    fn new(max_tokens: f64, refill_rate: f64) -> Self {
        // `> 0.0` is false for NaN, so NaN and negatives both collapse to 0.
        let refill_rate = if refill_rate > 0.0 { refill_rate } else { 0.0 };
        Self {
            tokens: max_tokens,
            max_tokens,
            refill_rate,
            last_refill: Instant::now(),
        }
    }

    /// Refill tokens based on elapsed time since last refill.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        // `f64::min` returns the non-NaN operand, so the `0 * inf = NaN` that
        // an infinite rate can produce resolves to a full bucket.
        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.max_tokens);
        self.last_refill = now;
    }

    /// Try to consume one token. Returns true if successful.
    fn try_consume(&mut self) -> bool {
        self.refill();
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Seconds until one token is available (0.0 if already available).
    ///
    /// Returns `f64::INFINITY` when a token can never become available:
    /// either the bucket does not refill, or its capacity is below one token
    /// so the balance can never reach `1.0`. The result is therefore never
    /// negative and never NaN.
    fn time_until_available(&mut self) -> f64 {
        self.refill();
        if self.tokens >= 1.0 {
            return 0.0;
        }
        if self.refill_rate <= 0.0 || self.max_tokens < 1.0 {
            return f64::INFINITY;
        }
        let deficit = 1.0 - self.tokens;
        deficit / self.refill_rate
    }
}

impl RateLimiter {
    /// Create an empty rate limiter with no limits configured.
    pub fn new() -> Self {
        Self {
            limits: Mutex::new(HashMap::new()),
            buckets: Mutex::new(HashMap::new()),
        }
    }

    /// Configure a rate limit for a specific tool.
    ///
    /// `max_calls` tokens over `interval_secs` seconds. The refill rate is
    /// `max_calls / interval_secs` tokens per second.
    ///
    /// Calling this again for the same tool replaces its bucket with a fresh,
    /// full one. The limit is stored as given, without validation. A limit
    /// with `max_calls == 0` never grants a token, so `try_acquire` returns
    /// `false` and `acquire` waits indefinitely.
    pub async fn set_limit(&self, tool: &str, limit: RateLimit) {
        // `u32 -> f64` is lossless, so `From` is used rather than an `as` cast.
        let max_tokens = f64::from(limit.max_calls);
        let refill_rate = max_tokens / limit.interval_secs;

        self.limits.lock().await.insert(tool.to_string(), limit);

        self.buckets
            .lock()
            .await
            .insert(tool.to_string(), TokenBucket::new(max_tokens, refill_rate));
    }

    /// Wait until a token is available for the given tool, then consume it.
    ///
    /// If no rate limit is configured for the tool, returns immediately.
    /// This provides backpressure: callers block until capacity is available.
    ///
    /// Never panics on a degenerate limit: if the bucket reports a wait that
    /// is infinite, negative, or NaN (a limit that can never refill), the
    /// task keeps waiting and re-checks the bucket periodically.
    pub async fn acquire(&self, tool: &str) {
        // Longest single sleep before the bucket is looked at again. It keeps
        // the value handed to `Duration::from_secs_f64` finite and in range;
        // that function panics on negative, non-finite, or overflowing input.
        const MAX_SLEEP_SECS: f64 = 3600.0;

        loop {
            let wait_secs = {
                let mut buckets = self.buckets.lock().await;
                let bucket = match buckets.get_mut(tool) {
                    Some(b) => b,
                    None => return, // no limit configured
                };

                if bucket.try_consume() {
                    return;
                }

                bucket.time_until_available()
            };

            // `>= 0.0` is false for NaN, so NaN and negative waits both fall
            // through to the maximum sleep instead of reaching `from_secs_f64`.
            let sleep_secs = if wait_secs >= 0.0 {
                wait_secs.min(MAX_SLEEP_SECS)
            } else {
                MAX_SLEEP_SECS
            };

            // Sleep outside the lock to allow other tasks to proceed.
            tokio::time::sleep(std::time::Duration::from_secs_f64(sleep_secs)).await;
        }
    }

    /// Non-blocking attempt to acquire a token for the given tool.
    ///
    /// Returns `true` if a token was consumed, `false` if the bucket is empty.
    /// Returns `true` if no rate limit is configured for the tool.
    pub async fn try_acquire(&self, tool: &str) -> bool {
        let mut buckets = self.buckets.lock().await;
        match buckets.get_mut(tool) {
            Some(bucket) => bucket.try_consume(),
            None => true, // no limit configured
        }
    }
}

impl Default for RateLimiter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand for building a `RateLimit` in the tests below.
    fn limit(max_calls: u32, interval_secs: f64) -> RateLimit {
        RateLimit {
            max_calls,
            interval_secs,
        }
    }

    /// A drained bucket grants a call again once enough time has passed.
    #[tokio::test]
    async fn test_token_bucket_refills_correctly() {
        let limiter = RateLimiter::new();
        limiter.set_limit("tool_a", limit(2, 1.0)).await;

        // Consume both tokens.
        assert!(limiter.try_acquire("tool_a").await);
        assert!(limiter.try_acquire("tool_a").await);
        // Bucket is empty.
        assert!(!limiter.try_acquire("tool_a").await);

        // Wait for refill (0.5s should refill 1 token at rate 2/s).
        tokio::time::sleep(Duration::from_millis(550)).await;
        assert!(limiter.try_acquire("tool_a").await);
    }

    /// `acquire` on an empty bucket waits for the refill rather than failing.
    #[tokio::test]
    async fn test_acquire_blocks_when_empty() {
        let limiter = RateLimiter::new();
        limiter.set_limit("tool_b", limit(1, 0.2)).await;

        // Drain the single token.
        assert!(limiter.try_acquire("tool_b").await);
        assert!(!limiter.try_acquire("tool_b").await);

        // acquire() should block then return after ~0.2s refill.
        let start = Instant::now();
        limiter.acquire("tool_b").await;
        let elapsed = start.elapsed();

        // The threshold is half the refill time to leave slack for the time
        // that already passed between draining and calling `acquire`.
        assert!(
            elapsed.as_millis() >= 100,
            "acquire should have blocked; elapsed={}ms",
            elapsed.as_millis()
        );
    }

    /// Draining one tool's bucket leaves another tool's budget untouched.
    #[tokio::test]
    async fn test_independent_tool_limits() {
        let limiter = RateLimiter::new();
        limiter.set_limit("fast", limit(10, 1.0)).await;
        limiter.set_limit("slow", limit(1, 1.0)).await;

        // Drain the slow bucket.
        assert!(limiter.try_acquire("slow").await);
        assert!(!limiter.try_acquire("slow").await);

        // fast bucket should still have tokens.
        for _ in 0..10 {
            assert!(limiter.try_acquire("fast").await);
        }
        assert!(!limiter.try_acquire("fast").await);
    }

    /// Tools without a configured limit are never throttled.
    #[tokio::test]
    async fn test_no_limit_always_passes() {
        let limiter = RateLimiter::new();
        // No limit set for "unconfigured".
        assert!(limiter.try_acquire("unconfigured").await);
        limiter.acquire("unconfigured").await; // should return immediately
    }

    /// Idle time never accumulates more than `max_calls` tokens.
    #[tokio::test]
    async fn test_max_tokens_cap() {
        let limiter = RateLimiter::new();
        limiter.set_limit("capped", limit(2, 1.0)).await;

        // Wait extra time -- tokens should not exceed max_tokens.
        tokio::time::sleep(Duration::from_millis(600)).await;

        assert!(limiter.try_acquire("capped").await);
        assert!(limiter.try_acquire("capped").await);
        assert!(!limiter.try_acquire("capped").await);
    }

    /// A limit of zero calls never grants a token.
    #[tokio::test]
    async fn test_zero_max_calls_never_grants() {
        let limiter = RateLimiter::new();
        limiter.set_limit("blocked", limit(0, 1.0)).await;

        assert!(!limiter.try_acquire("blocked").await);
        assert!(!limiter.try_acquire("blocked").await);
    }
}