Skip to main content

irontide_session/
rate_limiter.rs

1//! Token bucket rate limiter and local network detection.
2
3use std::net::IpAddr;
4use std::time::Duration;
5
6use serde::{Deserialize, Serialize};
7
/// Token bucket rate limiter.
///
/// Tokens represent bytes. `rate` is bytes/second; a rate of `0` means
/// "unlimited", in which case the bucket never refuses a request and
/// refills are no-ops.
/// Tokens are added via `refill()` (called on a timer).
/// Burst capacity = 1 second of tokens.
#[allow(dead_code)] // consumed by torrent/session modules (wired in later tasks)
pub(crate) struct TokenBucket {
    rate: u64,     // bytes/sec, 0 = unlimited
    tokens: u64,   // current available tokens
    capacity: u64, // max tokens (= rate, i.e., 1 second burst)
}

#[allow(dead_code)]
impl TokenBucket {
    /// Nanoseconds per second; used to scale `rate` by an elapsed `Duration`.
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    /// Create a bucket limited to `rate` bytes/sec (`0` = unlimited).
    ///
    /// The bucket starts empty: nothing can be consumed from a limited
    /// bucket until `refill()` has been called.
    pub fn new(rate: u64) -> Self {
        Self {
            rate,
            tokens: 0,
            capacity: rate, // 1 second burst
        }
    }

    /// Create a bucket that never limits (equivalent to `new(0)`).
    pub fn unlimited() -> Self {
        Self::new(0)
    }

    /// Whether this bucket imposes no limit (`rate == 0`).
    pub fn is_unlimited(&self) -> bool {
        self.rate == 0
    }

    /// Current rate limit in bytes/sec (0 = unlimited).
    pub fn rate(&self) -> u64 {
        self.rate
    }

    /// Add tokens proportional to elapsed time, capped at the burst capacity.
    ///
    /// The amount added is `floor(rate * elapsed_seconds)`, computed at
    /// nanosecond resolution so that sub-millisecond portions of `elapsed`
    /// are not discarded. Does nothing for an unlimited bucket.
    pub fn refill(&mut self, elapsed: Duration) {
        if self.is_unlimited() {
            return;
        }
        // `rate * nanos` can exceed even u128 for extreme inputs. Saturating
        // is lossless here: any saturated product is far above `capacity`,
        // and the result is clamped to `capacity` below anyway.
        let earned = u128::from(self.rate).saturating_mul(elapsed.as_nanos()) / Self::NANOS_PER_SEC;
        // Clamp before narrowing so the cast can never truncate.
        let earned = earned.min(u128::from(u64::MAX)) as u64;
        self.tokens = self.tokens.saturating_add(earned).min(self.capacity);
    }

    /// Try to consume `amount` tokens. Returns true if allowed.
    ///
    /// Unlimited buckets always allow. For a limited bucket the tokens are
    /// deducted only when at least `amount` are available; otherwise nothing
    /// is consumed. Because tokens never exceed the capacity, a single
    /// request larger than one second's worth of `rate` cannot succeed.
    pub fn try_consume(&mut self, amount: u64) -> bool {
        if self.is_unlimited() {
            return true;
        }
        match self.tokens.checked_sub(amount) {
            Some(remaining) => {
                self.tokens = remaining;
                true
            }
            // Not enough tokens: leave the bucket untouched.
            None => false,
        }
    }

    /// How many bytes can be consumed right now (`u64::MAX` when unlimited).
    pub fn available(&self) -> u64 {
        if self.is_unlimited() {
            u64::MAX
        } else {
            self.tokens
        }
    }

