garmin_cli/sync/
rate_limiter.rs1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6use tokio::sync::{OwnedSemaphorePermit, Semaphore};
7
8#[derive(Clone)]
10pub struct SharedRateLimiter {
11 semaphore: Arc<Semaphore>,
13 min_delay: Duration,
15 last_request: Arc<Mutex<Instant>>,
17 backoff: Arc<Mutex<Duration>>,
19 max_backoff: Duration,
21 consecutive_429s: Arc<AtomicU32>,
23}
24
25impl Default for SharedRateLimiter {
26 fn default() -> Self {
27 Self::new(3) }
29}
30
31impl SharedRateLimiter {
32 pub fn new(max_concurrent: usize) -> Self {
34 Self {
35 semaphore: Arc::new(Semaphore::new(max_concurrent)),
36 min_delay: Duration::from_millis(2000),
37 last_request: Arc::new(Mutex::new(Instant::now() - Duration::from_secs(10))),
38 backoff: Arc::new(Mutex::new(Duration::from_secs(0))),
39 max_backoff: Duration::from_secs(300), consecutive_429s: Arc::new(AtomicU32::new(0)),
41 }
42 }
43
44 pub async fn acquire(&self) -> RateLimitGuard {
46 let permit = self.semaphore.clone().acquire_owned().await.unwrap();
48
49 let required_delay = {
51 let last = self.last_request.lock().unwrap();
52 let backoff = self.backoff.lock().unwrap();
53 let elapsed = last.elapsed();
54 let required = self.min_delay + *backoff;
55
56 if elapsed < required {
57 required - elapsed
58 } else {
59 Duration::ZERO
60 }
61 };
62
63 if !required_delay.is_zero() {
64 tokio::time::sleep(required_delay).await;
65 }
66
67 {
69 let mut last = self.last_request.lock().unwrap();
70 *last = Instant::now();
71 }
72
73 RateLimitGuard { _permit: permit }
74 }
75
76 pub fn on_success(&self) {
78 let mut backoff = self.backoff.lock().unwrap();
79 *backoff = Duration::ZERO;
80 self.consecutive_429s.store(0, Ordering::Relaxed);
81 }
82
83 pub fn on_rate_limit(&self) {
85 self.consecutive_429s.fetch_add(1, Ordering::Relaxed);
86 let mut backoff = self.backoff.lock().unwrap();
87 let new_backoff = (*backoff * 2).max(Duration::from_secs(1));
88 *backoff = new_backoff.min(self.max_backoff);
89 }
90
91 pub fn should_pause(&self) -> bool {
93 self.consecutive_429s.load(Ordering::Relaxed) >= 5
94 }
95
96 pub fn current_backoff(&self) -> Duration {
98 *self.backoff.lock().unwrap()
99 }
100
101 pub fn pause_duration(&self) -> Duration {
103 Duration::from_secs(1800)
104 }
105}
106
107pub struct RateLimitGuard {
109 _permit: OwnedSemaphorePermit,
110}
111
/// Single-owner rate limiter with multiplicative backoff.
///
/// All methods that change state take `&mut self`, so the limiter is meant to
/// be driven by one caller at a time.
#[derive(Debug)]
pub struct RateLimiter {
    /// Base spacing enforced between requests.
    min_delay: Duration,
    /// Extra delay added on top of `min_delay`.
    backoff: Duration,
    /// Upper bound for `backoff`.
    max_backoff: Duration,
    /// Factor applied to `backoff` on every rate-limit response.
    backoff_multiplier: f64,
    /// When the previous request was let through; `None` before the first one.
    last_request: Option<Instant>,
    /// Rate-limit responses reported since the last reported success.
    consecutive_429s: u32,
}
128
129impl Default for RateLimiter {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl RateLimiter {
136 pub fn new() -> Self {
138 Self {
139 min_delay: Duration::from_millis(2000), backoff: Duration::from_secs(1),
141 max_backoff: Duration::from_secs(300), backoff_multiplier: 2.0,
143 last_request: None,
144 consecutive_429s: 0,
145 }
146 }
147
148 pub async fn wait(&mut self) {
150 if let Some(last) = self.last_request {
151 let elapsed = last.elapsed();
152 let required_delay = self.min_delay + self.backoff;
153
154 if elapsed < required_delay {
155 let wait_time = required_delay - elapsed;
156 tokio::time::sleep(wait_time).await;
157 }
158 }
159 self.last_request = Some(Instant::now());
160 }
161
162 pub fn on_success(&mut self) {
164 self.backoff = Duration::from_secs(1);
165 self.consecutive_429s = 0;
166 }
167
168 pub fn on_rate_limit(&mut self) {
170 self.consecutive_429s += 1;
171 self.backoff = Duration::from_secs_f64(
172 (self.backoff.as_secs_f64() * self.backoff_multiplier)
173 .min(self.max_backoff.as_secs_f64()),
174 );
175 }
176
177 pub fn should_pause(&self) -> bool {
179 self.consecutive_429s >= 5
180 }
181
182 pub fn current_backoff(&self) -> Duration {
184 self.backoff
185 }
186
187 pub fn pause_duration(&self) -> Duration {
189 Duration::from_secs(1800) }
191}
192
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh limiter starts with a 2 s minimum delay and a 1 s backoff.
    #[test]
    fn test_rate_limiter_defaults() {
        let fresh = RateLimiter::new();
        assert_eq!(fresh.min_delay, Duration::from_millis(2000));
        assert_eq!(fresh.backoff, Duration::from_secs(1));
    }

    /// Each rate-limit response doubles the backoff: 2 s, 4 s, 8 s.
    #[test]
    fn test_exponential_backoff() {
        let mut limiter = RateLimiter::new();

        for expected_secs in [2, 4, 8] {
            limiter.on_rate_limit();
            assert_eq!(limiter.backoff, Duration::from_secs(expected_secs));
        }
    }

    /// The backoff never grows past `max_backoff`, however many 429s arrive.
    #[test]
    fn test_backoff_max() {
        let mut limiter = RateLimiter::new();

        (0..20).for_each(|_| limiter.on_rate_limit());

        assert!(limiter.backoff <= limiter.max_backoff);
    }

    /// A success resets both the backoff and the 429 counter.
    #[test]
    fn test_reset_on_success() {
        let mut limiter = RateLimiter::new();

        for _ in 0..2 {
            limiter.on_rate_limit();
        }
        assert!(limiter.backoff > Duration::from_secs(1));

        limiter.on_success();
        assert_eq!(limiter.backoff, Duration::from_secs(1));
        assert_eq!(limiter.consecutive_429s, 0);
    }

    /// `should_pause` flips to `true` on the fifth consecutive 429.
    #[test]
    fn test_should_pause() {
        let mut limiter = RateLimiter::new();

        for attempt in 1..=4 {
            limiter.on_rate_limit();
            assert!(!limiter.should_pause(), "paused after {attempt} responses");
        }

        limiter.on_rate_limit();
        assert!(limiter.should_pause());
    }

    /// The shared limiter pauses after five 429s and recovers on success.
    #[test]
    fn test_shared_rate_limiter() {
        let shared = SharedRateLimiter::new(3);

        shared.on_rate_limit();
        assert!(!shared.should_pause());

        (0..4).for_each(|_| shared.on_rate_limit());
        assert!(shared.should_pause());

        shared.on_success();
        assert!(!shared.should_pause());
    }
}