Skip to main content

bybit_rust_api/utils/
rate_limiter.rs

1//! Token-bucket rate limiter shared across REST and WebSocket clients.
2//!
3//! Bybit V5 rate limits:
4//! - Public endpoints: 50 requests/second
5//! - Private endpoints: 50 requests/second (separate pool)
6//! - WebSocket order entry: 20 orders/second
7//!
8//! The limiter uses a token bucket algorithm with async waiting.
9
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::Mutex;
13
14/// Token bucket rate limiter.
15///
16/// Tokens refill at a constant rate up to a maximum capacity.
17/// When tokens are exhausted, callers wait asynchronously.
18#[derive(Clone)]
19pub struct RateLimiter {
20    inner: Arc<Mutex<Bucket>>,
21}
22
23struct Bucket {
24    tokens: f64,
25    capacity: f64,
26    refill_rate: f64, // tokens per second
27    last_refill: Instant,
28}
29
30impl RateLimiter {
31    /// Create a new rate limiter.
32    ///
33    /// # Arguments
34    /// * `rate` - Sustained requests per second
35    /// * `burst` - Maximum burst size (default: same as rate)
36    pub fn new(rate: u32, burst: Option<u32>) -> Self {
37        let burst = burst.unwrap_or(rate);
38        RateLimiter {
39            inner: Arc::new(Mutex::new(Bucket {
40                tokens: burst as f64,
41                capacity: burst as f64,
42                refill_rate: rate as f64,
43                last_refill: Instant::now(),
44            })),
45        }
46    }
47
48    /// Acquire a single token, waiting asynchronously if necessary.
49    ///
50    /// Returns immediately if a token is available, otherwise sleeps
51    /// until enough tokens have refilled.
52    pub async fn acquire(&self) {
53        let wait = {
54            let mut bucket = self.inner.lock().await;
55            bucket.refill();
56            if bucket.tokens >= 1.0 {
57                bucket.tokens -= 1.0;
58                None
59            } else {
60                // Calculate how long to wait for 1 token
61                let needed = 1.0 - bucket.tokens;
62                let wait_secs = needed / bucket.refill_rate;
63                Some(Duration::from_secs_f64(wait_secs))
64            }
65        };
66
67        if let Some(duration) = wait {
68            tokio::time::sleep(duration).await;
69            // After waiting, try again
70            let mut bucket = self.inner.lock().await;
71            bucket.refill();
72            if bucket.tokens >= 1.0 {
73                bucket.tokens -= 1.0;
74            }
75        }
76    }
77
78    /// Try to acquire a token without waiting.
79    ///
80    /// Returns `true` if a token was available and consumed.
81    pub async fn try_acquire(&self) -> bool {
82        let mut bucket = self.inner.lock().await;
83        bucket.refill();
84        if bucket.tokens >= 1.0 {
85            bucket.tokens -= 1.0;
86            true
87        } else {
88            false
89        }
90    }
91
92    /// Get the current number of available tokens (for diagnostics).
93    pub async fn available(&self) -> f64 {
94        let mut bucket = self.inner.lock().await;
95        bucket.refill();
96        bucket.tokens
97    }
98
99    /// Create a rate limiter for public REST endpoints (50 req/s).
100    pub fn public_rest() -> Self {
101        RateLimiter::new(50, Some(50))
102    }
103
104    /// Create a rate limiter for private REST endpoints (50 req/s).
105    pub fn private_rest() -> Self {
106        RateLimiter::new(50, Some(50))
107    }
108
109    /// Create a rate limiter for WebSocket order entry (20 req/s).
110    pub fn ws_order_entry() -> Self {
111        RateLimiter::new(20, Some(20))
112    }
113}
114
115impl Bucket {
116    fn refill(&mut self) {
117        let now = Instant::now();
118        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
119        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
120        self.last_refill = now;
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127
128    #[tokio::test]
129    async fn test_initial_tokens_available() {
130        let limiter = RateLimiter::new(10, Some(10));
131        let available = limiter.available().await;
132        assert!(available > 9.0);
133    }
134
135    #[tokio::test]
136    async fn test_acquire_consumes_token() {
137        let limiter = RateLimiter::new(10, Some(10));
138        let before = limiter.available().await;
139        limiter.acquire().await;
140        let after = limiter.available().await;
141        assert!(after < before);
142    }
143
144    #[tokio::test]
145    async fn test_try_acquire() {
146        let limiter = RateLimiter::new(100, Some(100));
147        assert!(limiter.try_acquire().await);
148        assert!(limiter.try_acquire().await);
149    }
150
151    #[tokio::test]
152    async fn test_burst_behavior() {
153        let limiter = RateLimiter::new(100, Some(5));
154        // Should be able to burst-acquire up to 5 immediately
155        for _ in 0..5 {
156            assert!(limiter.try_acquire().await);
157        }
158        // 6th should fail (burst exhausted)
159        assert!(!limiter.try_acquire().await);
160    }
161
162    #[test]
163    fn test_default_constructors() {
164        let _public = RateLimiter::public_rest();
165        let _private = RateLimiter::private_rest();
166        let _ws = RateLimiter::ws_order_entry();
167    }
168}