    /// Update the rate limit. Resets capacity but preserves current tokens (clamped).
    ///
    /// When switching to unlimited (`rate == 0`) the stored tokens are left
    /// as they are; they are clamped the next time a non-zero rate is set.
    pub fn set_rate(&mut self, rate: u64) {
        self.rate = rate;
        self.capacity = rate;
        if rate > 0 {
            self.tokens = self.tokens.min(self.capacity);
        }
    }
}
87
/// Transport a peer connection runs over.
///
/// Used to pick which per-class rate-limit bucket a transfer is charged to.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) enum PeerTransport {
    /// Peer connected over TCP.
    Tcp,
    /// Peer connected over uTP.
    Utp,
}
95
96/// Mixed-mode bandwidth allocation algorithm for TCP/uTP coexistence.
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
98pub enum MixedModeAlgorithm {
99    /// Throttle uTP upload when any TCP peer is connected.
100    /// uTP gets at most 10% of the global upload rate when TCP peers are present.
101    PreferTcp,
102    /// Allocate bandwidth proportional to the number of TCP vs uTP peers.
103    PeerProportional,
104}
105
106/// Per-class rate limiter set (BEP 40 / libtorrent parity).
107///
108/// Maintains separate upload/download buckets for TCP and uTP, plus global
109/// upload/download buckets. Uses check-before-consume pattern to avoid
110/// partial consumption when one bucket has capacity but another doesn't.
111#[allow(dead_code)]
112pub(crate) struct RateLimiterSet {
113    tcp_upload: TokenBucket,
114    tcp_download: TokenBucket,
115    utp_upload: TokenBucket,
116    utp_download: TokenBucket,
117    global_upload: TokenBucket,
118    global_download: TokenBucket,
119}
120
121#[allow(dead_code)]
122impl RateLimiterSet {
123    /// Create a new rate limiter set. Rate of 0 = unlimited.
124    pub fn new(
125        tcp_upload_rate: u64,
126        tcp_download_rate: u64,
127        utp_upload_rate: u64,
128        utp_download_rate: u64,
129        global_upload_rate: u64,
130        global_download_rate: u64,
131    ) -> Self {
132        Self {
133            tcp_upload: TokenBucket::new(tcp_upload_rate),
134            tcp_download: TokenBucket::new(tcp_download_rate),
135            utp_upload: TokenBucket::new(utp_upload_rate),
136            utp_download: TokenBucket::new(utp_download_rate),
137            global_upload: TokenBucket::new(global_upload_rate),
138            global_download: TokenBucket::new(global_download_rate),
139        }
140    }
141
142    /// Refill all buckets proportional to elapsed time.
143    pub fn refill(&mut self, elapsed: Duration) {
144        self.tcp_upload.refill(elapsed);
145        self.tcp_download.refill(elapsed);
146        self.utp_upload.refill(elapsed);
147        self.utp_download.refill(elapsed);
148        self.global_upload.refill(elapsed);
149        self.global_download.refill(elapsed);
150    }
151
152    /// Try to consume upload tokens for the given transport class.
153    ///
154    /// Checks both the class bucket and global bucket *before* consuming
155    /// either, to avoid partial consumption without refund.
156    pub fn try_consume_upload(&mut self, amount: u64, transport: PeerTransport) -> bool {
157        let class = match transport {
158            PeerTransport::Tcp => &self.tcp_upload,
159            PeerTransport::Utp => &self.utp_upload,
160        };
161        // Check both before consuming either
162        if !class.is_unlimited() && class.available() < amount {
163            return false;
164        }
165        if !self.global_upload.is_unlimited() && self.global_upload.available() < amount {
166            return false;
167        }
168        // Both have capacity — consume from both
169        let class = match transport {
170            PeerTransport::Tcp => &mut self.tcp_upload,
171            PeerTransport::Utp => &mut self.utp_upload,
172        };
173        class.try_consume(amount);
174        self.global_upload.try_consume(amount);
175        true
176    }
177
178    /// Try to consume download tokens for the given transport class.
179    pub fn try_consume_download(&mut self, amount: u64, transport: PeerTransport) -> bool {
180        let class = match transport {
181            PeerTransport::Tcp => &self.tcp_download,
182            PeerTransport::Utp => &self.utp_download,
183        };
184        if !class.is_unlimited() && class.available() < amount {
185            return false;
186        }
187        if !self.global_download.is_unlimited() && self.global_download.available() < amount {
188            return false;
189        }
190        let class = match transport {
191            PeerTransport::Tcp => &mut self.tcp_download,
192            PeerTransport::Utp => &mut self.utp_download,
193        };
194        class.try_consume(amount);
195        self.global_download.try_consume(amount);
196        true
197    }
198
199    /// Update per-class rates at runtime (e.g., from apply_settings).
200    pub fn set_rates(
201        &mut self,
202        tcp_upload: u64,
203        tcp_download: u64,
204        utp_upload: u64,
205        utp_download: u64,
206        global_upload: u64,
207        global_download: u64,
208    ) {
209        self.tcp_upload.set_rate(tcp_upload);
210        self.tcp_download.set_rate(tcp_download);
211        self.utp_upload.set_rate(utp_upload);
212        self.utp_download.set_rate(utp_download);
213        self.global_upload.set_rate(global_upload);
214        self.global_download.set_rate(global_download);
215    }
216
217    /// Apply mixed-mode bandwidth allocation based on peer transport composition.
218    /// Only adjusts upload — download is not throttled by transport type.
219    pub fn apply_mixed_mode(
220        &mut self,
221        algorithm: MixedModeAlgorithm,
222        tcp_peers: usize,
223        utp_peers: usize,
224        global_upload_rate: u64,
225    ) {
226        if global_upload_rate == 0 {
227            self.tcp_upload.set_rate(0);
228            self.utp_upload.set_rate(0);
229            return;
230        }
231        if tcp_peers == 0 && utp_peers == 0 {
232            self.tcp_upload.set_rate(0);
233            self.utp_upload.set_rate(0);
234            return;
235        }
236        match algorithm {
237            MixedModeAlgorithm::PreferTcp => {
238                if tcp_peers > 0 && utp_peers > 0 {
239                    let tcp_rate = global_upload_rate * 9 / 10;
240                    let utp_rate = global_upload_rate / 10;
241                    self.tcp_upload.set_rate(tcp_rate.max(1));
242                    self.utp_upload.set_rate(utp_rate.max(1));
243                } else {
244                    self.tcp_upload.set_rate(0);
245                    self.utp_upload.set_rate(0);
246                }
247            }
248            MixedModeAlgorithm::PeerProportional => {
249                let total = tcp_peers + utp_peers;
250                let tcp_rate = global_upload_rate * tcp_peers as u64 / total as u64;
251                let utp_rate = global_upload_rate * utp_peers as u64 / total as u64;
252                self.tcp_upload
253                    .set_rate(if tcp_peers > 0 { tcp_rate.max(1) } else { 0 });
254                self.utp_upload
255                    .set_rate(if utp_peers > 0 { utp_rate.max(1) } else { 0 });
256            }
257        }
258    }
259}
260
/// Check if an IP address is on a local/private network.
///
/// IPv4: loopback, private (RFC 1918), link-local (169.254.0.0/16).
/// IPv6: loopback (::1), link-local (fe80::/10), unique-local / ULA (fc00::/7).
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are classified by the IPv4
/// address they embed, so an IPv4 peer is judged the same way whichever form
/// its address arrives in.
#[allow(dead_code)] // consumed by torrent module (wired in later tasks)
pub(crate) fn is_local_network(addr: IpAddr) -> bool {
    match addr {
        IpAddr::V4(ip) => ip.is_loopback() || ip.is_private() || ip.is_link_local(),
        IpAddr::V6(ip) => {
            // An IPv4-mapped address stands for an IPv4 host: apply the IPv4 rules.
            if let Some(mapped) = ip.to_ipv4_mapped() {
                return is_local_network(IpAddr::V4(mapped));
            }
            if ip.is_loopback() {
                return true;
            }
            let [first, second, ..] = ip.octets();
            // fe80::/10 — link-local: the top ten bits are 1111 1110 10.
            let link_local = first == 0xfe && (second & 0xc0) == 0x80;
            // fc00::/7 — unique-local (ULA): the top seven bits are 1111 110.
            let unique_local = (first & 0xfe) == 0xfc;
            link_local || unique_local
        }
    }
}
286
#[cfg(test)]
mod tests {
    use super::PeerTransport::{Tcp, Utp};
    use super::*;

