use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
const WINDOW: Duration = Duration::from_secs(60);
pub const MAX_WEIGHT: u32 = 1200;
pub const ADDR_INITIAL_BUFFER: u64 = 10_000;
const ADDR_THROTTLE_INTERVAL: Duration = Duration::from_secs(10);
#[derive(Debug, Clone)]
pub struct RateLimitError {
pub retry_after: Duration,
}
impl std::fmt::Display for RateLimitError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "rate limited, retry after {:?}", self.retry_after)
}
}
impl std::error::Error for RateLimitError {}
pub struct RestRateLimiter {
entries: Mutex<VecDeque<(Instant, u32)>>,
max_weight: u32,
}
impl Default for RestRateLimiter {
fn default() -> Self {
Self::new()
}
}
impl RestRateLimiter {
pub fn new() -> Self {
Self::new_with_budget(MAX_WEIGHT)
}
pub fn new_with_budget(max_weight: u32) -> Self {
RestRateLimiter {
entries: Mutex::new(VecDeque::new()),
max_weight,
}
}
pub async fn acquire(&self, weight: u32) -> Result<(), RateLimitError> {
let mut entries = self.entries.lock().await;
let now = Instant::now();
while let Some(&(t, _)) = entries.front() {
if now.duration_since(t) >= WINDOW {
entries.pop_front();
} else {
break;
}
}
let used: u32 = entries.iter().map(|(_, w)| w).sum();
if used + weight <= self.max_weight {
entries.push_back((now, weight));
return Ok(());
}
let oldest = entries.front().unwrap().0;
let retry_after = WINDOW.saturating_sub(now.duration_since(oldest));
Err(RateLimitError { retry_after })
}
pub async fn acquire_blocking(&self, weight: u32, call: &str) {
loop {
match self.acquire(weight).await {
Ok(()) => return,
Err(e) => {
let used = {
let entries = self.entries.lock().await;
entries.iter().map(|(_, w)| w).sum::<u32>()
};
tracing::warn!(
call,
weight,
used,
budget = self.max_weight,
retry_after_ms = e.retry_after.as_millis(),
"REST rate limited"
);
tokio::time::sleep(e.retry_after).await;
}
}
}
}
}
pub struct AddressRateLimiter {
inner: Mutex<AddrState>,
}
struct AddrState {
budget: u64,
consumed: u64,
last_throttled: Option<Instant>,
}
impl Default for AddressRateLimiter {
fn default() -> Self {
Self::new()
}
}
impl AddressRateLimiter {
pub fn new() -> Self {
Self::new_with_budget(ADDR_INITIAL_BUFFER)
}
pub fn new_with_budget(initial_budget: u64) -> Self {
AddressRateLimiter {
inner: Mutex::new(AddrState {
budget: initial_budget,
consumed: 0,
last_throttled: None,
}),
}
}
pub async fn record_fill(&self, usdc_volume: u64) {
let mut s = self.inner.lock().await;
s.budget = s.budget.saturating_add(usdc_volume);
}
fn cancel_limit(default: u64) -> u64 {
(default + 100_000).min(default * 2)
}
pub async fn acquire(&self, count: u64, is_cancel: bool) -> Result<(), RateLimitError> {
let mut s = self.inner.lock().await;
let effective_limit = if is_cancel {
Self::cancel_limit(s.budget)
} else {
s.budget
};
if s.consumed + count <= effective_limit {
s.consumed += count;
return Ok(());
}
let now = Instant::now();
let retry_after = match s.last_throttled {
Some(t) => ADDR_THROTTLE_INTERVAL.saturating_sub(now.duration_since(t)),
None => Duration::ZERO,
};
if retry_after.is_zero() {
if count > 1 {
return Err(RateLimitError {
retry_after: ADDR_THROTTLE_INTERVAL,
});
}
s.last_throttled = Some(now);
s.consumed += 1;
return Ok(());
}
Err(RateLimitError { retry_after })
}
pub async fn acquire_blocking(&self, count: u64, is_cancel: bool) {
loop {
match self.acquire(count, is_cancel).await {
Ok(()) => return,
Err(e) => {
tracing::warn!(
retry_after_ms = e.retry_after.as_millis(),
"address rate limited"
);
tokio::time::sleep(e.retry_after).await;
}
}
}
}
}