Skip to main content

phantom_protocol/transport/
pacer.rs

1//! Token Bucket Pacer
2//!
3//! Smooths out traffic bursts by enforcing a uniform send rate.
4//! Instead of `socket.send()` dumping N packets at once (causing router queue buildups),
5//! the pacer spreads packets uniformly over the RTT interval.
6//!
7//! # Design
8//!
9//! Uses a Token Bucket algorithm:
10//! - Tokens replenish at `rate` bytes per second
11//! - Each send consumes `packet_size` tokens
12//! - If no tokens are available, the send is delayed
13//!
14//! The pacer operates in conjunction with the `BandwidthEstimator` which feeds
15//! the target rate. When BBR says "send at X MB/s", the pacer enforces that rate.
16
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::time::{Duration, Instant};
19
20/// Minimum pacing rate (1 KB/s) — prevents stalling
21const MIN_PACING_RATE: u64 = 1_024;
22
23/// Maximum tokens that can accumulate (burst allowance, in bytes)
24const MAX_BURST_BYTES: u64 = 64 * 1_024; // 64 KB
25
26/// Token Bucket Pacer
27///
28/// Thread-safe pacer that can be shared across async tasks.
29pub struct Pacer {
30    /// Current available tokens (bytes)
31    tokens: AtomicU64,
32    /// Pacing rate in bytes/sec
33    rate_bps: AtomicU64,
34    /// Maximum burst size in bytes
35    max_burst: u64,
36    /// Last token refill timestamp (nanoseconds since some epoch)
37    last_refill_ns: AtomicU64,
38    /// Whether pacing is enabled
39    enabled: AtomicBool,
40    /// Creation time (for relative nanoseconds)
41    epoch: Instant,
42}
43
44impl Pacer {
45    /// Create a new pacer with the given initial rate (bytes/sec).
46    pub fn new(initial_rate_bps: u64) -> Self {
47        let rate = initial_rate_bps.max(MIN_PACING_RATE);
48        let epoch = Instant::now();
49
50        Self {
51            tokens: AtomicU64::new(MAX_BURST_BYTES),
52            rate_bps: AtomicU64::new(rate),
53            max_burst: MAX_BURST_BYTES,
54            last_refill_ns: AtomicU64::new(0),
55            enabled: AtomicBool::new(true),
56            epoch,
57        }
58    }
59
60    /// Create an unlimited pacer (always allows sends).
61    pub fn unlimited() -> Self {
62        let pacer = Self::new(u64::MAX);
63        pacer.enabled.store(false, Ordering::Relaxed);
64        pacer
65    }
66
67    /// Try to consume `bytes` tokens. Returns `true` if allowed, `false` if should wait.
68    pub fn try_consume(&self, bytes: u64) -> bool {
69        if !self.enabled.load(Ordering::Relaxed) {
70            return true;
71        }
72
73        self.refill_tokens();
74
75        let current = self.tokens.load(Ordering::Relaxed);
76        if current >= bytes {
77            self.tokens.fetch_sub(bytes, Ordering::Relaxed);
78            true
79        } else {
80            false
81        }
82    }
83
84    /// How long to wait before `bytes` tokens are available.
85    pub fn time_until_available(&self, bytes: u64) -> Duration {
86        if !self.enabled.load(Ordering::Relaxed) {
87            return Duration::ZERO;
88        }
89
90        self.refill_tokens();
91
92        let current = self.tokens.load(Ordering::Relaxed);
93        if current >= bytes {
94            return Duration::ZERO;
95        }
96
97        let deficit = bytes - current;
98        let rate = self.rate_bps.load(Ordering::Relaxed).max(1);
99        Duration::from_nanos(deficit * 1_000_000_000 / rate)
100    }
101
102    /// Update the pacing rate (called by BandwidthEstimator).
103    pub fn set_rate(&self, rate_bps: u64) {
104        let rate = rate_bps.max(MIN_PACING_RATE);
105        self.rate_bps.store(rate, Ordering::Relaxed);
106    }
107
108    /// Get the current pacing rate (bytes/sec).
109    pub fn rate(&self) -> u64 {
110        self.rate_bps.load(Ordering::Relaxed)
111    }
112
113    /// Enable or disable pacing.
114    pub fn set_enabled(&self, enabled: bool) {
115        self.enabled.store(enabled, Ordering::Relaxed);
116    }
117
118    /// Whether pacing is enabled.
119    pub fn is_enabled(&self) -> bool {
120        self.enabled.load(Ordering::Relaxed)
121    }
122
123    /// Current available tokens.
124    pub fn available_tokens(&self) -> u64 {
125        self.refill_tokens();
126        self.tokens.load(Ordering::Relaxed)
127    }
128
129    /// Refill tokens based on elapsed time.
130    fn refill_tokens(&self) {
131        let now_ns = self.epoch.elapsed().as_nanos() as u64;
132        let last_ns = self.last_refill_ns.load(Ordering::Relaxed);
133        let elapsed_ns = now_ns.saturating_sub(last_ns);
134
135        if elapsed_ns == 0 {
136            return;
137        }
138
139        let rate = self.rate_bps.load(Ordering::Relaxed);
140        // tokens_to_add = rate * elapsed_time
141        let new_tokens = (rate as u128 * elapsed_ns as u128 / 1_000_000_000) as u64;
142
143        if new_tokens > 0 {
144            // CAS loop to update atomically
145            loop {
146                let current = self.tokens.load(Ordering::Relaxed);
147                let updated = (current + new_tokens).min(self.max_burst);
148                if self
149                    .tokens
150                    .compare_exchange_weak(current, updated, Ordering::Relaxed, Ordering::Relaxed)
151                    .is_ok()
152                {
153                    break;
154                }
155            }
156            self.last_refill_ns.store(now_ns, Ordering::Relaxed);
157        }
158    }
159}
160
161impl std::fmt::Debug for Pacer {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.debug_struct("Pacer")
164            .field("rate_bps", &self.rate_bps.load(Ordering::Relaxed))
165            .field("tokens", &self.tokens.load(Ordering::Relaxed))
166            .field("enabled", &self.enabled.load(Ordering::Relaxed))
167            .finish()
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174    use std::thread;
175
176    #[test]
177    fn test_pacer_allows_burst() {
178        let pacer = Pacer::new(1_000_000); // 1 MB/s
179                                           // Initial burst should be allowed (MAX_BURST_BYTES = 64 KB)
180        assert!(pacer.try_consume(1400)); // one packet
181        assert!(pacer.try_consume(1400)); // another packet
182    }
183
184    #[test]
185    fn test_pacer_blocks_when_empty() {
186        let pacer = Pacer::new(1_024); // 1 KB/s — very slow
187                                       // Exhaust the burst allowance
188        assert!(pacer.try_consume(MAX_BURST_BYTES));
189        // No tokens left
190        assert!(!pacer.try_consume(1));
191    }
192
193    #[test]
194    fn test_pacer_refills_over_time() {
195        let pacer = Pacer::new(100_000); // 100 KB/s
196                                         // Drain all tokens
197        assert!(pacer.try_consume(MAX_BURST_BYTES));
198        assert!(!pacer.try_consume(1));
199
200        // Wait a bit for refill
201        thread::sleep(Duration::from_millis(50));
202
203        // Should have some tokens now (~5KB at 100KB/s over 50ms)
204        let available = pacer.available_tokens();
205        assert!(
206            available > 0,
207            "expected tokens after sleep, got {}",
208            available
209        );
210    }
211
212    #[test]
213    fn test_pacer_rate_update() {
214        let pacer = Pacer::new(1_000_000);
215        assert_eq!(pacer.rate(), 1_000_000);
216
217        pacer.set_rate(2_000_000);
218        assert_eq!(pacer.rate(), 2_000_000);
219    }
220
221    #[test]
222    fn test_pacer_min_rate() {
223        let pacer = Pacer::new(0); // Should clamp to MIN
224        assert_eq!(pacer.rate(), MIN_PACING_RATE);
225    }
226
227    #[test]
228    fn test_pacer_unlimited() {
229        let pacer = Pacer::unlimited();
230        assert!(!pacer.is_enabled());
231        // Should always allow
232        assert!(pacer.try_consume(u64::MAX));
233        assert_eq!(pacer.time_until_available(1), Duration::ZERO);
234    }
235
236    #[test]
237    fn test_pacer_time_until_available() {
238        let pacer = Pacer::new(1_000_000); // 1 MB/s
239                                           // Drain tokens
240        pacer.try_consume(MAX_BURST_BYTES);
241
242        // Need 10_000 bytes at 1 MB/s = 10ms
243        let wait = pacer.time_until_available(10_000);
244        assert!(wait > Duration::ZERO);
245        assert!(wait < Duration::from_millis(50), "wait was {:?}", wait);
246    }
247}