Skip to main content

irontide_engine_support/
rate_limiter.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    reason = "M175: token-bucket rate limiter — u128 → u64 truncation harmless at realistic rate budgets"
4)]
5
6//! Token bucket rate limiter.
7
8use std::time::Duration;
9
10/// Token bucket rate limiter.
11///
12/// Tokens represent bytes. `rate` is bytes/second.
13/// Tokens are added via `refill()` (called on a timer).
14/// Burst capacity = 1 second of tokens.
15#[allow(dead_code)] // consumed by torrent/session modules (wired in later tasks)
16pub struct TokenBucket {
17    rate: u64,     // bytes/sec, 0 = unlimited
18    tokens: u64,   // current available tokens
19    capacity: u64, // max tokens (= rate, i.e., 1 second burst)
20}
21
22#[allow(dead_code)]
23impl TokenBucket {
24    #[must_use]
25    pub fn new(rate: u64) -> Self {
26        Self {
27            rate,
28            tokens: 0,
29            capacity: rate, // 1 second burst
30        }
31    }
32
33    #[must_use]
34    pub fn unlimited() -> Self {
35        Self {
36            rate: 0,
37            tokens: 0,
38            capacity: 0,
39        }
40    }
41
42    #[must_use]
43    pub fn is_unlimited(&self) -> bool {
44        self.rate == 0
45    }
46
47    /// Current rate limit in bytes/sec (0 = unlimited).
48    #[must_use]
49    pub fn rate(&self) -> u64 {
50        self.rate
51    }
52
53    /// Add tokens proportional to elapsed time.
54    pub fn refill(&mut self, elapsed: Duration) {
55        if self.rate == 0 {
56            return;
57        }
58        let add = (u128::from(self.rate) * elapsed.as_millis() / 1000) as u64;
59        self.tokens = (self.tokens + add).min(self.capacity);
60    }
61
62    /// Try to consume `amount` tokens. Returns true if allowed.
63    pub fn try_consume(&mut self, amount: u64) -> bool {
64        if self.rate == 0 {
65            return true;
66        }
67        if self.tokens >= amount {
68            self.tokens -= amount;
69            true
70        } else {
71            false
72        }
73    }
74
75    /// How many bytes can be consumed right now.
76    #[must_use]
77    pub fn available(&self) -> u64 {
78        if self.rate == 0 {
79            u64::MAX
80        } else {
81            self.tokens
82        }
83    }
84
85    /// Update the rate limit. Resets capacity but preserves current tokens (clamped).
86    pub fn set_rate(&mut self, rate: u64) {
87        self.rate = rate;
88        self.capacity = rate;
89        if rate > 0 {
90            self.tokens = self.tokens.min(self.capacity);
91        }
92    }
93}
94
95/// Transport type for per-class rate limiting.
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
97#[allow(dead_code)]
98pub enum PeerTransport {
99    Tcp,
100    Utp,
101}
102
103/// Mixed-mode bandwidth allocation algorithm — lifted to irontide-core (M242).
104pub use irontide_core::MixedModeAlgorithm;
105
106/// Per-class rate limiter set (BEP 40 / libtorrent parity).
107///
108/// Maintains separate upload/download buckets for TCP and uTP, plus global
109/// upload/download buckets. Uses check-before-consume pattern to avoid
110/// partial consumption when one bucket has capacity but another doesn't.
111#[allow(dead_code)]
112pub struct RateLimiterSet {
113    tcp_upload: TokenBucket,
114    tcp_download: TokenBucket,
115    utp_upload: TokenBucket,
116    utp_download: TokenBucket,
117    global_upload: TokenBucket,
118    global_download: TokenBucket,
119}
120
121#[allow(dead_code)]
122impl RateLimiterSet {
123    /// Create a new rate limiter set. Rate of 0 = unlimited.
124    #[must_use]
125    pub fn new(
126        tcp_upload_rate: u64,
127        tcp_download_rate: u64,
128        utp_upload_rate: u64,
129        utp_download_rate: u64,
130        global_upload_rate: u64,
131        global_download_rate: u64,
132    ) -> Self {
133        Self {
134            tcp_upload: TokenBucket::new(tcp_upload_rate),
135            tcp_download: TokenBucket::new(tcp_download_rate),
136            utp_upload: TokenBucket::new(utp_upload_rate),
137            utp_download: TokenBucket::new(utp_download_rate),
138            global_upload: TokenBucket::new(global_upload_rate),
139            global_download: TokenBucket::new(global_download_rate),
140        }
141    }
142
143    /// Refill all buckets proportional to elapsed time.
144    pub fn refill(&mut self, elapsed: Duration) {
145        self.tcp_upload.refill(elapsed);
146        self.tcp_download.refill(elapsed);
147        self.utp_upload.refill(elapsed);
148        self.utp_download.refill(elapsed);
149        self.global_upload.refill(elapsed);
150        self.global_download.refill(elapsed);
151    }
152
153    /// Try to consume upload tokens for the given transport class.
154    ///
155    /// Checks both the class bucket and global bucket *before* consuming
156    /// either, to avoid partial consumption without refund.
157    pub fn try_consume_upload(&mut self, amount: u64, transport: PeerTransport) -> bool {
158        let class = match transport {
159            PeerTransport::Tcp => &self.tcp_upload,
160            PeerTransport::Utp => &self.utp_upload,
161        };
162        // Check both before consuming either
163        if !class.is_unlimited() && class.available() < amount {
164            return false;
165        }
166        if !self.global_upload.is_unlimited() && self.global_upload.available() < amount {
167            return false;
168        }
169        // Both have capacity — consume from both
170        let class = match transport {
171            PeerTransport::Tcp => &mut self.tcp_upload,
172            PeerTransport::Utp => &mut self.utp_upload,
173        };
174        class.try_consume(amount);
175        self.global_upload.try_consume(amount);
176        true
177    }
178
179    /// Try to consume download tokens for the given transport class.
180    pub fn try_consume_download(&mut self, amount: u64, transport: PeerTransport) -> bool {
181        let class = match transport {
182            PeerTransport::Tcp => &self.tcp_download,
183            PeerTransport::Utp => &self.utp_download,
184        };
185        if !class.is_unlimited() && class.available() < amount {
186            return false;
187        }
188        if !self.global_download.is_unlimited() && self.global_download.available() < amount {
189            return false;
190        }
191        let class = match transport {
192            PeerTransport::Tcp => &mut self.tcp_download,
193            PeerTransport::Utp => &mut self.utp_download,
194        };
195        class.try_consume(amount);
196        self.global_download.try_consume(amount);
197        true
198    }
199
200    /// Update per-class rates at runtime (e.g., from `apply_settings`).
201    pub fn set_rates(
202        &mut self,
203        tcp_upload: u64,
204        tcp_download: u64,
205        utp_upload: u64,
206        utp_download: u64,
207        global_upload: u64,
208        global_download: u64,
209    ) {
210        self.tcp_upload.set_rate(tcp_upload);
211        self.tcp_download.set_rate(tcp_download);
212        self.utp_upload.set_rate(utp_upload);
213        self.utp_download.set_rate(utp_download);
214        self.global_upload.set_rate(global_upload);
215        self.global_download.set_rate(global_download);
216    }
217
218    /// Apply mixed-mode bandwidth allocation based on peer transport composition.
219    /// Only adjusts upload — download is not throttled by transport type.
220    pub fn apply_mixed_mode(
221        &mut self,
222        algorithm: MixedModeAlgorithm,
223        tcp_peers: usize,
224        utp_peers: usize,
225        global_upload_rate: u64,
226    ) {
227        if global_upload_rate == 0 {
228            self.tcp_upload.set_rate(0);
229            self.utp_upload.set_rate(0);
230            return;
231        }
232        if tcp_peers == 0 && utp_peers == 0 {
233            self.tcp_upload.set_rate(0);
234            self.utp_upload.set_rate(0);
235            return;
236        }
237        match algorithm {
238            MixedModeAlgorithm::PreferTcp => {
239                if tcp_peers > 0 && utp_peers > 0 {
240                    let tcp_rate = global_upload_rate * 9 / 10;
241                    let utp_rate = global_upload_rate / 10;
242                    self.tcp_upload.set_rate(tcp_rate.max(1));
243                    self.utp_upload.set_rate(utp_rate.max(1));
244                } else {
245                    self.tcp_upload.set_rate(0);
246                    self.utp_upload.set_rate(0);
247                }
248            }
249            MixedModeAlgorithm::PeerProportional => {
250                let total = tcp_peers + utp_peers;
251                let tcp_rate = global_upload_rate * tcp_peers as u64 / total as u64;
252                let utp_rate = global_upload_rate * utp_peers as u64 / total as u64;
253                self.tcp_upload
254                    .set_rate(if tcp_peers > 0 { tcp_rate.max(1) } else { 0 });
255                self.utp_upload
256                    .set_rate(if utp_peers > 0 { utp_rate.max(1) } else { 0 });
257            }
258        }
259    }
260}
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265
266    #[test]
267    fn unlimited_bucket_always_allows() {
268        let mut tb = TokenBucket::unlimited();
269        assert!(tb.try_consume(1_000_000));
270        assert!(tb.is_unlimited());
271        assert_eq!(tb.available(), u64::MAX);
272    }
273
274    #[test]
275    fn limited_bucket_allows_up_to_capacity() {
276        let mut tb = TokenBucket::new(1000); // 1000 bytes/sec
277        tb.refill(Duration::from_millis(100)); // +100 tokens
278        assert!(tb.try_consume(100));
279        assert!(!tb.try_consume(1)); // exhausted
280    }
281
282    #[test]
283    fn refill_adds_tokens_proportionally() {
284        let mut tb = TokenBucket::new(10_000); // 10 KB/s
285        tb.refill(Duration::from_millis(100)); // +1000 tokens
286        assert!(tb.try_consume(1000));
287        assert!(!tb.try_consume(1));
288    }
289
290    #[test]
291    fn tokens_cap_at_one_second_burst() {
292        let mut tb = TokenBucket::new(1000);
293        tb.refill(Duration::from_secs(5)); // would be 5000, but capped at 1000
294        assert!(tb.try_consume(1000));
295        assert!(!tb.try_consume(1));
296    }
297
298    #[test]
299    fn try_consume_partial() {
300        let mut tb = TokenBucket::new(1000);
301        tb.refill(Duration::from_millis(100)); // +100
302        assert_eq!(tb.available(), 100);
303        assert!(tb.try_consume(50));
304        assert_eq!(tb.available(), 50);
305    }
306
307    #[test]
308    fn set_rate_clamps_tokens() {
309        let mut tb = TokenBucket::new(1000);
310        tb.refill(Duration::from_secs(1)); // 1000 tokens
311        assert_eq!(tb.available(), 1000);
312        tb.set_rate(500); // capacity now 500, tokens clamped
313        assert_eq!(tb.available(), 500);
314    }
315
316    #[test]
317    fn rate_limiter_set_all_unlimited() {
318        let mut rls = RateLimiterSet::new(0, 0, 0, 0, 0, 0);
319        rls.refill(Duration::from_secs(1));
320        assert!(rls.try_consume_upload(1_000_000, PeerTransport::Tcp));
321        assert!(rls.try_consume_upload(1_000_000, PeerTransport::Utp));
322        assert!(rls.try_consume_download(1_000_000, PeerTransport::Tcp));
323        assert!(rls.try_consume_download(1_000_000, PeerTransport::Utp));
324    }
325
326    #[test]
327    fn rate_limiter_set_class_limited() {
328        let mut rls = RateLimiterSet::new(1000, 1000, 500, 500, 0, 0);
329        rls.refill(Duration::from_secs(1));
330        // TCP: 1000 capacity
331        assert!(rls.try_consume_upload(1000, PeerTransport::Tcp));
332        assert!(!rls.try_consume_upload(1, PeerTransport::Tcp)); // exhausted
333        // uTP: 500 capacity, independent
334        assert!(rls.try_consume_upload(500, PeerTransport::Utp));
335        assert!(!rls.try_consume_upload(1, PeerTransport::Utp));
336    }
337
338    #[test]
339    fn rate_limiter_set_global_limits() {
340        // Global upload limit = 500, class limit = 1000 each
341        let mut rls = RateLimiterSet::new(1000, 0, 1000, 0, 500, 0);
342        rls.refill(Duration::from_secs(1));
343        // TCP class has 1000, but global only has 500
344        assert!(rls.try_consume_upload(500, PeerTransport::Tcp));
345        // Now global is exhausted — uTP should also be blocked
346        assert!(!rls.try_consume_upload(1, PeerTransport::Utp));
347    }
348
349    #[test]
350    fn rate_limiter_set_check_before_consume_no_partial() {
351        // If global allows but class doesn't, no partial consumption
352        let mut rls = RateLimiterSet::new(100, 0, 0, 0, 1000, 0);
353        rls.refill(Duration::from_secs(1));
354        assert!(rls.try_consume_upload(100, PeerTransport::Tcp));
355        // Class exhausted, global still has 900 — should fail cleanly
356        assert!(!rls.try_consume_upload(1, PeerTransport::Tcp));
357        // uTP is unlimited, global has 900
358        assert!(rls.try_consume_upload(900, PeerTransport::Utp));
359    }
360
361    #[test]
362    fn rate_limiter_set_refill_all() {
363        let mut rls = RateLimiterSet::new(1000, 2000, 500, 750, 5000, 10000);
364        rls.refill(Duration::from_millis(100));
365        // Each bucket should have 10% of its rate
366        assert!(rls.try_consume_upload(100, PeerTransport::Tcp));
367        assert!(rls.try_consume_download(200, PeerTransport::Tcp));
368        assert!(rls.try_consume_upload(50, PeerTransport::Utp));
369        assert!(rls.try_consume_download(75, PeerTransport::Utp));
370    }
371
372    #[test]
373    fn rate_limiter_set_runtime_update() {
374        let mut rls = RateLimiterSet::new(1000, 1000, 1000, 1000, 0, 0);
375        rls.refill(Duration::from_secs(1));
376        assert!(rls.try_consume_upload(1000, PeerTransport::Tcp));
377        // Update TCP upload to 500
378        rls.set_rates(500, 1000, 1000, 1000, 0, 0);
379        rls.refill(Duration::from_secs(1));
380        assert!(rls.try_consume_upload(500, PeerTransport::Tcp));
381        assert!(!rls.try_consume_upload(1, PeerTransport::Tcp));
382    }
383
384    #[test]
385    fn mixed_mode_prefer_tcp_both_present() {
386        let mut rls = RateLimiterSet::new(0, 0, 0, 0, 10000, 0);
387        rls.apply_mixed_mode(MixedModeAlgorithm::PreferTcp, 3, 5, 10000);
388        rls.refill(Duration::from_secs(1));
389        assert!(rls.try_consume_upload(9000, PeerTransport::Tcp));
390        assert!(!rls.try_consume_upload(1, PeerTransport::Tcp));
391        rls.refill(Duration::from_secs(1));
392        assert!(rls.try_consume_upload(1000, PeerTransport::Utp));
393        assert!(!rls.try_consume_upload(1, PeerTransport::Utp));
394    }
395
396    #[test]
397    fn mixed_mode_prefer_tcp_only_utp() {
398        // When only uTP peers exist, per-class rate is set to unlimited (0),
399        // so uTP can consume up to the full global limit without per-class throttling.
400        let mut rls = RateLimiterSet::new(0, 0, 0, 0, 10000, 0);
401        rls.apply_mixed_mode(MixedModeAlgorithm::PreferTcp, 0, 5, 10000);
402        rls.refill(Duration::from_secs(1));
403        // uTP per-class bucket is unlimited, so full global capacity is available
404        assert!(rls.try_consume_upload(10000, PeerTransport::Utp));
405        assert!(!rls.try_consume_upload(1, PeerTransport::Utp));
406    }
407
408    #[test]
409    fn mixed_mode_proportional() {
410        let mut rls = RateLimiterSet::new(0, 0, 0, 0, 10000, 0);
411        rls.apply_mixed_mode(MixedModeAlgorithm::PeerProportional, 3, 7, 10000);
412        rls.refill(Duration::from_secs(1));
413        assert!(rls.try_consume_upload(3000, PeerTransport::Tcp));
414        assert!(!rls.try_consume_upload(1, PeerTransport::Tcp));
415        rls.refill(Duration::from_secs(1));
416        assert!(rls.try_consume_upload(7000, PeerTransport::Utp));
417        assert!(!rls.try_consume_upload(1, PeerTransport::Utp));
418    }
419
420    #[test]
421    fn mixed_mode_unlimited_global_noop() {
422        let mut rls = RateLimiterSet::new(0, 0, 0, 0, 0, 0);
423        rls.apply_mixed_mode(MixedModeAlgorithm::PeerProportional, 3, 7, 0);
424        rls.refill(Duration::from_secs(1));
425        assert!(rls.try_consume_upload(1_000_000, PeerTransport::Tcp));
426        assert!(rls.try_consume_upload(1_000_000, PeerTransport::Utp));
427    }
428}