    /// Parse `text` as an IP address and classify it.
    fn local(text: &str) -> bool {
        is_local_network(text.parse().unwrap())
    }

    /// Build a bucket limited to `rate` bytes/sec and refill it once with `elapsed`.
    fn refilled(rate: u64, elapsed: Duration) -> TokenBucket {
        let mut bucket = TokenBucket::new(rate);
        bucket.refill(elapsed);
        bucket
    }

    // ---- TokenBucket ----

    #[test]
    fn unlimited_bucket_always_allows() {
        let mut bucket = TokenBucket::unlimited();
        assert!(bucket.try_consume(1_000_000));
        assert!(bucket.is_unlimited());
        assert_eq!(bucket.available(), u64::MAX);
    }

    #[test]
    fn limited_bucket_allows_up_to_capacity() {
        // 1000 bytes/sec for 100 ms yields 100 tokens.
        let mut bucket = refilled(1000, Duration::from_millis(100));
        assert!(bucket.try_consume(100));
        // Nothing left afterwards.
        assert!(!bucket.try_consume(1));
    }

    #[test]
    fn refill_adds_tokens_proportionally() {
        // 10 KB/s for 100 ms yields 1000 tokens.
        let mut bucket = refilled(10_000, Duration::from_millis(100));
        assert!(bucket.try_consume(1000));
        assert!(!bucket.try_consume(1));
    }

