garmin_cli/sync/
rate_limiter.rs1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6use tokio::sync::{OwnedSemaphorePermit, Semaphore};
7
8#[derive(Clone)]
10pub struct SharedRateLimiter {
11 semaphore: Arc<Semaphore>,
13 min_delay: Duration,
15 last_request: Arc<Mutex<Instant>>,
17 backoff: Arc<Mutex<Duration>>,
19 max_backoff: Duration,
21 consecutive_429s: Arc<AtomicU32>,
23}
24
25impl Default for SharedRateLimiter {
26 fn default() -> Self {
27 Self::new(3) }
29}
30
31impl SharedRateLimiter {
32 pub fn new(max_concurrent: usize) -> Self {
34 Self {
35 semaphore: Arc::new(Semaphore::new(max_concurrent)),
36 min_delay: Duration::from_millis(2000),
37 last_request: Arc::new(Mutex::new(Instant::now() - Duration::from_secs(10))),
38 backoff: Arc::new(Mutex::new(Duration::from_secs(0))),
39 max_backoff: Duration::from_secs(300), consecutive_429s: Arc::new(AtomicU32::new(0)),
41 }
42 }
43
44 pub async fn acquire(&self) -> RateLimitGuard {
46 let permit = self.semaphore.clone().acquire_owned().await.unwrap();
48
49 let required_delay = {
51 let last = self.last_request.lock().unwrap();
52 let backoff = self.backoff.lock().unwrap();
53 let elapsed = last.elapsed();
54 let required = self.min_delay + *backoff;
55
56 if elapsed < required {
57 required - elapsed
58 } else {
59 Duration::ZERO
60 }
61 };
62
63 if !required_delay.is_zero() {
64 tokio::time::sleep(required_delay).await;
65 }
66
67 {
69 let mut last = self.last_request.lock().unwrap();
70 *last = Instant::now();
71 }
72
73 RateLimitGuard { _permit: permit }
74 }
75
76 pub fn on_success(&self) {
78 let mut backoff = self.backoff.lock().unwrap();
79 *backoff = Duration::ZERO;
80 self.consecutive_429s.store(0, Ordering::Relaxed);
81 }
82
83 pub fn on_rate_limit(&self) {
85 self.consecutive_429s.fetch_add(1, Ordering::Relaxed);
86 let mut backoff = self.backoff.lock().unwrap();
87 let new_backoff = (*backoff * 2).max(Duration::from_secs(1));
88 *backoff = new_backoff.min(self.max_backoff);
89 }
90
91 pub fn should_pause(&self) -> bool {
93 self.consecutive_429s.load(Ordering::Relaxed) >= 5
94 }
95
96 pub fn current_backoff(&self) -> Duration {
98 *self.backoff.lock().unwrap()
99 }
100
101 pub fn pause_duration(&self) -> Duration {
103 Duration::from_secs(1800)
104 }
105}
106
107pub struct RateLimitGuard {
109 _permit: OwnedSemaphorePermit,
110}
111
/// Single-owner rate limiter; all mutating methods take `&mut self`.
#[derive(Debug)]
pub struct RateLimiter {
    /// Base delay enforced between two `wait` calls.
    min_delay: Duration,
    /// Extra delay added on top of `min_delay`; multiplied by `on_rate_limit`,
    /// reset to one second by `on_success`.
    backoff: Duration,
    /// Upper bound applied to `backoff` by `on_rate_limit`.
    max_backoff: Duration,
    /// Factor by which `on_rate_limit` multiplies `backoff`.
    backoff_multiplier: f64,
    /// Time at which the previous `wait` call finished; `None` before the first call.
    last_request: Option<Instant>,
    /// Number of `on_rate_limit` calls since the last `on_success`.
    consecutive_429s: u32,
}
128
129impl Default for RateLimiter {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl RateLimiter {
136 pub fn new() -> Self {
138 Self {
139 min_delay: Duration::from_millis(2000), backoff: Duration::from_secs(1),
141 max_backoff: Duration::from_secs(300), backoff_multiplier: 2.0,
143 last_request: None,
144 consecutive_429s: 0,
145 }
146 }
147
148 pub async fn wait(&mut self) {
150 if let Some(last) = self.last_request {
151 let elapsed = last.elapsed();
152 let required_delay = self.min_delay + self.backoff;
153
154 if elapsed < required_delay {
155 let wait_time = required_delay - elapsed;
156 tokio::time::sleep(wait_time).await;
157 }
158 }
159 self.last_request = Some(Instant::now());
160 }
161
162 pub fn on_success(&mut self) {
164 self.backoff = Duration::from_secs(1);
165 self.consecutive_429s = 0;
166 }
167
168 pub fn on_rate_limit(&mut self) {
170 self.consecutive_429s += 1;
171 self.backoff = Duration::from_secs_f64(
172 (self.backoff.as_secs_f64() * self.backoff_multiplier).min(self.max_backoff.as_secs_f64()),
173 );
174 }
175
176 pub fn should_pause(&self) -> bool {
178 self.consecutive_429s >= 5
179 }
180
181 pub fn current_backoff(&self) -> Duration {
183 self.backoff
184 }
185
186 pub fn pause_duration(&self) -> Duration {
188 Duration::from_secs(1800) }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh limiter starts at the documented defaults and is not paused.
    #[test]
    fn test_rate_limiter_defaults() {
        let limiter = RateLimiter::new();
        assert_eq!(limiter.min_delay, Duration::from_millis(2000));
        assert_eq!(limiter.backoff, Duration::from_secs(1));
        assert_eq!(limiter.consecutive_429s, 0);
        assert!(!limiter.should_pause());
    }

    /// Each rate limit doubles the backoff: 1 s -> 2 s -> 4 s -> 8 s.
    #[test]
    fn test_exponential_backoff() {
        let mut limiter = RateLimiter::new();

        for expected_secs in [2, 4, 8] {
            limiter.on_rate_limit();
            assert_eq!(limiter.backoff, Duration::from_secs(expected_secs));
        }
    }

    /// After enough rate limits the backoff sits exactly at the cap. Asserting
    /// equality (not `<=`) also proves the backoff actually grew.
    #[test]
    fn test_backoff_max() {
        let mut limiter = RateLimiter::new();

        for _ in 0..20 {
            limiter.on_rate_limit();
        }

        assert_eq!(limiter.backoff, limiter.max_backoff);
        assert_eq!(limiter.current_backoff(), Duration::from_secs(300));
    }

    /// A success resets both the backoff and the consecutive counter.
    #[test]
    fn test_reset_on_success() {
        let mut limiter = RateLimiter::new();

        limiter.on_rate_limit();
        limiter.on_rate_limit();
        assert!(limiter.backoff > Duration::from_secs(1));

        limiter.on_success();
        assert_eq!(limiter.backoff, Duration::from_secs(1));
        assert_eq!(limiter.consecutive_429s, 0);
    }

    /// The pause flag flips on the fifth consecutive rate limit, not before.
    #[test]
    fn test_should_pause() {
        let mut limiter = RateLimiter::new();

        for _ in 0..4 {
            limiter.on_rate_limit();
            assert!(!limiter.should_pause());
        }

        limiter.on_rate_limit();
        assert!(limiter.should_pause());
    }

    /// The shared limiter pauses after five rate limits and recovers on success.
    #[test]
    fn test_shared_rate_limiter() {
        let limiter = SharedRateLimiter::new(3);

        limiter.on_rate_limit();
        assert!(!limiter.should_pause());

        for _ in 0..4 {
            limiter.on_rate_limit();
        }
        assert!(limiter.should_pause());

        limiter.on_success();
        assert!(!limiter.should_pause());
    }

    /// The shared backoff starts at zero, grows 1 s -> 2 s, saturates at the
    /// cap and is cleared by a success.
    #[test]
    fn test_shared_backoff_growth_and_reset() {
        let limiter = SharedRateLimiter::new(3);
        assert_eq!(limiter.current_backoff(), Duration::ZERO);

        limiter.on_rate_limit();
        assert_eq!(limiter.current_backoff(), Duration::from_secs(1));

        limiter.on_rate_limit();
        assert_eq!(limiter.current_backoff(), Duration::from_secs(2));

        for _ in 0..20 {
            limiter.on_rate_limit();
        }
        assert_eq!(limiter.current_backoff(), limiter.max_backoff);

        limiter.on_success();
        assert_eq!(limiter.current_backoff(), Duration::ZERO);
    }

    /// Clones share state: an update through one handle is visible through the other.
    #[test]
    fn test_shared_clones_share_state() {
        let limiter = SharedRateLimiter::new(3);
        let other = limiter.clone();

        for _ in 0..5 {
            other.on_rate_limit();
        }

        assert!(limiter.should_pause());
        assert_eq!(limiter.current_backoff(), other.current_backoff());

        limiter.on_success();
        assert!(!other.should_pause());
        assert_eq!(other.current_backoff(), Duration::ZERO);
    }
}