garmin_cli/sync/
rate_limiter.rs

1//! Shared rate limiter with semaphore-based concurrency control for parallel sync
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6use tokio::sync::{OwnedSemaphorePermit, Semaphore};
7
/// Thread-safe rate limiter for parallel API requests.
///
/// Every piece of mutable state (semaphore, last-request timestamp, backoff and
/// the 429 counter) is wrapped in an `Arc`, so values produced by `Clone` share
/// that state with the original. `min_delay` and `max_backoff` are plain
/// `Duration` values and are copied into each clone.
#[derive(Clone)]
pub struct SharedRateLimiter {
    /// Semaphore that limits how many requests may be in flight concurrently
    semaphore: Arc<Semaphore>,
    /// Minimum delay between requests (2000ms = ~30 req/min)
    min_delay: Duration,
    /// Timestamp of the last request, shared between clones
    last_request: Arc<Mutex<Instant>>,
    /// Current backoff duration, added on top of `min_delay`
    backoff: Arc<Mutex<Duration>>,
    /// Upper bound the backoff is clamped to
    max_backoff: Duration,
    /// Consecutive rate limit hits (for pause detection)
    consecutive_429s: Arc<AtomicU32>,
}
24
25impl Default for SharedRateLimiter {
26    fn default() -> Self {
27        Self::new(3) // 3 concurrent by default
28    }
29}
30
31impl SharedRateLimiter {
32    /// Create a new shared rate limiter with given concurrency
33    pub fn new(max_concurrent: usize) -> Self {
34        Self {
35            semaphore: Arc::new(Semaphore::new(max_concurrent)),
36            min_delay: Duration::from_millis(2000),
37            last_request: Arc::new(Mutex::new(Instant::now() - Duration::from_secs(10))),
38            backoff: Arc::new(Mutex::new(Duration::from_secs(0))),
39            max_backoff: Duration::from_secs(300), // 5 minutes
40            consecutive_429s: Arc::new(AtomicU32::new(0)),
41        }
42    }
43
44    /// Acquire a permit and wait for rate limit, returns a guard
45    pub async fn acquire(&self) -> RateLimitGuard {
46        // First acquire a permit from the semaphore
47        let permit = self.semaphore.clone().acquire_owned().await.unwrap();
48
49        // Then ensure minimum delay since last request
50        let required_delay = {
51            let last = self.last_request.lock().unwrap();
52            let backoff = self.backoff.lock().unwrap();
53            let elapsed = last.elapsed();
54            let required = self.min_delay + *backoff;
55
56            if elapsed < required {
57                required - elapsed
58            } else {
59                Duration::ZERO
60            }
61        };
62
63        if !required_delay.is_zero() {
64            tokio::time::sleep(required_delay).await;
65        }
66
67        // Update last request time
68        {
69            let mut last = self.last_request.lock().unwrap();
70            *last = Instant::now();
71        }
72
73        RateLimitGuard { _permit: permit }
74    }
75
76    /// Handle a successful request - reset backoff
77    pub fn on_success(&self) {
78        let mut backoff = self.backoff.lock().unwrap();
79        *backoff = Duration::ZERO;
80        self.consecutive_429s.store(0, Ordering::Relaxed);
81    }
82
83    /// Handle a rate limit (429) response - increase backoff
84    pub fn on_rate_limit(&self) {
85        self.consecutive_429s.fetch_add(1, Ordering::Relaxed);
86        let mut backoff = self.backoff.lock().unwrap();
87        let new_backoff = (*backoff * 2).max(Duration::from_secs(1));
88        *backoff = new_backoff.min(self.max_backoff);
89    }
90
91    /// Check if we should pause sync due to repeated rate limits
92    pub fn should_pause(&self) -> bool {
93        self.consecutive_429s.load(Ordering::Relaxed) >= 5
94    }
95
96    /// Get the current backoff duration
97    pub fn current_backoff(&self) -> Duration {
98        *self.backoff.lock().unwrap()
99    }
100
101    /// Get pause duration (30 minutes after 5 consecutive 429s)
102    pub fn pause_duration(&self) -> Duration {
103        Duration::from_secs(1800)
104    }
105}
106
107/// Guard that holds a rate limit permit
108pub struct RateLimitGuard {
109    _permit: OwnedSemaphorePermit,
110}
111
// Legacy single-threaded rate limiter, kept for backward compatibility.
/// Rate limiter configuration (legacy, use SharedRateLimiter for parallel sync)
///
/// State is held in plain fields and the mutating methods take `&mut self`,
/// so a single owner drives it.
pub struct RateLimiter {
    /// Minimum delay between requests
    min_delay: Duration,
    /// Current backoff delay, added on top of `min_delay`
    backoff: Duration,
    /// Maximum backoff delay
    max_backoff: Duration,
    /// Factor the backoff is multiplied by on each rate limit hit
    backoff_multiplier: f64,
    /// Time of the last request; `None` until the first `wait` completes
    last_request: Option<Instant>,
    /// Consecutive rate limit hits
    consecutive_429s: u32,
}
128
129impl Default for RateLimiter {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl RateLimiter {
136    /// Create a new rate limiter with conservative defaults
137    pub fn new() -> Self {
138        Self {
139            min_delay: Duration::from_millis(2000), // 30 req/min
140            backoff: Duration::from_secs(1),
141            max_backoff: Duration::from_secs(300), // 5 minutes
142            backoff_multiplier: 2.0,
143            last_request: None,
144            consecutive_429s: 0,
145        }
146    }
147
148    /// Wait before making the next request
149    pub async fn wait(&mut self) {
150        if let Some(last) = self.last_request {
151            let elapsed = last.elapsed();
152            let required_delay = self.min_delay + self.backoff;
153
154            if elapsed < required_delay {
155                let wait_time = required_delay - elapsed;
156                tokio::time::sleep(wait_time).await;
157            }
158        }
159        self.last_request = Some(Instant::now());
160    }
161
162    /// Handle a successful request
163    pub fn on_success(&mut self) {
164        self.backoff = Duration::from_secs(1);
165        self.consecutive_429s = 0;
166    }
167
168    /// Handle a rate limit (HTTP 429) response
169    pub fn on_rate_limit(&mut self) {
170        self.consecutive_429s += 1;
171        self.backoff = Duration::from_secs_f64(
172            (self.backoff.as_secs_f64() * self.backoff_multiplier).min(self.max_backoff.as_secs_f64()),
173        );
174    }
175
176    /// Check if we should pause sync due to repeated rate limits
177    pub fn should_pause(&self) -> bool {
178        self.consecutive_429s >= 5
179    }
180
181    /// Get the current backoff duration
182    pub fn current_backoff(&self) -> Duration {
183        self.backoff
184    }
185
186    /// Get pause duration (30 minutes after 5 consecutive 429s)
187    pub fn pause_duration(&self) -> Duration {
188        Duration::from_secs(1800) // 30 minutes
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh legacy limiter uses a 2 s spacing and a 1 s backoff.
    #[test]
    fn test_rate_limiter_defaults() {
        let RateLimiter {
            min_delay, backoff, ..
        } = RateLimiter::new();

        assert_eq!(min_delay, Duration::from_millis(2000));
        assert_eq!(backoff, Duration::from_secs(1));
    }

    /// Each rate limit hit doubles the backoff: 2 s, 4 s, 8 s.
    #[test]
    fn test_exponential_backoff() {
        let mut limiter = RateLimiter::new();

        for expected_secs in [2, 4, 8] {
            limiter.on_rate_limit();
            assert_eq!(limiter.backoff, Duration::from_secs(expected_secs));
        }
    }

    /// The backoff stays within `max_backoff` no matter how many hits occur.
    #[test]
    fn test_backoff_max() {
        let mut limiter = RateLimiter::new();

        // Hit rate limit many times
        (0..20).for_each(|_| limiter.on_rate_limit());

        // Should not exceed max_backoff
        assert!(limiter.backoff <= limiter.max_backoff);
    }

    /// A success restores the baseline backoff and clears the 429 streak.
    #[test]
    fn test_reset_on_success() {
        let mut limiter = RateLimiter::new();
        let baseline = Duration::from_secs(1);

        limiter.on_rate_limit();
        limiter.on_rate_limit();
        assert!(limiter.backoff > baseline);

        limiter.on_success();
        assert_eq!(limiter.backoff, baseline);
        assert_eq!(limiter.consecutive_429s, 0);
    }

    /// Pausing starts exactly at the fifth consecutive rate limit hit.
    #[test]
    fn test_should_pause() {
        let mut limiter = RateLimiter::new();

        for hit in 1..=5u32 {
            limiter.on_rate_limit();
            assert_eq!(limiter.should_pause(), hit >= 5);
        }
    }

    /// The shared limiter pauses after five hits and recovers on success.
    #[test]
    fn test_shared_rate_limiter() {
        let limiter = SharedRateLimiter::new(3);

        limiter.on_rate_limit();
        assert!(!limiter.should_pause());

        (0..4).for_each(|_| limiter.on_rate_limit());
        assert!(limiter.should_pause());

        limiter.on_success();
        assert!(!limiter.should_pause());
    }
}