// vtcode_core/tools/resilience/adaptive_rate_limiter.rs

1use hashbrown::HashMap;
2use once_cell::sync::Lazy;
3use std::sync::Mutex;
4use std::time::{Duration, Instant};
5use tracing::warn;
6
/// Classic token bucket: stores at most `capacity` tokens and regains them
/// continuously at `refill_rate` tokens per second.
struct TokenBucket {
    /// Upper bound on the number of stored tokens.
    capacity: f64,
    /// Tokens currently available to spend.
    tokens: f64,
    /// Refill speed, in tokens per second.
    refill_rate: f64,
    /// Instant of the last refill that actually added tokens.
    last_refill: Instant,
}

impl TokenBucket {
    /// Build a bucket that starts out full.
    fn new(capacity: f64, refill_rate: f64) -> Self {
        let last_refill = Instant::now();
        TokenBucket {
            tokens: capacity,
            capacity,
            refill_rate,
            last_refill,
        }
    }

    /// Credit the tokens earned since the last refill, never exceeding `capacity`.
    fn refill(&mut self) {
        let current = Instant::now();
        let gained = current.duration_since(self.last_refill).as_secs_f64() * self.refill_rate;

        // Only move the refill timestamp forward when something was credited.
        if gained > 0.0 {
            self.tokens = self.capacity.min(self.tokens + gained);
            self.last_refill = current;
        }
    }

    /// Refill, then spend `cost` tokens if enough are available.
    ///
    /// Returns `true` when the tokens were spent and `false` (leaving the
    /// balance untouched) otherwise.
    fn try_acquire(&mut self, cost: f64) -> bool {
        self.refill();
        let affordable = self.tokens >= cost;
        if affordable {
            self.tokens -= cost;
        }
        affordable
    }
}
46
/// Scheduling priority assigned to a tool.
///
/// Variants are declared from lowest to highest, so the derived `Ord`
/// ranks them `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Least important; the most expensive per call.
    Low,
    /// Baseline priority.
    Normal,
    /// More important than normal; cheaper per call.
    High,
    /// Most important; the cheapest per call.
    Critical,
}

