codetether_agent/swarm/
rate_limiter.rs1use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
/// Rate-limit state extracted from one set of HTTP response headers.
///
/// Every field is optional: it stays `None` when no matching header was
/// present or when the header value could not be parsed.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RateLimitInfo {
    /// Parsed value of a header whose name contains `ratelimit-remaining`.
    pub remaining: Option<u32>,
    /// Parsed value of a header whose name contains `ratelimit-limit`.
    pub limit: Option<u32>,
    /// Parsed value of a header whose name contains `ratelimit-reset-after`;
    /// treated as a number of seconds when computing delays.
    pub reset_after_secs: Option<u64>,
    /// Parsed value of a header whose name contains `ratelimit-reset` (but not
    /// `after`); compared against the current Unix time in seconds.
    pub reset_at: Option<u64>,
    /// Parsed value of the `retry-after` header; treated as seconds.
    pub retry_after_secs: Option<u64>,
    /// Raw value of a header whose name contains `ratelimit-policy`.
    pub policy: Option<String>,
}
28
29impl RateLimitInfo {
30 pub fn from_headers(headers: &HashMap<String, String>) -> Self {
32 let mut info = Self::default();
33
34 for (key, value) in headers {
35 let key_lower = key.to_lowercase();
36 match key_lower.as_str() {
37 k if k.contains("ratelimit-remaining") => {
39 info.remaining = value.parse().ok();
40 }
41 k if k.contains("ratelimit-limit") => {
43 info.limit = value.parse().ok();
44 }
45 k if k.contains("ratelimit-reset") && !k.contains("after") => {
47 info.reset_at = value.parse().ok();
48 }
49 k if k.contains("ratelimit-reset-after") => {
51 info.reset_after_secs = value.parse().ok();
52 }
53 "retry-after" => {
55 info.retry_after_secs = value.parse().ok();
56 }
57 k if k.contains("ratelimit-policy") => {
59 info.policy = Some(value.clone());
60 }
61 _ => {}
62 }
63 }
64
65 info
66 }
67
68 pub fn is_approaching_limit(&self) -> bool {
70 match (self.remaining, self.limit) {
71 (Some(remaining), Some(limit)) if limit > 0 => (remaining as f64 / limit as f64) < 0.2,
72 _ => false,
73 }
74 }
75
76 pub fn is_limit_exceeded(&self) -> bool {
78 self.remaining == Some(0)
79 }
80
81 pub fn recommended_delay(&self) -> Duration {
83 if let Some(retry_after) = self.retry_after_secs {
85 return Duration::from_secs(retry_after);
86 }
87
88 if self.is_approaching_limit() {
90 if let Some(reset_after) = self.reset_after_secs {
91 if let Some(remaining) = self.remaining {
93 if remaining > 0 {
94 return Duration::from_millis((reset_after * 1000) / remaining as u64);
95 }
96 }
97 return Duration::from_secs(reset_after);
98 }
99
100 if let Some(reset_at) = self.reset_at {
101 let now = std::time::SystemTime::now()
102 .duration_since(std::time::UNIX_EPOCH)
103 .unwrap_or_default()
104 .as_secs();
105 if reset_at > now {
106 let remaining_secs = reset_at - now;
107 if let Some(remaining) = self.remaining {
108 if remaining > 0 {
109 return Duration::from_millis(
110 (remaining_secs * 1000) / remaining as u64,
111 );
112 }
113 }
114 return Duration::from_secs(remaining_secs);
115 }
116 }
117 }
118
119 Duration::from_millis(0)
120 }
121}
122
/// Counters and gauges describing an [`AdaptiveRateLimiter`]'s activity.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RateLimitStats {
    /// Incremented once per `record_success` call.
    pub total_requests: u64,
    /// Incremented once per `record_rate_limit_hit` call.
    pub rate_limit_hits: u64,
    /// Incremented by both `record_rate_limit_hit` and `record_retry`.
    pub retry_count: u64,
    /// Number of successes recorded within the trailing 60 seconds, as of the
    /// most recent `record_success` call.
    pub current_rpm: f64,
    /// NOTE(review): nothing in this file writes this field; it keeps its
    /// default of 0 unless set elsewhere.
    pub avg_delay_ms: u64,
    /// Copy of the limiter's adaptive delay (ms) at the last recorded event.
    pub adaptive_delay_ms: u64,
    /// NOTE(review): nothing in this file writes this field; it stays `None`
    /// unless set elsewhere.
    pub last_rate_limit_info: Option<RateLimitInfo>,
    /// Unix time, in seconds, of the most recent `record_success` call.
    pub last_request_timestamp_secs: Option<u64>,
}
143
/// Request pacer that adapts its delay to server rate-limit feedback.
///
/// All mutable state lives behind `Arc<RwLock<_>>`, so a clone of the limiter
/// shares that state with the value it was cloned from.
#[derive(Debug, Clone)]
pub struct AdaptiveRateLimiter {
    /// Baseline delay in milliseconds; the starting delay and the value the
    /// adaptive delay is derived from.
    base_delay_ms: u64,
    /// Lower bound (ms) applied to the delay returned by `current_delay`.
    min_delay_ms: u64,
    /// Upper bound (ms) applied to the delay returned by `current_delay` and
    /// to the delay stored after a rate-limit hit.
    max_delay_ms: u64,
    /// Current adaptive delay in milliseconds.
    current_delay_ms: Arc<RwLock<u64>>,
    /// Backoff factor: doubled (capped at 32) on each rate-limit hit and
    /// multiplied by 0.9 (floored at 1.0) on each success.
    backoff_multiplier: Arc<RwLock<f64>>,
    /// Activity statistics returned by `get_stats`.
    stats: Arc<RwLock<RateLimitStats>>,
    /// Rate-limit information parsed from the most recent response headers.
    rate_limit_info: Arc<RwLock<RateLimitInfo>>,
    /// Instants of recent successes, pruned to the trailing 60 seconds.
    request_times: Arc<RwLock<Vec<Instant>>>,
}
164
165impl AdaptiveRateLimiter {
166 pub fn new(base_delay_ms: u64) -> Self {
168 Self {
169 base_delay_ms,
170 min_delay_ms: 100, max_delay_ms: 60_000, current_delay_ms: Arc::new(RwLock::new(base_delay_ms)),
173 backoff_multiplier: Arc::new(RwLock::new(1.0)),
174 stats: Arc::new(RwLock::new(RateLimitStats::default())),
175 rate_limit_info: Arc::new(RwLock::new(RateLimitInfo::default())),
176 request_times: Arc::new(RwLock::new(Vec::new())),
177 }
178 }
179
180 pub async fn current_delay(&self) -> Duration {
182 let info = self.rate_limit_info.read().await;
183 let recommended = info.recommended_delay();
184
185 let current = *self.current_delay_ms.read().await;
187 let delay_ms = current.max(recommended.as_millis() as u64);
188 Duration::from_millis(delay_ms.min(self.max_delay_ms).max(self.min_delay_ms))
189 }
190
191 pub async fn record_success(&self, headers: Option<&HashMap<String, String>>) {
193 let now = Instant::now();
194 let now_secs = std::time::SystemTime::now()
195 .duration_since(std::time::UNIX_EPOCH)
196 .unwrap_or_default()
197 .as_secs();
198
199 if let Some(h) = headers {
201 let info = RateLimitInfo::from_headers(h);
202 let mut rate_limit = self.rate_limit_info.write().await;
203 *rate_limit = info.clone();
204
205 if let (Some(remaining), Some(limit)) = (info.remaining, info.limit) {
207 if limit > 0 {
208 let ratio = remaining as f64 / limit as f64;
209 let backoff = *self.backoff_multiplier.read().await;
210 let mut new_delay = self.base_delay_ms as f64;
211
212 if ratio < 0.1 {
213 new_delay *= 3.0;
215 } else if ratio < 0.3 {
216 new_delay *= 1.5;
218 } else if ratio > 0.5 && backoff <= 1.0 {
219 new_delay *= 0.8;
221 }
222
223 let mut current_delay = self.current_delay_ms.write().await;
224 *current_delay = new_delay as u64;
225 }
226 }
227 }
228
229 let mut stats = self.stats.write().await;
231 stats.total_requests += 1;
232 stats.last_request_timestamp_secs = Some(now_secs);
233 let current_delay = *self.current_delay_ms.read().await;
234 stats.adaptive_delay_ms = current_delay;
235
236 let mut backoff = self.backoff_multiplier.write().await;
238 *backoff = 1.0_f64.max(*backoff * 0.9);
239
240 drop(stats);
242 let mut times = self.request_times.write().await;
243 times.push(now);
244
245 let cutoff = now - Duration::from_secs(60);
247 times.retain(|&t| t > cutoff);
248
249 let rpm = times.len() as f64;
251 let mut stats = self.stats.write().await;
252 stats.current_rpm = rpm;
253 }
254
255 pub async fn record_rate_limit_hit(&self, retry_after: Option<u64>) {
257 let mut stats = self.stats.write().await;
258 stats.rate_limit_hits += 1;
259 stats.retry_count += 1;
260 drop(stats);
261
262 let mut backoff = self.backoff_multiplier.write().await;
264 *backoff = (*backoff * 2.0).min(32.0);
265 let current_backoff = *backoff;
266 drop(backoff);
267
268 let delay_ms = if let Some(retry) = retry_after {
270 retry * 1000
271 } else {
272 let base = self.base_delay_ms as f64;
273 (base * current_backoff) as u64
274 };
275
276 let mut current_delay = self.current_delay_ms.write().await;
277 *current_delay = delay_ms.min(self.max_delay_ms);
278 let delay_value = *current_delay;
279 drop(current_delay);
280
281 let mut stats = self.stats.write().await;
282 stats.adaptive_delay_ms = delay_value;
283 }
284
285 pub async fn record_retry(&self) {
287 let mut stats = self.stats.write().await;
288 stats.retry_count += 1;
289 }
290
291 pub async fn get_stats(&self) -> RateLimitStats {
293 self.stats.read().await.clone()
294 }
295
296 pub async fn get_rate_limit_info(&self) -> RateLimitInfo {
298 self.rate_limit_info.read().await.clone()
299 }
300
301 pub async fn wait(&self) {
303 let delay = self.current_delay().await;
304 if delay > Duration::from_millis(0) {
305 tokio::time::sleep(delay).await;
306 }
307 }
308
309 pub async fn reset(&self) {
311 let mut current_delay = self.current_delay_ms.write().await;
312 *current_delay = self.base_delay_ms;
313 let mut backoff = self.backoff_multiplier.write().await;
314 *backoff = 1.0;
315
316 let mut stats = self.stats.write().await;
317 *stats = RateLimitStats::default();
318
319 let mut info = self.rate_limit_info.write().await;
320 *info = RateLimitInfo::default();
321
322 let mut times = self.request_times.write().await;
323 times.clear();
324 }
325}
326
327impl Default for AdaptiveRateLimiter {
328 fn default() -> Self {
329 Self::new(1000) }
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned header map from borrowed `(name, value)` pairs.
    fn header_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_owned(), value.to_owned()))
            .collect()
    }

    /// Builds an info value carrying only a remaining count and a limit.
    fn quota(remaining: u32, limit: u32) -> RateLimitInfo {
        RateLimitInfo {
            remaining: Some(remaining),
            limit: Some(limit),
            ..Default::default()
        }
    }

    #[test]
    fn test_rate_limit_info_from_headers() {
        let parsed = RateLimitInfo::from_headers(&header_map(&[
            ("x-ratelimit-remaining", "45"),
            ("x-ratelimit-limit", "100"),
            ("x-ratelimit-reset-after", "60"),
            ("retry-after", "5"),
        ]));

        assert_eq!(parsed.remaining, Some(45));
        assert_eq!(parsed.limit, Some(100));
        assert_eq!(parsed.reset_after_secs, Some(60));
        assert_eq!(parsed.retry_after_secs, Some(5));
    }

    #[test]
    fn test_is_approaching_limit() {
        // 15% of the quota left: below the 20% threshold.
        assert!(quota(15, 100).is_approaching_limit());
        // Half of the quota left: comfortably above the threshold.
        assert!(!quota(50, 100).is_approaching_limit());
    }

    #[test]
    fn test_recommended_delay_with_retry_after() {
        let with_retry = RateLimitInfo {
            retry_after_secs: Some(10),
            ..Default::default()
        };

        assert_eq!(with_retry.recommended_delay(), Duration::from_secs(10));
    }

    #[test]
    fn test_recommended_delay_when_approaching_limit() {
        // 60 s until reset spread over 5 remaining requests gives 12 s each.
        let nearly_exhausted = RateLimitInfo {
            reset_after_secs: Some(60),
            ..quota(5, 100)
        };

        assert_eq!(nearly_exhausted.recommended_delay(), Duration::from_secs(12));
    }
}