vtcode_core/tools/resilience/
adaptive_rate_limiter.rs1use hashbrown::HashMap;
2use once_cell::sync::Lazy;
3use std::sync::Mutex;
4use std::time::{Duration, Instant};
5use tracing::warn;
6
/// Token bucket holding at most `capacity` tokens and regaining
/// `refill_rate` tokens per elapsed second.
struct TokenBucket {
    /// Upper bound on `tokens`.
    capacity: f64,
    /// Tokens currently available to spend.
    tokens: f64,
    /// Tokens regained per elapsed second.
    refill_rate: f64,
    /// Moment the bucket was last credited with refilled tokens.
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a bucket that starts full (`tokens == capacity`).
    fn new(capacity: f64, refill_rate: f64) -> Self {
        let last_refill = Instant::now();
        Self {
            tokens: capacity,
            capacity,
            refill_rate,
            last_refill,
        }
    }

    /// Credits the tokens accrued since `last_refill`, saturating at `capacity`.
    ///
    /// `last_refill` only advances when a positive amount was credited.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed_secs = now.duration_since(self.last_refill).as_secs_f64();
        let accrued = elapsed_secs * self.refill_rate;
        if accrued > 0.0 {
            self.last_refill = now;
            self.tokens = (self.tokens + accrued).min(self.capacity);
        }
    }

    /// Refills, then spends `cost` tokens if the balance covers it.
    ///
    /// Returns `true` when the tokens were spent. Returns `false` and leaves the
    /// balance untouched otherwise.
    fn try_acquire(&mut self, cost: f64) -> bool {
        self.refill();
        let affordable = self.tokens >= cost;
        if affordable {
            self.tokens -= cost;
        }
        affordable
    }
}
46
/// Scheduling priority assigned to a tool.
///
/// Variants are declared from least to most urgent, so the derived `Ord`
/// ranks `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Low,
    Normal,
    High,
    Critical,
}

impl Priority {
    /// Number of tokens a single call at this priority consumes.
    ///
    /// More urgent priorities cost fewer tokens, so they drain a bucket more
    /// slowly.
    fn weight(&self) -> f64 {
        match *self {
            Self::Critical => 0.1,
            Self::High => 0.5,
            Self::Normal => 1.0,
            Self::Low => 2.0,
        }
    }
}
66
67pub struct AdaptiveRateLimiter {
69 inner: Mutex<RateLimiterInner>,
70 default_capacity: f64,
71 default_refill_rate: f64,
72}
73
74struct RateLimiterInner {
75 buckets: HashMap<String, TokenBucket>,
76 tool_priorities: HashMap<String, Priority>,
77}
78
79impl AdaptiveRateLimiter {
80 pub fn new(default_capacity: f64, default_refill_rate: f64) -> Self {
81 let default_capacity = positive_finite_or(default_capacity, 1.0);
82 let default_refill_rate = positive_finite_or(default_refill_rate, 1.0);
83 Self {
84 inner: Mutex::new(RateLimiterInner {
85 buckets: HashMap::new(),
86 tool_priorities: HashMap::new(),
87 }),
88 default_capacity,
89 default_refill_rate,
90 }
91 }
92}
93
94pub static GLOBAL_ADAPTIVE_RATE_LIMITER: Lazy<AdaptiveRateLimiter> =
96 Lazy::new(AdaptiveRateLimiter::default);
97
98pub fn try_acquire_global(tool_name: &str) -> Result<(), Duration> {
100 GLOBAL_ADAPTIVE_RATE_LIMITER.try_acquire(tool_name)
101}
102
103impl Default for AdaptiveRateLimiter {
104 fn default() -> Self {
105 Self::new(10.0, 2.0)
106 }
107}
108
109impl AdaptiveRateLimiter {
110 pub fn set_priority(&self, tool_name: &str, priority: Priority) {
112 if let Ok(mut inner) = self.inner.lock() {
113 inner
114 .tool_priorities
115 .insert(tool_name.to_string(), priority);
116 } else {
117 warn!(
118 "adaptive rate limiter state lock poisoned while setting priority for '{}'",
119 tool_name
120 );
121 }
122 }
123
124 pub fn try_acquire(&self, tool_name: &str) -> Result<(), Duration> {
127 let Ok(mut inner) = self.inner.lock() else {
128 warn!(
129 "adaptive rate limiter state lock poisoned while acquiring '{}'",
130 tool_name
131 );
132 return Err(Duration::from_millis(100));
133 };
134
135 let priority = inner
136 .tool_priorities
137 .get(tool_name)
138 .copied()
139 .unwrap_or(Priority::Normal);
140
141 let bucket = inner
142 .buckets
143 .entry(tool_name.to_owned())
144 .or_insert_with(|| TokenBucket::new(self.default_capacity, self.default_refill_rate));
145 let cost = priority.weight();
146
147 if bucket.try_acquire(cost) {
148 Ok(())
149 } else {
150 let needed = cost - bucket.tokens;
156 let base_wait_secs = needed / bucket.refill_rate;
157
158 let jitter = 1.1;
160
161 let wait_secs = match priority {
162 Priority::Critical => base_wait_secs * 0.5, Priority::High => base_wait_secs * 0.8,
164 Priority::Normal => base_wait_secs * jitter,
165 Priority::Low => base_wait_secs * 1.5 * jitter, };
167
168 Err(Duration::try_from_secs_f64(wait_secs).unwrap_or(Duration::from_secs(60)))
169 }
170 }
171}
172
173fn positive_finite_or(value: f64, fallback: f64) -> f64 {
174 if value.is_finite() && value > 0.0 {
175 value
176 } else {
177 fallback
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use super::{AdaptiveRateLimiter, Priority};
184 use std::time::Duration;
185
186 #[test]
187 fn returns_wait_hint_when_bucket_exhausted() {
188 let limiter = AdaptiveRateLimiter::new(1.0, 1.0);
189 limiter.try_acquire("tool").unwrap();
190 let wait_hint = limiter
191 .try_acquire("tool")
192 .expect_err("second immediate call should be rate-limited");
193 assert!(wait_hint > Duration::ZERO);
194 }
195
196 #[test]
197 fn high_priority_wait_is_shorter_than_low_priority() {
198 let limiter = AdaptiveRateLimiter::new(0.2, 1.0);
199 limiter.set_priority("high", Priority::High);
200 limiter.set_priority("low", Priority::Low);
201
202 let high_wait = limiter
203 .try_acquire("high")
204 .expect_err("high-priority call should be limited");
205 let low_wait = limiter
206 .try_acquire("low")
207 .expect_err("low-priority call should be limited");
208
209 assert!(high_wait < low_wait);
210 }
211
212 #[test]
213 fn invalid_default_limits_fall_back_to_positive_values() {
214 let limiter = AdaptiveRateLimiter::new(f64::NAN, 0.0);
215 limiter.try_acquire("tool").unwrap();
216 let wait_hint = limiter
217 .try_acquire("tool")
218 .expect_err("second immediate call should be rate-limited");
219
220 assert!(wait_hint > Duration::ZERO);
221 }
222}