Skip to main content

codetether_agent/swarm/
rate_limiter.rs

//! Adaptive rate limiting for swarm operations
//!
//! Parses rate limit headers from provider responses and dynamically
//! adjusts request timing to avoid hitting rate limits.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
12/// Rate limit information extracted from API response headers
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct RateLimitInfo {
15    /// Number of requests remaining in the current window
16    pub remaining: Option<u32>,
17    /// Total limit for the current window
18    pub limit: Option<u32>,
19    /// Seconds until the rate limit resets
20    pub reset_after_secs: Option<u64>,
21    /// Unix timestamp when the rate limit resets
22    pub reset_at: Option<u64>,
23    /// Retry-After header value (seconds to wait before retry)
24    pub retry_after_secs: Option<u64>,
25    /// The policy string (e.g., "60:100" for 60 seconds, 100 requests)
26    pub policy: Option<String>,
27}
28
29impl RateLimitInfo {
30    /// Parse rate limit headers from a header map
31    pub fn from_headers(headers: &HashMap<String, String>) -> Self {
32        let mut info = Self::default();
33
34        for (key, value) in headers {
35            let key_lower = key.to_lowercase();
36            match key_lower.as_str() {
37                // X-RateLimit-Remaining or x-ratelimit-remaining
38                k if k.contains("ratelimit-remaining") => {
39                    info.remaining = value.parse().ok();
40                }
41                // X-RateLimit-Limit or x-ratelimit-limit
42                k if k.contains("ratelimit-limit") => {
43                    info.limit = value.parse().ok();
44                }
45                // X-RateLimit-Reset or x-ratelimit-reset (timestamp)
46                k if k.contains("ratelimit-reset") && !k.contains("after") => {
47                    info.reset_at = value.parse().ok();
48                }
49                // X-RateLimit-Reset-After or x-ratelimit-reset-after (seconds)
50                k if k.contains("ratelimit-reset-after") => {
51                    info.reset_after_secs = value.parse().ok();
52                }
53                // Retry-After
54                "retry-after" => {
55                    info.retry_after_secs = value.parse().ok();
56                }
57                // X-RateLimit-Policy
58                k if k.contains("ratelimit-policy") => {
59                    info.policy = Some(value.clone());
60                }
61                _ => {}
62            }
63        }
64
65        info
66    }
67
68    /// Check if we're approaching the rate limit (less than 20% remaining)
69    pub fn is_approaching_limit(&self) -> bool {
70        match (self.remaining, self.limit) {
71            (Some(remaining), Some(limit)) if limit > 0 => (remaining as f64 / limit as f64) < 0.2,
72            _ => false,
73        }
74    }
75
76    /// Check if we've hit the rate limit
77    pub fn is_limit_exceeded(&self) -> bool {
78        self.remaining == Some(0)
79    }
80
81    /// Calculate recommended delay before next request
82    pub fn recommended_delay(&self) -> Duration {
83        // If we have a retry-after header, use that
84        if let Some(retry_after) = self.retry_after_secs {
85            return Duration::from_secs(retry_after);
86        }
87
88        // If we're approaching limit and have reset info, calculate delay
89        if self.is_approaching_limit() {
90            if let Some(reset_after) = self.reset_after_secs {
91                // Spread requests evenly across remaining time
92                if let Some(remaining) = self.remaining
93                    && remaining > 0
94                {
95                    return Duration::from_millis((reset_after * 1000) / remaining as u64);
96                }
97                return Duration::from_secs(reset_after);
98            }
99
100            if let Some(reset_at) = self.reset_at {
101                let now = std::time::SystemTime::now()
102                    .duration_since(std::time::UNIX_EPOCH)
103                    .unwrap_or_default()
104                    .as_secs();
105                if reset_at > now {
106                    let remaining_secs = reset_at - now;
107                    if let Some(remaining) = self.remaining
108                        && remaining > 0
109                    {
110                        return Duration::from_millis((remaining_secs * 1000) / remaining as u64);
111                    }
112                    return Duration::from_secs(remaining_secs);
113                }
114            }
115        }
116
117        Duration::from_millis(0)
118    }
119}
120
121/// Statistics for rate limit tracking
122#[derive(Debug, Clone, Default, Serialize, Deserialize)]
123pub struct RateLimitStats {
124    /// Total number of requests made
125    pub total_requests: u64,
126    /// Number of 429 responses received
127    pub rate_limit_hits: u64,
128    /// Number of retries due to rate limiting
129    pub retry_count: u64,
130    /// Current estimated requests per minute
131    pub current_rpm: f64,
132    /// Average delay between requests (ms)
133    pub avg_delay_ms: u64,
134    /// Current adaptive delay (ms)
135    pub adaptive_delay_ms: u64,
136    /// Last rate limit info received
137    pub last_rate_limit_info: Option<RateLimitInfo>,
138    /// Unix timestamp of last request (seconds since epoch)
139    pub last_request_timestamp_secs: Option<u64>,
140}
141
142/// Adaptive rate limiter that adjusts based on provider responses
143#[derive(Debug, Clone)]
144pub struct AdaptiveRateLimiter {
145    /// Base delay between requests
146    base_delay_ms: u64,
147    /// Minimum delay (ms)
148    min_delay_ms: u64,
149    /// Maximum delay (ms)
150    max_delay_ms: u64,
151    /// Current adaptive delay
152    current_delay_ms: Arc<RwLock<u64>>,
153    /// Exponential backoff multiplier for 429s
154    backoff_multiplier: Arc<RwLock<f64>>,
155    /// Statistics
156    stats: Arc<RwLock<RateLimitStats>>,
157    /// Last rate limit info
158    rate_limit_info: Arc<RwLock<RateLimitInfo>>,
159    /// Request timestamps for RPM calculation
160    request_times: Arc<RwLock<Vec<Instant>>>,
161}
162
163impl AdaptiveRateLimiter {
164    /// Create a new adaptive rate limiter
165    pub fn new(base_delay_ms: u64) -> Self {
166        Self {
167            base_delay_ms,
168            min_delay_ms: 100,    // 100ms minimum
169            max_delay_ms: 60_000, // 60s maximum
170            current_delay_ms: Arc::new(RwLock::new(base_delay_ms)),
171            backoff_multiplier: Arc::new(RwLock::new(1.0)),
172            stats: Arc::new(RwLock::new(RateLimitStats::default())),
173            rate_limit_info: Arc::new(RwLock::new(RateLimitInfo::default())),
174            request_times: Arc::new(RwLock::new(Vec::new())),
175        }
176    }
177
178    /// Get the current delay
179    pub async fn current_delay(&self) -> Duration {
180        let info = self.rate_limit_info.read().await;
181        let recommended = info.recommended_delay();
182
183        // Use the larger of adaptive delay or recommended delay
184        let current = *self.current_delay_ms.read().await;
185        let delay_ms = current.max(recommended.as_millis() as u64);
186        Duration::from_millis(delay_ms.min(self.max_delay_ms).max(self.min_delay_ms))
187    }
188
189    /// Record a successful request with optional rate limit headers
190    pub async fn record_success(&self, headers: Option<&HashMap<String, String>>) {
191        let now = Instant::now();
192        let now_secs = std::time::SystemTime::now()
193            .duration_since(std::time::UNIX_EPOCH)
194            .unwrap_or_default()
195            .as_secs();
196
197        // Update rate limit info if headers provided
198        if let Some(h) = headers {
199            let info = RateLimitInfo::from_headers(h);
200            let mut rate_limit = self.rate_limit_info.write().await;
201            *rate_limit = info.clone();
202
203            // Adjust delay based on remaining requests
204            if let (Some(remaining), Some(limit)) = (info.remaining, info.limit)
205                && limit > 0
206            {
207                let ratio = remaining as f64 / limit as f64;
208                let backoff = *self.backoff_multiplier.read().await;
209                let mut new_delay = self.base_delay_ms as f64;
210
211                if ratio < 0.1 {
212                    // Less than 10% remaining - be very conservative
213                    new_delay *= 3.0;
214                } else if ratio < 0.3 {
215                    // Less than 30% remaining - be cautious
216                    new_delay *= 1.5;
217                } else if ratio > 0.5 && backoff <= 1.0 {
218                    // More than 50% remaining and no recent 429s - can be more aggressive
219                    new_delay *= 0.8;
220                }
221
222                let mut current_delay = self.current_delay_ms.write().await;
223                *current_delay = new_delay as u64;
224            }
225        }
226
227        // Update stats
228        let mut stats = self.stats.write().await;
229        stats.total_requests += 1;
230        stats.last_request_timestamp_secs = Some(now_secs);
231        let current_delay = *self.current_delay_ms.read().await;
232        stats.adaptive_delay_ms = current_delay;
233
234        // Reset backoff multiplier on success
235        let mut backoff = self.backoff_multiplier.write().await;
236        *backoff = 1.0_f64.max(*backoff * 0.9);
237
238        // Update request times for RPM calculation
239        drop(stats);
240        let mut times = self.request_times.write().await;
241        times.push(now);
242
243        // Keep only last 60 seconds of request times
244        let cutoff = now - Duration::from_secs(60);
245        times.retain(|&t| t > cutoff);
246
247        // Update RPM in stats
248        let rpm = times.len() as f64;
249        let mut stats = self.stats.write().await;
250        stats.current_rpm = rpm;
251    }
252
253    /// Record a rate limit hit (429 response)
254    pub async fn record_rate_limit_hit(&self, retry_after: Option<u64>) {
255        let mut stats = self.stats.write().await;
256        stats.rate_limit_hits += 1;
257        stats.retry_count += 1;
258        drop(stats);
259
260        // Increase backoff multiplier exponentially
261        let mut backoff = self.backoff_multiplier.write().await;
262        *backoff = (*backoff * 2.0).min(32.0);
263        let current_backoff = *backoff;
264        drop(backoff);
265
266        // Set delay based on retry-after or exponential backoff
267        let delay_ms = if let Some(retry) = retry_after {
268            retry * 1000
269        } else {
270            let base = self.base_delay_ms as f64;
271            (base * current_backoff) as u64
272        };
273
274        let mut current_delay = self.current_delay_ms.write().await;
275        *current_delay = delay_ms.min(self.max_delay_ms);
276        let delay_value = *current_delay;
277        drop(current_delay);
278
279        let mut stats = self.stats.write().await;
280        stats.adaptive_delay_ms = delay_value;
281    }
282
283    /// Record a retry attempt
284    pub async fn record_retry(&self) {
285        let mut stats = self.stats.write().await;
286        stats.retry_count += 1;
287    }
288
289    /// Get current statistics
290    pub async fn get_stats(&self) -> RateLimitStats {
291        self.stats.read().await.clone()
292    }
293
294    /// Get current rate limit info
295    pub async fn get_rate_limit_info(&self) -> RateLimitInfo {
296        self.rate_limit_info.read().await.clone()
297    }
298
299    /// Wait for the appropriate delay before making next request
300    pub async fn wait(&self) {
301        let delay = self.current_delay().await;
302        if delay > Duration::from_millis(0) {
303            tokio::time::sleep(delay).await;
304        }
305    }
306
307    /// Reset the rate limiter to initial state
308    pub async fn reset(&self) {
309        let mut current_delay = self.current_delay_ms.write().await;
310        *current_delay = self.base_delay_ms;
311        let mut backoff = self.backoff_multiplier.write().await;
312        *backoff = 1.0;
313
314        let mut stats = self.stats.write().await;
315        *stats = RateLimitStats::default();
316
317        let mut info = self.rate_limit_info.write().await;
318        *info = RateLimitInfo::default();
319
320        let mut times = self.request_times.write().await;
321        times.clear();
322    }
323}
324
325impl Default for AdaptiveRateLimiter {
326    fn default() -> Self {
327        Self::new(1000) // 1 second default base delay
328    }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Turn borrowed name/value pairs into the owned map `from_headers` takes.
    fn header_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_owned(), value.to_owned()))
            .collect()
    }

    /// Info with only the remaining/limit pair filled in.
    fn quota(remaining: u32, limit: u32) -> RateLimitInfo {
        RateLimitInfo {
            remaining: Some(remaining),
            limit: Some(limit),
            ..Default::default()
        }
    }

    #[test]
    fn test_rate_limit_info_from_headers() {
        let headers = header_map(&[
            ("x-ratelimit-remaining", "45"),
            ("x-ratelimit-limit", "100"),
            ("x-ratelimit-reset-after", "60"),
            ("retry-after", "5"),
        ]);

        let parsed = RateLimitInfo::from_headers(&headers);

        assert_eq!(parsed.remaining, Some(45));
        assert_eq!(parsed.limit, Some(100));
        assert_eq!(parsed.reset_after_secs, Some(60));
        assert_eq!(parsed.retry_after_secs, Some(5));
    }

    #[test]
    fn test_is_approaching_limit() {
        // 15% left is under the 20% threshold; 50% left is not.
        assert!(quota(15, 100).is_approaching_limit());
        assert!(!quota(50, 100).is_approaching_limit());
    }

    #[test]
    fn test_recommended_delay_with_retry_after() {
        let only_retry_after = RateLimitInfo {
            retry_after_secs: Some(10),
            ..Default::default()
        };
        assert_eq!(only_retry_after.recommended_delay(), Duration::from_secs(10));
    }

    #[test]
    fn test_recommended_delay_when_approaching_limit() {
        let nearly_exhausted = RateLimitInfo {
            reset_after_secs: Some(60),
            ..quota(5, 100)
        };
        // 60 seconds shared between 5 remaining requests = 12 seconds apiece
        assert_eq!(nearly_exhausted.recommended_delay(), Duration::from_secs(12));
    }
}