use std::sync::RwLock;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time::{Duration, Instant};
pub const DEFAULT_WEIGHT_LIMIT_1M: u64 = 1200;
pub const DEFAULT_WEIGHT_LIMIT_1S: u64 = 6000;
pub const DEFAULT_ORDER_LIMIT_10S: u64 = 100;
pub const DEFAULT_ORDER_LIMIT_1D: u64 = 200000;
pub const THROTTLE_THRESHOLD: f64 = 0.80;
#[derive(Debug, Clone, Default)]
pub struct RateLimitInfo {
pub used_weight_1m: Option<u64>,
pub used_weight_1s: Option<u64>,
pub order_count_10s: Option<u64>,
pub order_count_1d: Option<u64>,
pub retry_after: Option<u64>,
}
impl RateLimitInfo {
pub fn from_headers(headers: &serde_json::Value) -> Self {
let mut info = Self::default();
if let Some(obj) = headers.as_object() {
if let Some(weight) = obj
.get("x-mbx-used-weight-1m")
.or_else(|| obj.get("X-MBX-USED-WEIGHT-1M"))
.or_else(|| obj.get("x-sapi-used-uid-weight-1m"))
.or_else(|| obj.get("X-SAPI-USED-UID-WEIGHT-1M"))
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
{
info.used_weight_1m = Some(weight);
}
if let Some(weight) = obj
.get("x-mbx-used-weight-1s")
.or_else(|| obj.get("X-MBX-USED-WEIGHT-1S"))
.or_else(|| obj.get("x-mbx-used-weight"))
.or_else(|| obj.get("X-MBX-USED-WEIGHT"))
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
{
info.used_weight_1s = Some(weight);
}
if let Some(count) = obj
.get("x-mbx-order-count-10s")
.or_else(|| obj.get("X-MBX-ORDER-COUNT-10S"))
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
{
info.order_count_10s = Some(count);
}
if let Some(count) = obj
.get("x-mbx-order-count-1d")
.or_else(|| obj.get("X-MBX-ORDER-COUNT-1D"))
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
{
info.order_count_1d = Some(count);
}
if let Some(retry) = obj
.get("retry-after")
.or_else(|| obj.get("Retry-After"))
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<u64>().ok())
{
info.retry_after = Some(retry);
}
}
info
}
pub fn has_data(&self) -> bool {
self.used_weight_1m.is_some()
|| self.order_count_10s.is_some()
|| self.order_count_1d.is_some()
|| self.retry_after.is_some()
}
}
#[derive(Debug)]
pub struct WeightRateLimiter {
used_weight_1m: AtomicU64,
used_weight_1s: AtomicU64,
order_count_10s: AtomicU64,
order_count_1d: AtomicU64,
weight_limit_1m: AtomicU64,
weight_limit_1s: AtomicU64,
order_limit_10s: AtomicU64,
order_limit_1d: AtomicU64,
retry_after_until: RwLock<Option<Instant>>,
last_update: RwLock<Option<Instant>>,
ip_banned_until: AtomicI64,
}
impl Default for WeightRateLimiter {
fn default() -> Self {
Self::new()
}
}
impl WeightRateLimiter {
pub fn new() -> Self {
Self {
used_weight_1m: AtomicU64::new(0),
used_weight_1s: AtomicU64::new(0),
order_count_10s: AtomicU64::new(0),
order_count_1d: AtomicU64::new(0),
weight_limit_1m: AtomicU64::new(DEFAULT_WEIGHT_LIMIT_1M),
weight_limit_1s: AtomicU64::new(DEFAULT_WEIGHT_LIMIT_1S),
order_limit_10s: AtomicU64::new(DEFAULT_ORDER_LIMIT_10S),
order_limit_1d: AtomicU64::new(DEFAULT_ORDER_LIMIT_1D),
retry_after_until: RwLock::new(None),
last_update: RwLock::new(None),
ip_banned_until: AtomicI64::new(0),
}
}
pub fn with_limits(weight_limit_1m: u64, order_limit_10s: u64, order_limit_1d: u64) -> Self {
Self {
used_weight_1m: AtomicU64::new(0),
used_weight_1s: AtomicU64::new(0),
order_count_10s: AtomicU64::new(0),
order_count_1d: AtomicU64::new(0),
weight_limit_1m: AtomicU64::new(weight_limit_1m),
weight_limit_1s: AtomicU64::new(DEFAULT_WEIGHT_LIMIT_1S),
order_limit_10s: AtomicU64::new(order_limit_10s),
order_limit_1d: AtomicU64::new(order_limit_1d),
retry_after_until: RwLock::new(None),
last_update: RwLock::new(None),
ip_banned_until: AtomicI64::new(0),
}
}
pub fn update(&self, info: RateLimitInfo) {
if let Some(weight) = info.used_weight_1m {
self.used_weight_1m.store(weight, Ordering::SeqCst);
}
if let Some(weight) = info.used_weight_1s {
self.used_weight_1s.store(weight, Ordering::SeqCst);
}
if let Some(count) = info.order_count_10s {
self.order_count_10s.store(count, Ordering::SeqCst);
}
if let Some(count) = info.order_count_1d {
self.order_count_1d.store(count, Ordering::SeqCst);
}
if let Some(retry_secs) = info.retry_after {
let until = Instant::now() + Duration::from_secs(retry_secs);
if let Ok(mut guard) = self.retry_after_until.write() {
*guard = Some(until);
}
}
if let Ok(mut guard) = self.last_update.write() {
*guard = Some(Instant::now());
}
}
pub fn set_ip_banned(&self, duration: Duration) {
let until = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
+ duration.as_secs() as i64;
self.ip_banned_until.store(until, Ordering::SeqCst);
}
pub fn is_ip_banned(&self) -> bool {
let banned_until = self.ip_banned_until.load(Ordering::SeqCst);
if banned_until == 0 {
return false;
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
now < banned_until
}
fn decay_stale_counters(&self) {
let last_update = if let Ok(guard) = self.last_update.read() {
*guard
} else {
return;
};
let Some(last) = last_update else {
return;
};
let elapsed = last.elapsed();
if elapsed > Duration::from_secs(1) {
self.used_weight_1s.store(0, Ordering::SeqCst);
}
if elapsed > Duration::from_secs(10) {
self.order_count_10s.store(0, Ordering::SeqCst);
}
if elapsed > Duration::from_secs(60) {
self.used_weight_1m.store(0, Ordering::SeqCst);
}
}
pub fn should_throttle(&self) -> bool {
self.decay_stale_counters();
if self.is_ip_banned() {
return true;
}
if let Ok(guard) = self.retry_after_until.read() {
if let Some(until) = *guard {
if Instant::now() < until {
return true;
}
}
}
let weight = self.used_weight_1m.load(Ordering::SeqCst);
let weight_limit = self.weight_limit_1m.load(Ordering::SeqCst);
#[allow(clippy::cast_precision_loss)]
if (weight as f64) >= (weight_limit as f64) * THROTTLE_THRESHOLD {
return true;
}
let weight_1s = self.used_weight_1s.load(Ordering::SeqCst);
let weight_limit_1s = self.weight_limit_1s.load(Ordering::SeqCst);
#[allow(clippy::cast_precision_loss)]
if (weight_1s as f64) >= (weight_limit_1s as f64) * THROTTLE_THRESHOLD {
return true;
}
let order_count = self.order_count_10s.load(Ordering::SeqCst);
let order_limit = self.order_limit_10s.load(Ordering::SeqCst);
if order_count as f64 >= order_limit as f64 * THROTTLE_THRESHOLD {
return true;
}
false
}
pub fn wait_duration(&self) -> Option<Duration> {
self.decay_stale_counters();
if self.is_ip_banned() {
let banned_until = self.ip_banned_until.load(Ordering::SeqCst);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
if banned_until > now {
return Some(Duration::from_secs((banned_until - now) as u64));
}
}
if let Ok(guard) = self.retry_after_until.read() {
if let Some(until) = *guard {
let now = Instant::now();
if until > now {
return Some(until - now);
}
}
}
if self.should_throttle() {
return Some(Duration::from_secs(1));
}
None
}
pub fn used_weight(&self) -> u64 {
self.used_weight_1m.load(Ordering::SeqCst)
}
pub fn weight_limit(&self) -> u64 {
self.weight_limit_1m.load(Ordering::SeqCst)
}
pub fn order_count_10s(&self) -> u64 {
self.order_count_10s.load(Ordering::SeqCst)
}
pub fn order_count_1d(&self) -> u64 {
self.order_count_1d.load(Ordering::SeqCst)
}
pub fn order_limit_1d(&self) -> u64 {
self.order_limit_1d.load(Ordering::SeqCst)
}
pub fn weight_usage_ratio(&self) -> f64 {
let weight = self.used_weight_1m.load(Ordering::SeqCst) as f64;
let limit = self.weight_limit_1m.load(Ordering::SeqCst) as f64;
if limit > 0.0 { weight / limit } else { 0.0 }
}
pub fn reset(&self) {
self.used_weight_1m.store(0, Ordering::SeqCst);
self.used_weight_1s.store(0, Ordering::SeqCst);
self.order_count_10s.store(0, Ordering::SeqCst);
self.order_count_1d.store(0, Ordering::SeqCst);
if let Ok(mut guard) = self.retry_after_until.write() {
*guard = None;
}
self.ip_banned_until.store(0, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_rate_limiter_new() {
let limiter = WeightRateLimiter::new();
assert_eq!(limiter.used_weight(), 0);
assert_eq!(limiter.weight_limit(), DEFAULT_WEIGHT_LIMIT_1M);
assert!(!limiter.should_throttle());
}
#[test]
fn test_rate_limiter_with_limits() {
let limiter = WeightRateLimiter::with_limits(2400, 200, 400000);
assert_eq!(limiter.weight_limit(), 2400);
}
#[test]
fn test_rate_limiter_update() {
let limiter = WeightRateLimiter::new();
let info = RateLimitInfo {
used_weight_1m: Some(500),
used_weight_1s: None,
order_count_10s: Some(5),
order_count_1d: Some(1000),
retry_after: None,
};
limiter.update(info);
assert_eq!(limiter.used_weight(), 500);
assert_eq!(limiter.order_count_10s(), 5);
assert_eq!(limiter.order_count_1d(), 1000);
}
#[test]
fn test_rate_limiter_throttle_at_threshold() {
let limiter = WeightRateLimiter::new();
let threshold_weight = (DEFAULT_WEIGHT_LIMIT_1M as f64 * THROTTLE_THRESHOLD) as u64;
let info = RateLimitInfo {
used_weight_1m: Some(threshold_weight),
..Default::default()
};
limiter.update(info);
assert!(limiter.should_throttle());
}
#[test]
fn test_rate_limiter_no_throttle_below_threshold() {
let limiter = WeightRateLimiter::new();
let info = RateLimitInfo {
used_weight_1m: Some(DEFAULT_WEIGHT_LIMIT_1M / 2),
..Default::default()
};
limiter.update(info);
assert!(!limiter.should_throttle());
}
#[test]
fn test_rate_limiter_retry_after() {
let limiter = WeightRateLimiter::new();
let info = RateLimitInfo {
retry_after: Some(5),
..Default::default()
};
limiter.update(info);
assert!(limiter.should_throttle());
assert!(limiter.wait_duration().is_some());
}
#[test]
fn test_rate_limiter_ip_banned() {
let limiter = WeightRateLimiter::new();
limiter.set_ip_banned(Duration::from_secs(60));
assert!(limiter.is_ip_banned());
assert!(limiter.should_throttle());
}
#[test]
fn test_rate_limiter_reset() {
let limiter = WeightRateLimiter::new();
let info = RateLimitInfo {
used_weight_1m: Some(1000),
used_weight_1s: None,
order_count_10s: Some(50),
order_count_1d: Some(5000),
retry_after: Some(10),
};
limiter.update(info);
limiter.set_ip_banned(Duration::from_secs(60));
limiter.reset();
assert_eq!(limiter.used_weight(), 0);
assert_eq!(limiter.order_count_10s(), 0);
assert_eq!(limiter.order_count_1d(), 0);
assert!(!limiter.is_ip_banned());
assert!(!limiter.should_throttle());
}
#[test]
fn test_rate_limit_info_from_headers() {
let headers = serde_json::json!({
"x-mbx-used-weight-1m": "500",
"x-mbx-order-count-10s": "5",
"x-mbx-order-count-1d": "1000",
"retry-after": "30"
});
let info = RateLimitInfo::from_headers(&headers);
assert_eq!(info.used_weight_1m, Some(500));
assert_eq!(info.order_count_10s, Some(5));
assert_eq!(info.order_count_1d, Some(1000));
assert_eq!(info.retry_after, Some(30));
}
#[test]
fn test_rate_limit_info_from_headers_uppercase() {
let headers = serde_json::json!({
"X-MBX-USED-WEIGHT-1M": "600",
"X-MBX-ORDER-COUNT-10S": "10",
"Retry-After": "60"
});
let info = RateLimitInfo::from_headers(&headers);
assert_eq!(info.used_weight_1m, Some(600));
assert_eq!(info.order_count_10s, Some(10));
assert_eq!(info.retry_after, Some(60));
}
#[test]
fn test_rate_limit_info_has_data() {
let empty = RateLimitInfo::default();
assert!(!empty.has_data());
let with_weight = RateLimitInfo {
used_weight_1m: Some(100),
..Default::default()
};
assert!(with_weight.has_data());
}
#[test]
fn test_weight_usage_ratio() {
let limiter = WeightRateLimiter::new();
let info = RateLimitInfo {
used_weight_1m: Some(DEFAULT_WEIGHT_LIMIT_1M / 2),
..Default::default()
};
limiter.update(info);
let ratio = limiter.weight_usage_ratio();
assert!((ratio - 0.5).abs() < 0.01);
}
}