codetether_agent/swarm/
rate_limiter.rs1use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
12#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct RateLimitInfo {
15 pub remaining: Option<u32>,
17 pub limit: Option<u32>,
19 pub reset_after_secs: Option<u64>,
21 pub reset_at: Option<u64>,
23 pub retry_after_secs: Option<u64>,
25 pub policy: Option<String>,
27}
28
29impl RateLimitInfo {
30 pub fn from_headers(headers: &HashMap<String, String>) -> Self {
32 let mut info = Self::default();
33
34 for (key, value) in headers {
35 let key_lower = key.to_lowercase();
36 match key_lower.as_str() {
37 k if k.contains("ratelimit-remaining") => {
39 info.remaining = value.parse().ok();
40 }
41 k if k.contains("ratelimit-limit") => {
43 info.limit = value.parse().ok();
44 }
45 k if k.contains("ratelimit-reset") && !k.contains("after") => {
47 info.reset_at = value.parse().ok();
48 }
49 k if k.contains("ratelimit-reset-after") => {
51 info.reset_after_secs = value.parse().ok();
52 }
53 "retry-after" => {
55 info.retry_after_secs = value.parse().ok();
56 }
57 k if k.contains("ratelimit-policy") => {
59 info.policy = Some(value.clone());
60 }
61 _ => {}
62 }
63 }
64
65 info
66 }
67
68 pub fn is_approaching_limit(&self) -> bool {
70 match (self.remaining, self.limit) {
71 (Some(remaining), Some(limit)) if limit > 0 => (remaining as f64 / limit as f64) < 0.2,
72 _ => false,
73 }
74 }
75
76 pub fn is_limit_exceeded(&self) -> bool {
78 self.remaining == Some(0)
79 }
80
81 pub fn recommended_delay(&self) -> Duration {
83 if let Some(retry_after) = self.retry_after_secs {
85 return Duration::from_secs(retry_after);
86 }
87
88 if self.is_approaching_limit() {
90 if let Some(reset_after) = self.reset_after_secs {
91 if let Some(remaining) = self.remaining
93 && remaining > 0
94 {
95 return Duration::from_millis((reset_after * 1000) / remaining as u64);
96 }
97 return Duration::from_secs(reset_after);
98 }
99
100 if let Some(reset_at) = self.reset_at {
101 let now = std::time::SystemTime::now()
102 .duration_since(std::time::UNIX_EPOCH)
103 .unwrap_or_default()
104 .as_secs();
105 if reset_at > now {
106 let remaining_secs = reset_at - now;
107 if let Some(remaining) = self.remaining
108 && remaining > 0
109 {
110 return Duration::from_millis((remaining_secs * 1000) / remaining as u64);
111 }
112 return Duration::from_secs(remaining_secs);
113 }
114 }
115 }
116
117 Duration::from_millis(0)
118 }
119}
120
121#[derive(Debug, Clone, Default, Serialize, Deserialize)]
123pub struct RateLimitStats {
124 pub total_requests: u64,
126 pub rate_limit_hits: u64,
128 pub retry_count: u64,
130 pub current_rpm: f64,
132 pub avg_delay_ms: u64,
134 pub adaptive_delay_ms: u64,
136 pub last_rate_limit_info: Option<RateLimitInfo>,
138 pub last_request_timestamp_secs: Option<u64>,
140}
141
142#[derive(Debug, Clone)]
144pub struct AdaptiveRateLimiter {
145 base_delay_ms: u64,
147 min_delay_ms: u64,
149 max_delay_ms: u64,
151 current_delay_ms: Arc<RwLock<u64>>,
153 backoff_multiplier: Arc<RwLock<f64>>,
155 stats: Arc<RwLock<RateLimitStats>>,
157 rate_limit_info: Arc<RwLock<RateLimitInfo>>,
159 request_times: Arc<RwLock<Vec<Instant>>>,
161}
162
163impl AdaptiveRateLimiter {
164 pub fn new(base_delay_ms: u64) -> Self {
166 Self {
167 base_delay_ms,
168 min_delay_ms: 100, max_delay_ms: 60_000, current_delay_ms: Arc::new(RwLock::new(base_delay_ms)),
171 backoff_multiplier: Arc::new(RwLock::new(1.0)),
172 stats: Arc::new(RwLock::new(RateLimitStats::default())),
173 rate_limit_info: Arc::new(RwLock::new(RateLimitInfo::default())),
174 request_times: Arc::new(RwLock::new(Vec::new())),
175 }
176 }
177
178 pub async fn current_delay(&self) -> Duration {
180 let info = self.rate_limit_info.read().await;
181 let recommended = info.recommended_delay();
182
183 let current = *self.current_delay_ms.read().await;
185 let delay_ms = current.max(recommended.as_millis() as u64);
186 Duration::from_millis(delay_ms.min(self.max_delay_ms).max(self.min_delay_ms))
187 }
188
189 pub async fn record_success(&self, headers: Option<&HashMap<String, String>>) {
191 let now = Instant::now();
192 let now_secs = std::time::SystemTime::now()
193 .duration_since(std::time::UNIX_EPOCH)
194 .unwrap_or_default()
195 .as_secs();
196
197 if let Some(h) = headers {
199 let info = RateLimitInfo::from_headers(h);
200 let mut rate_limit = self.rate_limit_info.write().await;
201 *rate_limit = info.clone();
202
203 if let (Some(remaining), Some(limit)) = (info.remaining, info.limit)
205 && limit > 0
206 {
207 let ratio = remaining as f64 / limit as f64;
208 let backoff = *self.backoff_multiplier.read().await;
209 let mut new_delay = self.base_delay_ms as f64;
210
211 if ratio < 0.1 {
212 new_delay *= 3.0;
214 } else if ratio < 0.3 {
215 new_delay *= 1.5;
217 } else if ratio > 0.5 && backoff <= 1.0 {
218 new_delay *= 0.8;
220 }
221
222 let mut current_delay = self.current_delay_ms.write().await;
223 *current_delay = new_delay as u64;
224 }
225 }
226
227 let mut stats = self.stats.write().await;
229 stats.total_requests += 1;
230 stats.last_request_timestamp_secs = Some(now_secs);
231 let current_delay = *self.current_delay_ms.read().await;
232 stats.adaptive_delay_ms = current_delay;
233
234 let mut backoff = self.backoff_multiplier.write().await;
236 *backoff = 1.0_f64.max(*backoff * 0.9);
237
238 drop(stats);
240 let mut times = self.request_times.write().await;
241 times.push(now);
242
243 let cutoff = now - Duration::from_secs(60);
245 times.retain(|&t| t > cutoff);
246
247 let rpm = times.len() as f64;
249 let mut stats = self.stats.write().await;
250 stats.current_rpm = rpm;
251 }
252
253 pub async fn record_rate_limit_hit(&self, retry_after: Option<u64>) {
255 let mut stats = self.stats.write().await;
256 stats.rate_limit_hits += 1;
257 stats.retry_count += 1;
258 drop(stats);
259
260 let mut backoff = self.backoff_multiplier.write().await;
262 *backoff = (*backoff * 2.0).min(32.0);
263 let current_backoff = *backoff;
264 drop(backoff);
265
266 let delay_ms = if let Some(retry) = retry_after {
268 retry * 1000
269 } else {
270 let base = self.base_delay_ms as f64;
271 (base * current_backoff) as u64
272 };
273
274 let mut current_delay = self.current_delay_ms.write().await;
275 *current_delay = delay_ms.min(self.max_delay_ms);
276 let delay_value = *current_delay;
277 drop(current_delay);
278
279 let mut stats = self.stats.write().await;
280 stats.adaptive_delay_ms = delay_value;
281 }
282
283 pub async fn record_retry(&self) {
285 let mut stats = self.stats.write().await;
286 stats.retry_count += 1;
287 }
288
289 pub async fn get_stats(&self) -> RateLimitStats {
291 self.stats.read().await.clone()
292 }
293
294 pub async fn get_rate_limit_info(&self) -> RateLimitInfo {
296 self.rate_limit_info.read().await.clone()
297 }
298
299 pub async fn wait(&self) {
301 let delay = self.current_delay().await;
302 if delay > Duration::from_millis(0) {
303 tokio::time::sleep(delay).await;
304 }
305 }
306
307 pub async fn reset(&self) {
309 let mut current_delay = self.current_delay_ms.write().await;
310 *current_delay = self.base_delay_ms;
311 let mut backoff = self.backoff_multiplier.write().await;
312 *backoff = 1.0;
313
314 let mut stats = self.stats.write().await;
315 *stats = RateLimitStats::default();
316
317 let mut info = self.rate_limit_info.write().await;
318 *info = RateLimitInfo::default();
319
320 let mut times = self.request_times.write().await;
321 times.clear();
322 }
323}
324
325impl Default for AdaptiveRateLimiter {
326 fn default() -> Self {
327 Self::new(1000) }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `RateLimitInfo` that only carries quota counters.
    fn quota(remaining: u32, limit: u32) -> RateLimitInfo {
        RateLimitInfo {
            remaining: Some(remaining),
            limit: Some(limit),
            ..Default::default()
        }
    }

    /// Each recognised header ends up in its matching field.
    #[test]
    fn test_rate_limit_info_from_headers() {
        let raw = [
            ("x-ratelimit-remaining", "45"),
            ("x-ratelimit-limit", "100"),
            ("x-ratelimit-reset-after", "60"),
            ("retry-after", "5"),
        ];
        let headers: HashMap<String, String> = raw
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect();

        let parsed = RateLimitInfo::from_headers(&headers);

        assert_eq!(parsed.remaining, Some(45));
        assert_eq!(parsed.limit, Some(100));
        assert_eq!(parsed.reset_after_secs, Some(60));
        assert_eq!(parsed.retry_after_secs, Some(5));
    }

    /// 15% remaining is below the 20% threshold; 50% is not.
    #[test]
    fn test_is_approaching_limit() {
        assert!(quota(15, 100).is_approaching_limit());
        assert!(!quota(50, 100).is_approaching_limit());
    }

    /// An explicit retry-after value is returned as the delay.
    #[test]
    fn test_recommended_delay_with_retry_after() {
        let throttled = RateLimitInfo {
            retry_after_secs: Some(10),
            ..Default::default()
        };
        assert_eq!(throttled.recommended_delay(), Duration::from_secs(10));
    }

    /// 60 s until reset spread over 5 remaining requests gives 12 s each.
    #[test]
    fn test_recommended_delay_when_approaching_limit() {
        let nearly_out = RateLimitInfo {
            reset_after_secs: Some(60),
            ..quota(5, 100)
        };
        assert_eq!(nearly_out.recommended_delay(), Duration::from_secs(12));
    }
}