    #[test]
    fn tokens_cap_at_one_second_burst() {
        // Five seconds would earn 5000 tokens, but the cap is 1000.
        let mut bucket = refilled(1000, Duration::from_secs(5));
        assert!(bucket.try_consume(1000));
        assert!(!bucket.try_consume(1));
    }

    #[test]
    fn try_consume_partial() {
        let mut bucket = refilled(1000, Duration::from_millis(100));
        assert_eq!(bucket.available(), 100);
        assert!(bucket.try_consume(50));
        assert_eq!(bucket.available(), 50);
    }

    #[test]
    fn set_rate_clamps_tokens() {
        let mut bucket = refilled(1000, Duration::from_secs(1));
        assert_eq!(bucket.available(), 1000);
        // Lowering the rate shrinks the capacity and clamps the tokens to it.
        bucket.set_rate(500);
        assert_eq!(bucket.available(), 500);
    }

    // ---- is_local_network ----

    #[test]
    fn local_network_detection() {
        let local_addrs = [
            "127.0.0.1",
            "192.168.1.1",
            "10.0.0.1",
            "172.16.0.1",
            "169.254.1.1",
            "::1",
        ];
        for addr in &local_addrs {
            assert!(local(addr));
        }
        let public_addrs = ["8.8.8.8", "1.2.3.4"];
        for addr in &public_addrs {
            assert!(!local(addr));
        }
    }

    #[test]
    fn ipv6_local_network_detection() {
        let local_addrs = [
            // Loopback
            "::1",
            // Link-local (fe80::/10)
            "fe80::1",
            "fe80::abcd:1234",
            // Unique-local / ULA (fc00::/7)
            "fc00::1",
            "fd00::1",
            "fd12:3456:789a::1",
        ];
        for addr in &local_addrs {
            assert!(local(addr));
        }
        // Global unicast is not local.
        let global_addrs = ["2001:db8::1", "2607:f8b0:4004:800::200e"];
        for addr in &global_addrs {
            assert!(!local(addr));
        }
    }

    // ---- RateLimiterSet ----

    #[test]
    fn rate_limiter_set_all_unlimited() {
        let mut limiters = RateLimiterSet::new(0, 0, 0, 0, 0, 0);
        limiters.refill(Duration::from_secs(1));
        for transport in &[Tcp, Utp] {
            assert!(limiters.try_consume_upload(1_000_000, *transport));
        }
        for transport in &[Tcp, Utp] {
            assert!(limiters.try_consume_download(1_000_000, *transport));
        }
    }

    #[test]
    fn rate_limiter_set_class_limited() {
        let mut limiters = RateLimiterSet::new(1000, 1000, 500, 500, 0, 0);
        limiters.refill(Duration::from_secs(1));
        // The TCP class holds 1000 tokens.
        assert!(limiters.try_consume_upload(1000, Tcp));
        assert!(!limiters.try_consume_upload(1, Tcp));
        // The uTP class holds its own, independent 500 tokens.
        assert!(limiters.try_consume_upload(500, Utp));
        assert!(!limiters.try_consume_upload(1, Utp));
    }