impl Priority {
    /// Token cost of one call at this priority (smaller for more important tools).
    fn weight(&self) -> f64 {
        match *self {
            Self::Critical => 0.1,
            Self::High => 0.5,
            Self::Normal => 1.0,
            Self::Low => 2.0,
        }
    }
}
66
67/// Adaptive Rate Limiter with per-tool priority and exponential backoff.
68pub struct AdaptiveRateLimiter {
69    inner: Mutex<RateLimiterInner>,
70    default_capacity: f64,
71    default_refill_rate: f64,
72}
73
74struct RateLimiterInner {
75    buckets: HashMap<String, TokenBucket>,
76    tool_priorities: HashMap<String, Priority>,
77}
78
79impl AdaptiveRateLimiter {
80    pub fn new(default_capacity: f64, default_refill_rate: f64) -> Self {
81        let default_capacity = positive_finite_or(default_capacity, 1.0);
82        let default_refill_rate = positive_finite_or(default_refill_rate, 1.0);
83        Self {
84            inner: Mutex::new(RateLimiterInner {
85                buckets: HashMap::new(),
86                tool_priorities: HashMap::new(),
87            }),
88            default_capacity,
89            default_refill_rate,
90        }
91    }
92}
93
94/// Shared adaptive limiter for non-session-scoped execution flows (e.g. skill sub-LLM loops).
95pub static GLOBAL_ADAPTIVE_RATE_LIMITER: Lazy<AdaptiveRateLimiter> =
96    Lazy::new(AdaptiveRateLimiter::default);
97
98/// Acquire from the shared adaptive limiter.
99pub fn try_acquire_global(tool_name: &str) -> Result<(), Duration> {
100    GLOBAL_ADAPTIVE_RATE_LIMITER.try_acquire(tool_name)
101}
102
103impl Default for AdaptiveRateLimiter {
104    fn default() -> Self {
105        Self::new(10.0, 2.0)
106    }
107}
108
109impl AdaptiveRateLimiter {
110    /// Set a priority level for a specific tool.
111    pub fn set_priority(&self, tool_name: &str, priority: Priority) {
112        if let Ok(mut inner) = self.inner.lock() {
113            inner
114                .tool_priorities
115                .insert(tool_name.to_string(), priority);
116        } else {
117            warn!(
118                "adaptive rate limiter state lock poisoned while setting priority for '{}'",
119                tool_name
120            );
121        }
122    }
123
124    /// Try to acquire permission to execute a tool.
125    /// Returns Ok(()) if allowed, or Err(Duration) indicating suggested wait time.
126    pub fn try_acquire(&self, tool_name: &str) -> Result<(), Duration> {
127        let Ok(mut inner) = self.inner.lock() else {
128            warn!(
129                "adaptive rate limiter state lock poisoned while acquiring '{}'",
130                tool_name
131            );
132            return Err(Duration::from_millis(100));
133        };
134
135        let priority = inner
136            .tool_priorities
137            .get(tool_name)
138            .copied()
139            .unwrap_or(Priority::Normal);
140
141        let bucket = inner
142            .buckets
143            .entry(tool_name.to_owned())
144            .or_insert_with(|| TokenBucket::new(self.default_capacity, self.default_refill_rate));
145        let cost = priority.weight();
146
147        if bucket.try_acquire(cost) {
148            Ok(())
149        } else {
150            // Adaptive backoff:
151            // 1. Calculate base need (tokens needed / refill rate)
152            // 2. Apply exponential factor based on deficit to discourage hammering
153            // 3. High priority tools get a "discount" on the wait time
154
155            let needed = cost - bucket.tokens;
156            let base_wait_secs = needed / bucket.refill_rate;
157
158            // Jitter for backoff (add 10% randomness to avoid thundering herd)
159            let jitter = 1.1;
160
161            let wait_secs = match priority {
162                Priority::Critical => base_wait_secs * 0.5, // Return faster
163                Priority::High => base_wait_secs * 0.8,
164                Priority::Normal => base_wait_secs * jitter,
165                Priority::Low => base_wait_secs * 1.5 * jitter, // Penalize low priority overflow
166            };
167
168            Err(Duration::try_from_secs_f64(wait_secs).unwrap_or(Duration::from_secs(60)))
169        }
170    }
171}
172
173fn positive_finite_or(value: f64, fallback: f64) -> f64 {
174    if value.is_finite() && value > 0.0 {
175        value
176    } else {
177        fallback
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use super::{AdaptiveRateLimiter, Priority};
184    use std::time::Duration;
185
186    #[test]
187    fn returns_wait_hint_when_bucket_exhausted() {
188        let limiter = AdaptiveRateLimiter::new(1.0, 1.0);
189        limiter.try_acquire("tool").unwrap();
190        let wait_hint = limiter
191            .try_acquire("tool")
192            .expect_err("second immediate call should be rate-limited");
193        assert!(wait_hint > Duration::ZERO);
194    }
195
196    #[test]
197    fn high_priority_wait_is_shorter_than_low_priority() {
198        let limiter = AdaptiveRateLimiter::new(0.2, 1.0);
199        limiter.set_priority("high", Priority::High);
200        limiter.set_priority("low", Priority::Low);
201
202        let high_wait = limiter
203            .try_acquire("high")
204            .expect_err("high-priority call should be limited");
205        let low_wait = limiter
206            .try_acquire("low")
207            .expect_err("low-priority call should be limited");
208
209        assert!(high_wait < low_wait);
210    }
211
212    #[test]
213    fn invalid_default_limits_fall_back_to_positive_values() {
214        let limiter = AdaptiveRateLimiter::new(f64::NAN, 0.0);
215        limiter.try_acquire("tool").unwrap();
216        let wait_hint = limiter
217            .try_acquire("tool")
218            .expect_err("second immediate call should be rate-limited");
219
220        assert!(wait_hint > Duration::ZERO);
221    }
222}