use std::{sync::Arc, time::Duration};
#[cfg(feature = "native")]
#[allow(
clippy::disallowed_types,
reason = "std::sync::Mutex is intentional here: the critical section is \
microseconds and a poisoned mutex is actually the right failure \
mode for a misbehaving caller — parking_lot would silently \
continue with corrupted bucket state"
)]
use std::sync::Mutex;
#[cfg(feature = "native")]
use tokio::time::Instant;
/// Token-bucket rate limiter.
///
/// `rate` is the refill speed in tokens per second and `capacity` is the
/// largest number of tokens the bucket may accumulate. With the `native`
/// feature the bucket lives behind an `Arc<Mutex<_>>`, so every clone of a
/// limiter draws from the same bucket. Without that feature only the two
/// configuration values are stored.
#[derive(Debug, Clone)]
pub struct RateLimiter {
/// Shared bucket contents; present only when the `native` feature is enabled.
#[cfg(feature = "native")]
#[allow(
clippy::disallowed_types,
reason = "std::sync::Mutex is chosen deliberately — see the import comment"
)]
state: Arc<Mutex<BucketState>>,
/// Refill speed in tokens per second.
rate: f64,
/// Maximum number of tokens the bucket may hold.
capacity: f64,
}
/// Mutable contents of the token bucket, guarded by the limiter's mutex.
#[cfg(feature = "native")]
#[derive(Debug)]
struct BucketState {
/// Tokens currently available; fractional values are expected.
tokens: f64,
/// Instant at which `tokens` was last brought up to date.
last_refill: Instant,
}
impl RateLimiter {
    /// Builds a limiter whose bucket starts with `capacity` tokens.
    ///
    /// `rate` is the refill speed in tokens per second; `capacity` is the
    /// largest number of tokens the bucket may accumulate. Every call to
    /// [`RateLimiter::acquire`] consumes one whole token, so a `capacity`
    /// below `1.0` means "no burst": `acquire` still lets the bucket refill up
    /// to one token, otherwise it could never complete.
    ///
    /// # Panics
    ///
    /// Panics when `rate` or `capacity` is NaN, infinite, zero or negative.
    #[must_use]
    #[allow(
        clippy::panic,
        reason = "configuration error that must be surfaced loudly at construction"
    )]
    pub fn new(rate: f64, capacity: f64) -> Self {
        assert!(
            rate.is_finite() && rate > 0.0,
            "RateLimiter rate must be a finite positive number (got {rate})"
        );
        assert!(
            capacity.is_finite() && capacity > 0.0,
            "RateLimiter capacity must be a finite positive number (got {capacity})"
        );
        #[cfg(feature = "native")]
        #[allow(
            clippy::disallowed_types,
            reason = "std::sync::Mutex is chosen deliberately — see the import comment"
        )]
        {
            Self {
                state: Arc::new(Mutex::new(BucketState {
                    tokens: capacity,
                    last_refill: Instant::now(),
                })),
                rate,
                capacity,
            }
        }
        #[cfg(not(feature = "native"))]
        {
            Self { rate, capacity }
        }
    }

    /// Returns a limiter refilling 5 tokens per second with a capacity of 5.
    #[must_use]
    pub fn default_orderbook() -> Self {
        Self::new(5.0, 5.0)
    }

    /// Refill speed in tokens per second, exactly as passed to [`RateLimiter::new`].
    #[must_use]
    pub const fn rate(&self) -> f64 {
        self.rate
    }

    /// Bucket capacity in tokens, exactly as passed to [`RateLimiter::new`].
    #[must_use]
    pub const fn capacity(&self) -> f64 {
        self.capacity
    }

    /// Takes one token from the bucket, sleeping until one is available.
    ///
    /// With the `native` feature the bucket is refilled from the time elapsed
    /// since the previous refill. If at least one token is present it is
    /// consumed and the call returns; otherwise the task sleeps for the time
    /// the missing fraction needs to refill and then tries again. Without the
    /// `native` feature the body is compiled out and the call returns
    /// immediately.
    ///
    /// # Panics
    ///
    /// Panics if the shared mutex was poisoned by a panic in another holder.
    #[allow(
        clippy::unused_async,
        reason = "wasm path is intentionally synchronous so the API stays unchanged"
    )]
    pub async fn acquire(&self) {
        #[cfg(feature = "native")]
        {
            // One whole token is consumed per call, so a ceiling below 1.0
            // could never be satisfied and the loop would spin forever.
            let ceiling = self.capacity.max(1.0);
            loop {
                // The guard lives only inside this block so it is released
                // before the `.await` below.
                let wait = {
                    #[allow(
                        clippy::expect_used,
                        reason = "poisoned mutex is unrecoverable — surface it immediately"
                    )]
                    let mut state = self.state.lock().expect("rate limiter mutex poisoned");
                    let now = Instant::now();
                    let elapsed = now.duration_since(state.last_refill).as_secs_f64();
                    state.tokens = elapsed.mul_add(self.rate, state.tokens).min(ceiling);
                    state.last_refill = now;
                    if state.tokens >= 1.0 {
                        state.tokens -= 1.0;
                        return;
                    }
                    let missing = 1.0 - state.tokens;
                    // A very small `rate` makes the quotient overflow `Duration`;
                    // saturate rather than panic while the lock is held, which
                    // would poison the bucket for every clone.
                    Duration::try_from_secs_f64(missing / self.rate).unwrap_or(Duration::MAX)
                };
                tokio::time::sleep(wait).await;
            }
        }
    }
}
impl Default for RateLimiter {
/// Returns the same configuration as [`RateLimiter::default_orderbook`].
fn default() -> Self {
Self::default_orderbook()
}
}
/// Status codes that [`RetryPolicy::default_orderbook`] treats as retryable.
///
/// The trailing notes are the standard HTTP reason phrases, for reference.
pub const DEFAULT_RETRY_STATUS_CODES: &[u16] = &[
    408, // Request Timeout
    425, // Too Early
    429, // Too Many Requests
    500, // Internal Server Error
    502, // Bad Gateway
    503, // Service Unavailable
    504, // Gateway Timeout
];
/// Retry configuration: an attempt budget, exponential back-off bounds and the
/// status codes considered retryable.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
/// Attempt budget; `no_retry()` sets it to 1. No method in the visible code
/// reads it, so enforcement is presumably left to the caller.
pub max_attempts: u32,
/// Delay returned for attempt 0; doubled for each later attempt.
pub initial_delay: Duration,
/// Upper bound on the delay returned by `delay_for_attempt`.
pub max_delay: Duration,
/// Status codes for which `should_retry_status` returns `true`.
pub retry_status_codes: &'static [u16],
}
impl RetryPolicy {
    /// Returns the policy with 10 attempts, a 100 ms initial delay, a 30 s
    /// delay cap and [`DEFAULT_RETRY_STATUS_CODES`] as retryable statuses.
    #[must_use]
    pub const fn default_orderbook() -> Self {
        Self {
            max_attempts: 10,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            retry_status_codes: DEFAULT_RETRY_STATUS_CODES,
        }
    }