    #[test]
    fn rate_limiter_set_global_limits() {
        // Each class allows 1000, but the global upload bucket only 500.
        let mut limiters = RateLimiterSet::new(1000, 0, 1000, 0, 500, 0);
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(500, Tcp));
        // With the global bucket drained, the other class is blocked as well.
        assert!(!limiters.try_consume_upload(1, Utp));
    }

    #[test]
    fn rate_limiter_set_check_before_consume_no_partial() {
        // A request the class bucket refuses must not drain the global bucket.
        let mut limiters = RateLimiterSet::new(100, 0, 0, 0, 1000, 0);
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(100, Tcp));
        // TCP class is empty while global still holds 900: a clean refusal.
        assert!(!limiters.try_consume_upload(1, Tcp));
        // uTP has no class limit, so it can take the remaining 900.
        assert!(limiters.try_consume_upload(900, Utp));
    }

    #[test]
    fn rate_limiter_set_refill_all() {
        let mut limiters = RateLimiterSet::new(1000, 2000, 500, 750, 5000, 10000);
        limiters.refill(Duration::from_millis(100));
        // After 100 ms every bucket holds a tenth of its rate.
        assert!(limiters.try_consume_upload(100, Tcp));
        assert!(limiters.try_consume_download(200, Tcp));
        assert!(limiters.try_consume_upload(50, Utp));
        assert!(limiters.try_consume_download(75, Utp));
    }

    #[test]
    fn rate_limiter_set_runtime_update() {
        let mut limiters = RateLimiterSet::new(1000, 1000, 1000, 1000, 0, 0);
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(1000, Tcp));
        // Halve the TCP upload rate at runtime.
        limiters.set_rates(500, 1000, 1000, 1000, 0, 0);
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(500, Tcp));
        assert!(!limiters.try_consume_upload(1, Tcp));
    }

    // ---- Mixed-mode allocation ----

    #[test]
    fn mixed_mode_prefer_tcp_both_present() {
        let mut limiters = RateLimiterSet::new(0, 0, 0, 0, 10000, 0);
        limiters.apply_mixed_mode(MixedModeAlgorithm::PreferTcp, 3, 5, 10000);
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(9000, Tcp));
        assert!(!limiters.try_consume_upload(1, Tcp));
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(1000, Utp));
        assert!(!limiters.try_consume_upload(1, Utp));
    }

    #[test]
    fn mixed_mode_prefer_tcp_only_utp() {
        // With uTP peers only, the per-class rates become unlimited (0), so
        // uTP is bounded by nothing but the global limit.
        let mut limiters = RateLimiterSet::new(0, 0, 0, 0, 10000, 0);
        limiters.apply_mixed_mode(MixedModeAlgorithm::PreferTcp, 0, 5, 10000);
        limiters.refill(Duration::from_secs(1));
        // The whole global capacity is available to uTP.
        assert!(limiters.try_consume_upload(10000, Utp));
        assert!(!limiters.try_consume_upload(1, Utp));
    }

    #[test]
    fn mixed_mode_proportional() {
        let mut limiters = RateLimiterSet::new(0, 0, 0, 0, 10000, 0);
        limiters.apply_mixed_mode(MixedModeAlgorithm::PeerProportional, 3, 7, 10000);
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(3000, Tcp));
        assert!(!limiters.try_consume_upload(1, Tcp));
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(7000, Utp));
        assert!(!limiters.try_consume_upload(1, Utp));
    }

    #[test]
    fn mixed_mode_unlimited_global_noop() {
        let mut limiters = RateLimiterSet::new(0, 0, 0, 0, 0, 0);
        limiters.apply_mixed_mode(MixedModeAlgorithm::PeerProportional, 3, 7, 0);
        limiters.refill(Duration::from_secs(1));
        assert!(limiters.try_consume_upload(1_000_000, Tcp));
        assert!(limiters.try_consume_upload(1_000_000, Utp));
    }
}