Skip to main content

codetether_agent/swarm/
rate_limiter.rs

//! Adaptive rate limiting for swarm operations
//!
//! Parses rate limit headers from provider responses and dynamically
//! adjusts request timing to avoid hitting rate limits.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
12/// Rate limit information extracted from API response headers
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct RateLimitInfo {
15    /// Number of requests remaining in the current window
16    pub remaining: Option<u32>,
17    /// Total limit for the current window
18    pub limit: Option<u32>,
19    /// Seconds until the rate limit resets
20    pub reset_after_secs: Option<u64>,
21    /// Unix timestamp when the rate limit resets
22    pub reset_at: Option<u64>,
23    /// Retry-After header value (seconds to wait before retry)
24    pub retry_after_secs: Option<u64>,
25    /// The policy string (e.g., "60:100" for 60 seconds, 100 requests)
26    pub policy: Option<String>,
27}
28
29impl RateLimitInfo {
30    /// Parse rate limit headers from a header map
31    pub fn from_headers(headers: &HashMap<String, String>) -> Self {
32        let mut info = Self::default();
33
34        for (key, value) in headers {
35            let key_lower = key.to_lowercase();
36            match key_lower.as_str() {
37                // X-RateLimit-Remaining or x-ratelimit-remaining
38                k if k.contains("ratelimit-remaining") => {
39                    info.remaining = value.parse().ok();
40                }
41                // X-RateLimit-Limit or x-ratelimit-limit
42                k if k.contains("ratelimit-limit") => {
43                    info.limit = value.parse().ok();
44                }
45                // X-RateLimit-Reset or x-ratelimit-reset (timestamp)
46                k if k.contains("ratelimit-reset") && !k.contains("after") => {
47                    info.reset_at = value.parse().ok();
48                }
49                // X-RateLimit-Reset-After or x-ratelimit-reset-after (seconds)
50                k if k.contains("ratelimit-reset-after") => {
51                    info.reset_after_secs = value.parse().ok();
52                }
53                // Retry-After
54                "retry-after" => {
55                    info.retry_after_secs = value.parse().ok();
56                }
57                // X-RateLimit-Policy
58                k if k.contains("ratelimit-policy") => {
59                    info.policy = Some(value.clone());
60                }
61                _ => {}
62            }
63        }
64
65        info
66    }
67
68    /// Check if we're approaching the rate limit (less than 20% remaining)
69    pub fn is_approaching_limit(&self) -> bool {
70        match (self.remaining, self.limit) {
71            (Some(remaining), Some(limit)) if limit > 0 => (remaining as f64 / limit as f64) < 0.2,
72            _ => false,
73        }
74    }
75
76    /// Check if we've hit the rate limit
77    pub fn is_limit_exceeded(&self) -> bool {
78        self.remaining == Some(0)
79    }
80
81    /// Calculate recommended delay before next request
82    pub fn recommended_delay(&self) -> Duration {
83        // If we have a retry-after header, use that
84        if let Some(retry_after) = self.retry_after_secs {
85            return Duration::from_secs(retry_after);
86        }
87
88        // If we're approaching limit and have reset info, calculate delay
89        if self.is_approaching_limit() {
90            if let Some(reset_after) = self.reset_after_secs {
91                // Spread requests evenly across remaining time
92                if let Some(remaining) = self.remaining {
93                    if remaining > 0 {
94                        return Duration::from_millis((reset_after * 1000) / remaining as u64);
95                    }
96                }
97                return Duration::from_secs(reset_after);
98            }
99
100            if let Some(reset_at) = self.reset_at {
101                let now = std::time::SystemTime::now()
102                    .duration_since(std::time::UNIX_EPOCH)
103                    .unwrap_or_default()
104                    .as_secs();
105                if reset_at > now {
106                    let remaining_secs = reset_at - now;
107                    if let Some(remaining) = self.remaining {
108                        if remaining > 0 {
109                            return Duration::from_millis(
110                                (remaining_secs * 1000) / remaining as u64,
111                            );
112                        }
113                    }
114                    return Duration::from_secs(remaining_secs);
115                }
116            }
117        }
118
119        Duration::from_millis(0)
120    }
121}
122
123/// Statistics for rate limit tracking
124#[derive(Debug, Clone, Default, Serialize, Deserialize)]
125pub struct RateLimitStats {
126    /// Total number of requests made
127    pub total_requests: u64,
128    /// Number of 429 responses received
129    pub rate_limit_hits: u64,
130    /// Number of retries due to rate limiting
131    pub retry_count: u64,
132    /// Current estimated requests per minute
133    pub current_rpm: f64,
134    /// Average delay between requests (ms)
135    pub avg_delay_ms: u64,
136    /// Current adaptive delay (ms)
137    pub adaptive_delay_ms: u64,
138    /// Last rate limit info received
139    pub last_rate_limit_info: Option<RateLimitInfo>,
140    /// Unix timestamp of last request (seconds since epoch)
141    pub last_request_timestamp_secs: Option<u64>,
142}
143
144/// Adaptive rate limiter that adjusts based on provider responses
145#[derive(Debug, Clone)]
146pub struct AdaptiveRateLimiter {
147    /// Base delay between requests
148    base_delay_ms: u64,
149    /// Minimum delay (ms)
150    min_delay_ms: u64,
151    /// Maximum delay (ms)
152    max_delay_ms: u64,
153    /// Current adaptive delay
154    current_delay_ms: Arc<RwLock<u64>>,
155    /// Exponential backoff multiplier for 429s
156    backoff_multiplier: Arc<RwLock<f64>>,
157    /// Statistics
158    stats: Arc<RwLock<RateLimitStats>>,
159    /// Last rate limit info
160    rate_limit_info: Arc<RwLock<RateLimitInfo>>,
161    /// Request timestamps for RPM calculation
162    request_times: Arc<RwLock<Vec<Instant>>>,
163}
164
165impl AdaptiveRateLimiter {
166    /// Create a new adaptive rate limiter
167    pub fn new(base_delay_ms: u64) -> Self {
168        Self {
169            base_delay_ms,
170            min_delay_ms: 100,    // 100ms minimum
171            max_delay_ms: 60_000, // 60s maximum
172            current_delay_ms: Arc::new(RwLock::new(base_delay_ms)),
173            backoff_multiplier: Arc::new(RwLock::new(1.0)),
174            stats: Arc::new(RwLock::new(RateLimitStats::default())),
175            rate_limit_info: Arc::new(RwLock::new(RateLimitInfo::default())),
176            request_times: Arc::new(RwLock::new(Vec::new())),
177        }
178    }
179
180    /// Get the current delay
181    pub async fn current_delay(&self) -> Duration {
182        let info = self.rate_limit_info.read().await;
183        let recommended = info.recommended_delay();
184
185        // Use the larger of adaptive delay or recommended delay
186        let current = *self.current_delay_ms.read().await;
187        let delay_ms = current.max(recommended.as_millis() as u64);
188        Duration::from_millis(delay_ms.min(self.max_delay_ms).max(self.min_delay_ms))
189    }
190
191    /// Record a successful request with optional rate limit headers
192    pub async fn record_success(&self, headers: Option<&HashMap<String, String>>) {
193        let now = Instant::now();
194        let now_secs = std::time::SystemTime::now()
195            .duration_since(std::time::UNIX_EPOCH)
196            .unwrap_or_default()
197            .as_secs();
198
199        // Update rate limit info if headers provided
200        if let Some(h) = headers {
201            let info = RateLimitInfo::from_headers(h);
202            let mut rate_limit = self.rate_limit_info.write().await;
203            *rate_limit = info.clone();
204
205            // Adjust delay based on remaining requests
206            if let (Some(remaining), Some(limit)) = (info.remaining, info.limit) {
207                if limit > 0 {
208                    let ratio = remaining as f64 / limit as f64;
209                    let backoff = *self.backoff_multiplier.read().await;
210                    let mut new_delay = self.base_delay_ms as f64;
211
212                    if ratio < 0.1 {
213                        // Less than 10% remaining - be very conservative
214                        new_delay *= 3.0;
215                    } else if ratio < 0.3 {
216                        // Less than 30% remaining - be cautious
217                        new_delay *= 1.5;
218                    } else if ratio > 0.5 && backoff <= 1.0 {
219                        // More than 50% remaining and no recent 429s - can be more aggressive
220                        new_delay *= 0.8;
221                    }
222
223                    let mut current_delay = self.current_delay_ms.write().await;
224                    *current_delay = new_delay as u64;
225                }
226            }
227        }
228
229        // Update stats
230        let mut stats = self.stats.write().await;
231        stats.total_requests += 1;
232        stats.last_request_timestamp_secs = Some(now_secs);
233        let current_delay = *self.current_delay_ms.read().await;
234        stats.adaptive_delay_ms = current_delay;
235
236        // Reset backoff multiplier on success
237        let mut backoff = self.backoff_multiplier.write().await;
238        *backoff = 1.0_f64.max(*backoff * 0.9);
239
240        // Update request times for RPM calculation
241        drop(stats);
242        let mut times = self.request_times.write().await;
243        times.push(now);
244
245        // Keep only last 60 seconds of request times
246        let cutoff = now - Duration::from_secs(60);
247        times.retain(|&t| t > cutoff);
248
249        // Update RPM in stats
250        let rpm = times.len() as f64;
251        let mut stats = self.stats.write().await;
252        stats.current_rpm = rpm;
253    }
254
255    /// Record a rate limit hit (429 response)
256    pub async fn record_rate_limit_hit(&self, retry_after: Option<u64>) {
257        let mut stats = self.stats.write().await;
258        stats.rate_limit_hits += 1;
259        stats.retry_count += 1;
260        drop(stats);
261
262        // Increase backoff multiplier exponentially
263        let mut backoff = self.backoff_multiplier.write().await;
264        *backoff = (*backoff * 2.0).min(32.0);
265        let current_backoff = *backoff;
266        drop(backoff);
267
268        // Set delay based on retry-after or exponential backoff
269        let delay_ms = if let Some(retry) = retry_after {
270            retry * 1000
271        } else {
272            let base = self.base_delay_ms as f64;
273            (base * current_backoff) as u64
274        };
275
276        let mut current_delay = self.current_delay_ms.write().await;
277        *current_delay = delay_ms.min(self.max_delay_ms);
278        let delay_value = *current_delay;
279        drop(current_delay);
280
281        let mut stats = self.stats.write().await;
282        stats.adaptive_delay_ms = delay_value;
283    }
284
285    /// Record a retry attempt
286    pub async fn record_retry(&self) {
287        let mut stats = self.stats.write().await;
288        stats.retry_count += 1;
289    }
290
291    /// Get current statistics
292    pub async fn get_stats(&self) -> RateLimitStats {
293        self.stats.read().await.clone()
294    }
295
296    /// Get current rate limit info
297    pub async fn get_rate_limit_info(&self) -> RateLimitInfo {
298        self.rate_limit_info.read().await.clone()
299    }
300
301    /// Wait for the appropriate delay before making next request
302    pub async fn wait(&self) {
303        let delay = self.current_delay().await;
304        if delay > Duration::from_millis(0) {
305            tokio::time::sleep(delay).await;
306        }
307    }
308
309    /// Reset the rate limiter to initial state
310    pub async fn reset(&self) {
311        let mut current_delay = self.current_delay_ms.write().await;
312        *current_delay = self.base_delay_ms;
313        let mut backoff = self.backoff_multiplier.write().await;
314        *backoff = 1.0;
315
316        let mut stats = self.stats.write().await;
317        *stats = RateLimitStats::default();
318
319        let mut info = self.rate_limit_info.write().await;
320        *info = RateLimitInfo::default();
321
322        let mut times = self.request_times.write().await;
323        times.clear();
324    }
325}
326
327impl Default for AdaptiveRateLimiter {
328    fn default() -> Self {
329        Self::new(1000) // 1 second default base delay
330    }
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned header map from borrowed name/value pairs.
    fn header_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect()
    }

    /// Builds an info value carrying only `remaining` and `limit`.
    fn quota(remaining: u32, limit: u32) -> RateLimitInfo {
        RateLimitInfo {
            remaining: Some(remaining),
            limit: Some(limit),
            ..Default::default()
        }
    }

    /// The recognised headers end up in their matching fields.
    #[test]
    fn test_rate_limit_info_from_headers() {
        let headers = header_map(&[
            ("x-ratelimit-remaining", "45"),
            ("x-ratelimit-limit", "100"),
            ("x-ratelimit-reset-after", "60"),
            ("retry-after", "5"),
        ]);

        let parsed = RateLimitInfo::from_headers(&headers);

        assert_eq!(parsed.remaining, Some(45));
        assert_eq!(parsed.limit, Some(100));
        assert_eq!(parsed.reset_after_secs, Some(60));
        assert_eq!(parsed.retry_after_secs, Some(5));
    }

    /// 15% remaining counts as approaching the limit; 50% does not.
    #[test]
    fn test_is_approaching_limit() {
        assert!(quota(15, 100).is_approaching_limit());
        assert!(!quota(50, 100).is_approaching_limit());
    }

    /// A retry-after value is returned as the delay.
    #[test]
    fn test_recommended_delay_with_retry_after() {
        let throttled = RateLimitInfo {
            retry_after_secs: Some(10),
            ..Default::default()
        };
        assert_eq!(throttled.recommended_delay(), Duration::from_secs(10));
    }

    /// Close to the limit, the remaining requests are spread over the window.
    #[test]
    fn test_recommended_delay_when_approaching_limit() {
        let nearly_out = RateLimitInfo {
            reset_after_secs: Some(60),
            ..quota(5, 100)
        };
        // 5 requests over 60 seconds = 12 seconds each
        assert_eq!(nearly_out.recommended_delay(), Duration::from_secs(12));
    }
}