    /// Returns a policy with a single attempt, zero delays and no retryable
    /// status codes.
    #[must_use]
    pub const fn no_retry() -> Self {
        Self {
            max_attempts: 1,
            initial_delay: Duration::ZERO,
            max_delay: Duration::ZERO,
            retry_status_codes: &[],
        }
    }

    /// Returns the back-off delay for the zero-based `attempt`.
    ///
    /// The delay is `initial_delay * 2^attempt`, saturating on overflow and
    /// never exceeding `max_delay`. Attempt 0 therefore yields `initial_delay`
    /// (or `max_delay` if that is smaller).
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let factor = 2u64.saturating_pow(attempt);
        let nanos = self.initial_delay.as_nanos().saturating_mul(u128::from(factor));
        let capped = nanos.min(self.max_delay.as_nanos());
        // Split into seconds and sub-second nanos instead of squeezing the
        // total through `u64` nanoseconds, which tops out near 584 years and
        // would turn longer (but still capped) delays into `max_delay`.
        match (
            u64::try_from(capped / NANOS_PER_SEC),
            u32::try_from(capped % NANOS_PER_SEC),
        ) {
            (Ok(secs), Ok(subsec_nanos)) => Duration::new(secs, subsec_nanos),
            // Not reachable while `capped <= max_delay`; kept as a cast-free
            // fallback.
            _ => self.max_delay,
        }
    }

    /// Returns `true` when `status` is listed in `retry_status_codes`.
    #[must_use]
    pub fn should_retry_status(&self, status: u16) -> bool {
        self.retry_status_codes.contains(&status)
    }

    /// Returns `true` for every error except those `reqwest` reports as
    /// decode failures (`is_decode`). The policy's own fields are not
    /// consulted.
    #[must_use]
    pub fn should_retry_error(&self, err: &reqwest::Error) -> bool {
        !err.is_decode()
    }

    /// Sleeps for `delay` when the `native` feature is enabled; otherwise
    /// discards `delay` and returns immediately.
    #[allow(clippy::unused_async, reason = "wasm path is intentionally synchronous")]
    pub(crate) async fn wait(&self, delay: Duration) {
        #[cfg(feature = "native")]
        tokio::time::sleep(delay).await;
        #[cfg(not(feature = "native"))]
        let _ = delay;
    }
}
impl Default for RetryPolicy {
/// Returns the same configuration as [`RetryPolicy::default_orderbook`].
fn default() -> Self {
Self::default_orderbook()
}
}
// Compile-time-only reference to `Arc` for builds without the `native` feature.
// Every other use of `Arc` in the visible code is gated on `native`, so this
// presumably exists to keep the unconditional import from being reported as
// unused.
#[cfg(not(feature = "native"))]
const _: () = {
let _: Option<Arc<()>> = None;
};
// Unit tests for `RetryPolicy` and `RateLimiter`. The whole module is gated on
// the `native` feature; the timing tests run on tokio's paused clock
// (`start_paused = true`).
#[cfg(all(test, feature = "native"))]
#[allow(
clippy::tests_outside_test_module,
clippy::let_underscore_must_use,
reason = "the compound `cfg(all(test, feature = \"native\"))` gate confuses the \
test-module lint, and `let _ =` is the idiomatic form for \
`#[should_panic]` expression discards"
)]
mod tests {
use super::*;
// Pins every field of the default policy.
#[test]
fn retry_policy_default_matches_upstream() {
let p = RetryPolicy::default_orderbook();
assert_eq!(p.max_attempts, 10);
assert_eq!(p.initial_delay, Duration::from_millis(100));
assert_eq!(p.max_delay, Duration::from_secs(30));
assert_eq!(p.retry_status_codes, DEFAULT_RETRY_STATUS_CODES);
}
// Delay doubles per attempt from 100 ms and is capped at 30 s, including for
// an attempt number large enough to saturate the multiplier.
#[test]
fn retry_policy_delay_doubles_and_caps() {
let p = RetryPolicy::default_orderbook();
assert_eq!(p.delay_for_attempt(0), Duration::from_millis(100));
assert_eq!(p.delay_for_attempt(1), Duration::from_millis(200));
assert_eq!(p.delay_for_attempt(2), Duration::from_millis(400));
assert_eq!(p.delay_for_attempt(3), Duration::from_millis(800));
assert_eq!(p.delay_for_attempt(9), Duration::from_secs(30));
assert_eq!(p.delay_for_attempt(1_000), Duration::from_secs(30));
}
// Listed codes are retryable; a sample of unlisted codes is not.
#[test]
fn retry_policy_should_retry_status_matches_upstream() {
let p = RetryPolicy::default_orderbook();
for code in [408_u16, 425, 429, 500, 502, 503, 504] {
assert!(p.should_retry_status(code), "{code} should retry");
}
for code in [200_u16, 201, 204, 400, 401, 403, 404, 422] {
assert!(!p.should_retry_status(code), "{code} must not retry");
}
}
// `no_retry` allows one attempt and has no retryable status codes.
#[test]
fn retry_policy_no_retry_disables_everything() {
let p = RetryPolicy::no_retry();
assert_eq!(p.max_attempts, 1);
assert!(!p.should_retry_status(500));
}
// Accessors return the values given to the constructor.
#[test]
fn rate_limiter_accessors() {
let limiter = RateLimiter::new(5.0, 10.0);
assert!((limiter.rate() - 5.0).abs() < f64::EPSILON);
assert!((limiter.capacity() - 10.0).abs() < f64::EPSILON);
}
// A zero rate must be rejected at construction.
#[test]
#[should_panic(expected = "rate must be a finite positive number")]
fn rate_limiter_rejects_zero_rate() {
let _ = RateLimiter::new(0.0, 5.0);
}
// A negative capacity must be rejected at construction.
#[test]
#[should_panic(expected = "capacity must be a finite positive number")]
fn rate_limiter_rejects_negative_capacity() {
let _ = RateLimiter::new(5.0, -1.0);
}
// A bucket of capacity 3 starts full, so three acquires need no waiting.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn rate_limiter_consumes_initial_burst_immediately() {
let limiter = RateLimiter::new(5.0, 3.0);
let start = tokio::time::Instant::now();
limiter.acquire().await;
limiter.acquire().await;
limiter.acquire().await;
assert!(
start.elapsed() < Duration::from_millis(1),
"initial burst should be instantaneous"
);
}
// With capacity 1 the first acquire empties the bucket; the clock is read
// after it so that only the second acquire's wait (1 / rate = 200 ms) is
// measured.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn rate_limiter_throttles_after_burst() {
let limiter = RateLimiter::new(5.0, 1.0);
limiter.acquire().await; let start = tokio::time::Instant::now();
limiter.acquire().await;
let waited = start.elapsed();
assert!(
waited >= Duration::from_millis(200),
"second acquire should wait for a refill (got {waited:?})"
);
assert!(
waited < Duration::from_millis(500),
"second acquire should wait roughly 1 / rate seconds (got {waited:?})"
);
}
// The token taken through `a` must be missing when `b` (a clone) acquires.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn rate_limiter_shared_state_across_clones() {
let a = RateLimiter::new(5.0, 1.0);
let b = a.clone();
a.acquire().await; let start = tokio::time::Instant::now();
b.acquire().await; assert!(
start.elapsed() >= Duration::from_millis(200),
"cloned limiter should share the bucket"
);
}
// `Default` and `default_orderbook` agree on rate and capacity.
#[test]
fn rate_limiter_default_matches_default_orderbook() {
let a = RateLimiter::default();
let b = RateLimiter::default_orderbook();
assert!((a.rate() - b.rate()).abs() < f64::EPSILON);
assert!((a.capacity() - b.capacity()).abs() < f64::EPSILON);
}
// `Default` and `default_orderbook` agree on the attempt budget and delays.
#[test]
fn retry_policy_default_trait() {
let a = RetryPolicy::default();
let b = RetryPolicy::default_orderbook();
assert_eq!(a.max_attempts, b.max_attempts);
assert_eq!(a.initial_delay, b.initial_delay);
assert_eq!(a.max_delay, b.max_delay);
}
// NaN fails the finite-and-positive check on `rate`.
#[test]
#[should_panic(expected = "rate must be a finite positive number")]
fn rate_limiter_rejects_nan_rate() {
let _ = RateLimiter::new(f64::NAN, 5.0);
}
// Infinity fails the finite-and-positive check on `capacity`.
#[test]
#[should_panic(expected = "capacity must be a finite positive number")]
fn rate_limiter_rejects_inf_capacity() {
let _ = RateLimiter::new(5.0, f64::INFINITY);
}
// `wait` must let at least the requested delay pass on the paused clock.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn retry_policy_wait_sleeps() {
let p = RetryPolicy::default_orderbook();
let start = tokio::time::Instant::now();
p.wait(Duration::from_millis(100)).await;
assert!(start.elapsed() >= Duration::from_millis(100));
}
}