Skip to main content

garmin_cli/sync/
rate_limiter.rs

1//! Shared rate limiter with semaphore-based concurrency control for parallel sync
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6use tokio::sync::{OwnedSemaphorePermit, Semaphore};
7
8/// Thread-safe rate limiter for parallel API requests
9#[derive(Clone)]
10pub struct SharedRateLimiter {
11    /// Semaphore limits concurrent requests
12    semaphore: Arc<Semaphore>,
13    /// Minimum delay between requests (2000ms = ~30 req/min)
14    min_delay: Duration,
15    /// Last request timestamp
16    last_request: Arc<Mutex<Instant>>,
17    /// Current backoff duration
18    backoff: Arc<Mutex<Duration>>,
19    /// Maximum backoff duration
20    max_backoff: Duration,
21    /// Consecutive rate limit hits (for pause detection)
22    consecutive_429s: Arc<AtomicU32>,
23}
24
25impl Default for SharedRateLimiter {
26    fn default() -> Self {
27        Self::new(3) // 3 concurrent by default
28    }
29}
30
31impl SharedRateLimiter {
32    /// Create a new shared rate limiter with given concurrency
33    pub fn new(max_concurrent: usize) -> Self {
34        Self {
35            semaphore: Arc::new(Semaphore::new(max_concurrent)),
36            min_delay: Duration::from_millis(2000),
37            last_request: Arc::new(Mutex::new(Instant::now() - Duration::from_secs(10))),
38            backoff: Arc::new(Mutex::new(Duration::from_secs(0))),
39            max_backoff: Duration::from_secs(300), // 5 minutes
40            consecutive_429s: Arc::new(AtomicU32::new(0)),
41        }
42    }
43
44    /// Acquire a permit and wait for rate limit, returns a guard
45    pub async fn acquire(&self) -> RateLimitGuard {
46        // First acquire a permit from the semaphore
47        let permit = self.semaphore.clone().acquire_owned().await.unwrap();
48
49        // Then ensure minimum delay since last request
50        let required_delay = {
51            let last = self.last_request.lock().unwrap();
52            let backoff = self.backoff.lock().unwrap();
53            let elapsed = last.elapsed();
54            let required = self.min_delay + *backoff;
55
56            if elapsed < required {
57                required - elapsed
58            } else {
59                Duration::ZERO
60            }
61        };
62
63        if !required_delay.is_zero() {
64            tokio::time::sleep(required_delay).await;
65        }
66
67        // Update last request time
68        {
69            let mut last = self.last_request.lock().unwrap();
70            *last = Instant::now();
71        }
72
73        RateLimitGuard { _permit: permit }
74    }
75
76    /// Handle a successful request - reset backoff
77    pub fn on_success(&self) {
78        let mut backoff = self.backoff.lock().unwrap();
79        *backoff = Duration::ZERO;
80        self.consecutive_429s.store(0, Ordering::Relaxed);
81    }
82
83    /// Handle a rate limit (429) response - increase backoff
84    pub fn on_rate_limit(&self) {
85        self.consecutive_429s.fetch_add(1, Ordering::Relaxed);
86        let mut backoff = self.backoff.lock().unwrap();
87        let new_backoff = (*backoff * 2).max(Duration::from_secs(1));
88        *backoff = new_backoff.min(self.max_backoff);
89    }
90
91    /// Check if we should pause sync due to repeated rate limits
92    pub fn should_pause(&self) -> bool {
93        self.consecutive_429s.load(Ordering::Relaxed) >= 5
94    }
95
96    /// Get the current backoff duration
97    pub fn current_backoff(&self) -> Duration {
98        *self.backoff.lock().unwrap()
99    }
100
101    /// Get pause duration (30 minutes after 5 consecutive 429s)
102    pub fn pause_duration(&self) -> Duration {
103        Duration::from_secs(1800)
104    }
105}
106
107/// Guard that holds a rate limit permit
108pub struct RateLimitGuard {
109    _permit: OwnedSemaphorePermit,
110}
111
// Legacy single-threaded rate limiter, kept for backward compatibility.
/// Rate limiter state for sequential use (legacy; use `SharedRateLimiter` for parallel sync).
#[derive(Debug)]
pub struct RateLimiter {
    /// Minimum delay between requests.
    min_delay: Duration,
    /// Current backoff delay, added on top of `min_delay`.
    backoff: Duration,
    /// Maximum backoff delay.
    max_backoff: Duration,
    /// Factor the backoff is multiplied by on each rate limit hit.
    backoff_multiplier: f64,
    /// Time of the last request, or `None` if no request has been made yet.
    last_request: Option<Instant>,
    /// Consecutive rate limit hits.
    consecutive_429s: u32,
}
128
129impl Default for RateLimiter {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl RateLimiter {
136    /// Create a new rate limiter with conservative defaults
137    pub fn new() -> Self {
138        Self {
139            min_delay: Duration::from_millis(2000), // 30 req/min
140            backoff: Duration::from_secs(1),
141            max_backoff: Duration::from_secs(300), // 5 minutes
142            backoff_multiplier: 2.0,
143            last_request: None,
144            consecutive_429s: 0,
145        }
146    }
147
148    /// Wait before making the next request
149    pub async fn wait(&mut self) {
150        if let Some(last) = self.last_request {
151            let elapsed = last.elapsed();
152            let required_delay = self.min_delay + self.backoff;
153
154            if elapsed < required_delay {
155                let wait_time = required_delay - elapsed;
156                tokio::time::sleep(wait_time).await;
157            }
158        }
159        self.last_request = Some(Instant::now());
160    }
161
162    /// Handle a successful request
163    pub fn on_success(&mut self) {
164        self.backoff = Duration::from_secs(1);
165        self.consecutive_429s = 0;
166    }
167
168    /// Handle a rate limit (HTTP 429) response
169    pub fn on_rate_limit(&mut self) {
170        self.consecutive_429s += 1;
171        self.backoff = Duration::from_secs_f64(
172            (self.backoff.as_secs_f64() * self.backoff_multiplier)
173                .min(self.max_backoff.as_secs_f64()),
174        );
175    }
176
177    /// Check if we should pause sync due to repeated rate limits
178    pub fn should_pause(&self) -> bool {
179        self.consecutive_429s >= 5
180    }
181
182    /// Get the current backoff duration
183    pub fn current_backoff(&self) -> Duration {
184        self.backoff
185    }
186
187    /// Get pause duration (30 minutes after 5 consecutive 429s)
188    pub fn pause_duration(&self) -> Duration {
189        Duration::from_secs(1800) // 30 minutes
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh legacy limiter uses a 2 s request spacing and a 1 s backoff.
    #[test]
    fn test_rate_limiter_defaults() {
        let fresh = RateLimiter::new();
        assert_eq!(fresh.min_delay, Duration::from_millis(2000));
        assert_eq!(fresh.backoff, Duration::from_secs(1));
    }

    /// Each rate limit hit doubles the backoff: 2 s, 4 s, 8 s.
    #[test]
    fn test_exponential_backoff() {
        let mut limiter = RateLimiter::new();

        for expected_secs in [2, 4, 8] {
            limiter.on_rate_limit();
            assert_eq!(limiter.backoff, Duration::from_secs(expected_secs));
        }
    }

    /// The backoff never grows past `max_backoff`, however many hits occur.
    #[test]
    fn test_backoff_max() {
        let mut limiter = RateLimiter::new();

        // Hit the rate limit many times.
        (0..20).for_each(|_| limiter.on_rate_limit());

        assert!(limiter.backoff <= limiter.max_backoff);
    }

    /// A success restores the 1 s backoff and clears the 429 streak.
    #[test]
    fn test_reset_on_success() {
        let mut limiter = RateLimiter::new();

        for _ in 0..2 {
            limiter.on_rate_limit();
        }
        assert!(limiter.backoff > Duration::from_secs(1));

        limiter.on_success();
        assert_eq!(limiter.backoff, Duration::from_secs(1));
        assert_eq!(limiter.consecutive_429s, 0);
    }

    /// The pause flag turns on exactly at the fifth consecutive 429.
    #[test]
    fn test_should_pause() {
        let mut limiter = RateLimiter::new();

        for hit in 1..=5 {
            limiter.on_rate_limit();
            assert_eq!(limiter.should_pause(), hit >= 5);
        }
    }

    /// The shared limiter pauses after five 429s and recovers on success.
    #[test]
    fn test_shared_rate_limiter() {
        let shared = SharedRateLimiter::new(3);

        shared.on_rate_limit();
        assert!(!shared.should_pause());

        (0..4).for_each(|_| shared.on_rate_limit());
        assert!(shared.should_pause());

        shared.on_success();
        assert!(!shared.should_pause());
    }
}
272}