1use std::sync::RwLock;
44use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
45use std::time::{Duration, Instant};
46
/// Default request-weight limit applied to the 1-minute window.
pub const DEFAULT_WEIGHT_LIMIT_1M: u64 = 1_200;

/// Default request-weight limit applied to the 1-second window.
pub const DEFAULT_WEIGHT_LIMIT_1S: u64 = 6_000;

/// Default order-count limit applied to the 10-second window.
pub const DEFAULT_ORDER_LIMIT_10S: u64 = 100;

/// Default order-count limit applied to the 1-day window.
pub const DEFAULT_ORDER_LIMIT_1D: u64 = 200_000;

/// Fraction of a limit at which the limiter starts reporting that callers
/// should throttle (usage `>= limit * THROTTLE_THRESHOLD`).
pub const THROTTLE_THRESHOLD: f64 = 0.80;
61
62#[derive(Debug, Clone, Default)]
64pub struct RateLimitInfo {
65 pub used_weight_1m: Option<u64>,
67 pub used_weight_1s: Option<u64>,
69 pub order_count_10s: Option<u64>,
71 pub order_count_1d: Option<u64>,
73 pub retry_after: Option<u64>,
75}
76
77impl RateLimitInfo {
78 pub fn from_headers(headers: &serde_json::Value) -> Self {
88 let mut info = Self::default();
89
90 if let Some(obj) = headers.as_object() {
91 if let Some(weight) = obj
93 .get("x-mbx-used-weight-1m")
94 .or_else(|| obj.get("X-MBX-USED-WEIGHT-1M"))
95 .or_else(|| obj.get("x-sapi-used-uid-weight-1m"))
96 .or_else(|| obj.get("X-SAPI-USED-UID-WEIGHT-1M"))
97 .and_then(|v| v.as_str())
98 .and_then(|s| s.parse::<u64>().ok())
99 {
100 info.used_weight_1m = Some(weight);
101 }
102
103 if let Some(weight) = obj
105 .get("x-mbx-used-weight-1s")
106 .or_else(|| obj.get("X-MBX-USED-WEIGHT-1S"))
107 .or_else(|| obj.get("x-mbx-used-weight"))
108 .or_else(|| obj.get("X-MBX-USED-WEIGHT"))
109 .and_then(|v| v.as_str())
110 .and_then(|s| s.parse::<u64>().ok())
111 {
112 info.used_weight_1s = Some(weight);
113 }
114
115 if let Some(count) = obj
117 .get("x-mbx-order-count-10s")
118 .or_else(|| obj.get("X-MBX-ORDER-COUNT-10S"))
119 .and_then(|v| v.as_str())
120 .and_then(|s| s.parse::<u64>().ok())
121 {
122 info.order_count_10s = Some(count);
123 }
124
125 if let Some(count) = obj
127 .get("x-mbx-order-count-1d")
128 .or_else(|| obj.get("X-MBX-ORDER-COUNT-1D"))
129 .and_then(|v| v.as_str())
130 .and_then(|s| s.parse::<u64>().ok())
131 {
132 info.order_count_1d = Some(count);
133 }
134
135 if let Some(retry) = obj
137 .get("retry-after")
138 .or_else(|| obj.get("Retry-After"))
139 .and_then(|v| v.as_str())
140 .and_then(|s| s.parse::<u64>().ok())
141 {
142 info.retry_after = Some(retry);
143 }
144 }
145
146 info
147 }
148
149 pub fn has_data(&self) -> bool {
151 self.used_weight_1m.is_some()
152 || self.order_count_10s.is_some()
153 || self.order_count_1d.is_some()
154 || self.retry_after.is_some()
155 }
156}
157
158#[derive(Debug)]
163pub struct WeightRateLimiter {
164 used_weight_1m: AtomicU64,
166 used_weight_1s: AtomicU64,
168 order_count_10s: AtomicU64,
170 order_count_1d: AtomicU64,
172 weight_limit_1m: AtomicU64,
174 weight_limit_1s: AtomicU64,
176 order_limit_10s: AtomicU64,
178 order_limit_1d: AtomicU64,
180 retry_after_until: RwLock<Option<Instant>>,
182 last_update: RwLock<Option<Instant>>,
184 ip_banned_until: AtomicI64,
186}
187
188impl Default for WeightRateLimiter {
189 fn default() -> Self {
190 Self::new()
191 }
192}
193
194impl WeightRateLimiter {
195 pub fn new() -> Self {
197 Self {
198 used_weight_1m: AtomicU64::new(0),
199 used_weight_1s: AtomicU64::new(0),
200 order_count_10s: AtomicU64::new(0),
201 order_count_1d: AtomicU64::new(0),
202 weight_limit_1m: AtomicU64::new(DEFAULT_WEIGHT_LIMIT_1M),
203 weight_limit_1s: AtomicU64::new(DEFAULT_WEIGHT_LIMIT_1S),
204 order_limit_10s: AtomicU64::new(DEFAULT_ORDER_LIMIT_10S),
205 order_limit_1d: AtomicU64::new(DEFAULT_ORDER_LIMIT_1D),
206 retry_after_until: RwLock::new(None),
207 last_update: RwLock::new(None),
208 ip_banned_until: AtomicI64::new(0),
209 }
210 }
211
212 pub fn with_limits(weight_limit_1m: u64, order_limit_10s: u64, order_limit_1d: u64) -> Self {
220 Self {
221 used_weight_1m: AtomicU64::new(0),
222 used_weight_1s: AtomicU64::new(0),
223 order_count_10s: AtomicU64::new(0),
224 order_count_1d: AtomicU64::new(0),
225 weight_limit_1m: AtomicU64::new(weight_limit_1m),
226 weight_limit_1s: AtomicU64::new(DEFAULT_WEIGHT_LIMIT_1S),
227 order_limit_10s: AtomicU64::new(order_limit_10s),
228 order_limit_1d: AtomicU64::new(order_limit_1d),
229 retry_after_until: RwLock::new(None),
230 last_update: RwLock::new(None),
231 ip_banned_until: AtomicI64::new(0),
232 }
233 }
234
235 pub fn update(&self, info: RateLimitInfo) {
241 if let Some(weight) = info.used_weight_1m {
242 self.used_weight_1m.store(weight, Ordering::SeqCst);
243 }
244
245 if let Some(weight) = info.used_weight_1s {
246 self.used_weight_1s.store(weight, Ordering::SeqCst);
247 }
248
249 if let Some(count) = info.order_count_10s {
250 self.order_count_10s.store(count, Ordering::SeqCst);
251 }
252
253 if let Some(count) = info.order_count_1d {
254 self.order_count_1d.store(count, Ordering::SeqCst);
255 }
256
257 if let Some(retry_secs) = info.retry_after {
258 let until = Instant::now() + Duration::from_secs(retry_secs);
259 if let Ok(mut guard) = self.retry_after_until.write() {
260 *guard = Some(until);
261 }
262 }
263
264 if let Ok(mut guard) = self.last_update.write() {
265 *guard = Some(Instant::now());
266 }
267 }
268
269 pub fn set_ip_banned(&self, duration: Duration) {
275 let until = std::time::SystemTime::now()
276 .duration_since(std::time::UNIX_EPOCH)
277 .unwrap_or_default()
278 .as_secs() as i64
279 + duration.as_secs() as i64;
280 self.ip_banned_until.store(until, Ordering::SeqCst);
281 }
282
283 pub fn is_ip_banned(&self) -> bool {
285 let banned_until = self.ip_banned_until.load(Ordering::SeqCst);
286 if banned_until == 0 {
287 return false;
288 }
289
290 let now = std::time::SystemTime::now()
291 .duration_since(std::time::UNIX_EPOCH)
292 .unwrap_or_default()
293 .as_secs() as i64;
294
295 now < banned_until
296 }
297
298 fn decay_stale_counters(&self) {
303 let last_update = if let Ok(guard) = self.last_update.read() {
304 *guard
305 } else {
306 return;
307 };
308
309 let Some(last) = last_update else {
310 return;
311 };
312
313 let elapsed = last.elapsed();
314
315 if elapsed > Duration::from_secs(1) {
317 self.used_weight_1s.store(0, Ordering::SeqCst);
318 }
319
320 if elapsed > Duration::from_secs(10) {
322 self.order_count_10s.store(0, Ordering::SeqCst);
323 }
324
325 if elapsed > Duration::from_secs(60) {
327 self.used_weight_1m.store(0, Ordering::SeqCst);
328 }
329 }
330
331 pub fn should_throttle(&self) -> bool {
339 self.decay_stale_counters();
340
341 if self.is_ip_banned() {
343 return true;
344 }
345
346 if let Ok(guard) = self.retry_after_until.read() {
348 if let Some(until) = *guard {
349 if Instant::now() < until {
350 return true;
351 }
352 }
353 }
354
355 let weight = self.used_weight_1m.load(Ordering::SeqCst);
357 let weight_limit = self.weight_limit_1m.load(Ordering::SeqCst);
358 #[allow(clippy::cast_precision_loss)]
359 if (weight as f64) >= (weight_limit as f64) * THROTTLE_THRESHOLD {
360 return true;
361 }
362
363 let weight_1s = self.used_weight_1s.load(Ordering::SeqCst);
365 let weight_limit_1s = self.weight_limit_1s.load(Ordering::SeqCst);
366 #[allow(clippy::cast_precision_loss)]
367 if (weight_1s as f64) >= (weight_limit_1s as f64) * THROTTLE_THRESHOLD {
368 return true;
369 }
370
371 let order_count = self.order_count_10s.load(Ordering::SeqCst);
373 let order_limit = self.order_limit_10s.load(Ordering::SeqCst);
374 if order_count as f64 >= order_limit as f64 * THROTTLE_THRESHOLD {
375 return true;
376 }
377
378 false
379 }
380
381 pub fn wait_duration(&self) -> Option<Duration> {
385 self.decay_stale_counters();
386
387 if self.is_ip_banned() {
389 let banned_until = self.ip_banned_until.load(Ordering::SeqCst);
390 let now = std::time::SystemTime::now()
391 .duration_since(std::time::UNIX_EPOCH)
392 .unwrap_or_default()
393 .as_secs() as i64;
394 if banned_until > now {
395 return Some(Duration::from_secs((banned_until - now) as u64));
396 }
397 }
398
399 if let Ok(guard) = self.retry_after_until.read() {
401 if let Some(until) = *guard {
402 let now = Instant::now();
403 if until > now {
404 return Some(until - now);
405 }
406 }
407 }
408
409 if self.should_throttle() {
411 return Some(Duration::from_secs(1));
413 }
414
415 None
416 }
417
418 pub fn used_weight(&self) -> u64 {
420 self.used_weight_1m.load(Ordering::SeqCst)
421 }
422
423 pub fn weight_limit(&self) -> u64 {
425 self.weight_limit_1m.load(Ordering::SeqCst)
426 }
427
428 pub fn order_count_10s(&self) -> u64 {
430 self.order_count_10s.load(Ordering::SeqCst)
431 }
432
433 pub fn order_count_1d(&self) -> u64 {
435 self.order_count_1d.load(Ordering::SeqCst)
436 }
437
438 pub fn order_limit_1d(&self) -> u64 {
440 self.order_limit_1d.load(Ordering::SeqCst)
441 }
442
443 pub fn weight_usage_ratio(&self) -> f64 {
445 let weight = self.used_weight_1m.load(Ordering::SeqCst) as f64;
446 let limit = self.weight_limit_1m.load(Ordering::SeqCst) as f64;
447 if limit > 0.0 { weight / limit } else { 0.0 }
448 }
449
450 pub fn reset(&self) {
454 self.used_weight_1m.store(0, Ordering::SeqCst);
455 self.used_weight_1s.store(0, Ordering::SeqCst);
456 self.order_count_10s.store(0, Ordering::SeqCst);
457 self.order_count_1d.store(0, Ordering::SeqCst);
458 if let Ok(mut guard) = self.retry_after_until.write() {
459 *guard = None;
460 }
461 self.ip_banned_until.store(0, Ordering::SeqCst);
462 }
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default limiter that has already ingested `info`.
    fn limiter_fed_with(info: RateLimitInfo) -> WeightRateLimiter {
        let limiter = WeightRateLimiter::new();
        limiter.update(info);
        limiter
    }

    /// Shorthand for an info that only carries a 1-minute weight.
    fn weight_only(weight: u64) -> RateLimitInfo {
        RateLimitInfo {
            used_weight_1m: Some(weight),
            ..Default::default()
        }
    }

    // A fresh limiter starts empty, uses the default limit and does not throttle.
    #[test]
    fn test_rate_limiter_new() {
        let fresh = WeightRateLimiter::new();
        assert_eq!(fresh.used_weight(), 0);
        assert_eq!(fresh.weight_limit(), DEFAULT_WEIGHT_LIMIT_1M);
        assert!(!fresh.should_throttle());
    }

    // Custom limits are reported back through the getter.
    #[test]
    fn test_rate_limiter_with_limits() {
        let custom = WeightRateLimiter::with_limits(2400, 200, 400000);
        assert_eq!(custom.weight_limit(), 2400);
    }

    // `update` copies every populated field into the limiter.
    #[test]
    fn test_rate_limiter_update() {
        let limiter = limiter_fed_with(RateLimitInfo {
            used_weight_1m: Some(500),
            used_weight_1s: None,
            order_count_10s: Some(5),
            order_count_1d: Some(1000),
            retry_after: None,
        });

        assert_eq!(limiter.used_weight(), 500);
        assert_eq!(limiter.order_count_10s(), 5);
        assert_eq!(limiter.order_count_1d(), 1000);
    }

    // Reaching exactly the threshold share of the limit triggers throttling.
    #[test]
    fn test_rate_limiter_throttle_at_threshold() {
        let at_threshold = (DEFAULT_WEIGHT_LIMIT_1M as f64 * THROTTLE_THRESHOLD) as u64;
        let limiter = limiter_fed_with(weight_only(at_threshold));
        assert!(limiter.should_throttle());
    }

    // Half of the limit is comfortably below the threshold.
    #[test]
    fn test_rate_limiter_no_throttle_below_threshold() {
        let limiter = limiter_fed_with(weight_only(DEFAULT_WEIGHT_LIMIT_1M / 2));
        assert!(!limiter.should_throttle());
    }

    // A retry-after value throttles and produces a wait duration.
    #[test]
    fn test_rate_limiter_retry_after() {
        let limiter = limiter_fed_with(RateLimitInfo {
            retry_after: Some(5),
            ..Default::default()
        });

        assert!(limiter.should_throttle());
        assert!(limiter.wait_duration().is_some());
    }

    // An IP ban is visible through both the ban query and the throttle check.
    #[test]
    fn test_rate_limiter_ip_banned() {
        let limiter = WeightRateLimiter::new();
        limiter.set_ip_banned(Duration::from_secs(60));

        assert!(limiter.is_ip_banned());
        assert!(limiter.should_throttle());
    }

    // `reset` wipes counters, the retry-after deadline and the ban.
    #[test]
    fn test_rate_limiter_reset() {
        let limiter = limiter_fed_with(RateLimitInfo {
            used_weight_1m: Some(1000),
            used_weight_1s: None,
            order_count_10s: Some(50),
            order_count_1d: Some(5000),
            retry_after: Some(10),
        });
        limiter.set_ip_banned(Duration::from_secs(60));

        limiter.reset();

        assert_eq!(limiter.used_weight(), 0);
        assert_eq!(limiter.order_count_10s(), 0);
        assert_eq!(limiter.order_count_1d(), 0);
        assert!(!limiter.is_ip_banned());
        assert!(!limiter.should_throttle());
    }

    // Lower-case header names are recognised.
    #[test]
    fn test_rate_limit_info_from_headers() {
        let parsed = RateLimitInfo::from_headers(&serde_json::json!({
            "x-mbx-used-weight-1m": "500",
            "x-mbx-order-count-10s": "5",
            "x-mbx-order-count-1d": "1000",
            "retry-after": "30"
        }));

        assert_eq!(parsed.used_weight_1m, Some(500));
        assert_eq!(parsed.order_count_10s, Some(5));
        assert_eq!(parsed.order_count_1d, Some(1000));
        assert_eq!(parsed.retry_after, Some(30));
    }

    // Upper-case and canonical-case header names are recognised too.
    #[test]
    fn test_rate_limit_info_from_headers_uppercase() {
        let parsed = RateLimitInfo::from_headers(&serde_json::json!({
            "X-MBX-USED-WEIGHT-1M": "600",
            "X-MBX-ORDER-COUNT-10S": "10",
            "Retry-After": "60"
        }));

        assert_eq!(parsed.used_weight_1m, Some(600));
        assert_eq!(parsed.order_count_10s, Some(10));
        assert_eq!(parsed.retry_after, Some(60));
    }

    // `has_data` is false for an empty info and true once a field is set.
    #[test]
    fn test_rate_limit_info_has_data() {
        assert!(!RateLimitInfo::default().has_data());
        assert!(weight_only(100).has_data());
    }

    // Half the limit yields a usage ratio of roughly 0.5.
    #[test]
    fn test_weight_usage_ratio() {
        let limiter = limiter_fed_with(weight_only(DEFAULT_WEIGHT_LIMIT_1M / 2));
        let ratio = limiter.weight_usage_ratio();
        assert!((ratio - 0.5).abs() < 0.01);
    }
}