// phantom_protocol/transport/pacer.rs
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
19
/// Lowest refill rate, in bytes per second, that a [`Pacer`] will run at.
/// Requests for a slower rate are raised to this value.
const MIN_PACING_RATE: u64 = 1_024;

/// Capacity of the token bucket in bytes: both the initial balance of a new
/// pacer and the ceiling every refill is clamped to.
const MAX_BURST_BYTES: u64 = 64 * 1_024;

/// Token-bucket rate limiter whose state lives entirely in atomics, so every
/// method takes `&self`.
///
/// Tokens are counted in the same unit as the `bytes` arguments. They are
/// earned at `rate_bps` tokens per second and capped at `max_burst`.
///
/// All atomic accesses use `Ordering::Relaxed`: the pacer's own counters stay
/// consistent, but it does not order any other memory accesses.
pub struct Pacer {
    /// Tokens (bytes) currently available to spend.
    tokens: AtomicU64,
    /// Refill rate in tokens (bytes) per second.
    rate_bps: AtomicU64,
    /// Upper bound that `tokens` is clamped to on refill.
    max_burst: u64,
    /// Time of the last credited refill, in nanoseconds since `epoch`.
    last_refill_ns: AtomicU64,
    /// When `false`, consumption always succeeds and waits are zero.
    enabled: AtomicBool,
    /// Reference instant that `last_refill_ns` is measured from.
    epoch: Instant,
}

impl Pacer {
    /// Nanoseconds per second, as `u128` so rate arithmetic cannot overflow.
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    /// Narrows a `u128` to `u64`, saturating at `u64::MAX` instead of
    /// silently dropping the high bits.
    fn saturate_u64(value: u128) -> u64 {
        // The clamp makes the cast lossless.
        value.min(u128::from(u64::MAX)) as u64
    }

    /// Creates an enabled pacer with a full bucket.
    ///
    /// `initial_rate_bps` is the refill rate in bytes per second; values
    /// below `MIN_PACING_RATE` are raised to it.
    pub fn new(initial_rate_bps: u64) -> Self {
        Self {
            tokens: AtomicU64::new(MAX_BURST_BYTES),
            rate_bps: AtomicU64::new(initial_rate_bps.max(MIN_PACING_RATE)),
            max_burst: MAX_BURST_BYTES,
            last_refill_ns: AtomicU64::new(0),
            enabled: AtomicBool::new(true),
            epoch: Instant::now(),
        }
    }

    /// Creates a pacer that starts disabled, with the rate set to `u64::MAX`.
    ///
    /// While disabled, [`try_consume`](Self::try_consume) always returns
    /// `true` and [`time_until_available`](Self::time_until_available) always
    /// returns zero.
    pub fn unlimited() -> Self {
        let pacer = Self::new(u64::MAX);
        pacer.enabled.store(false, Ordering::Relaxed);
        pacer
    }

    /// Attempts to spend `bytes` tokens.
    ///
    /// Returns `true` if the pacer is disabled or the tokens were deducted,
    /// and `false` (leaving the balance untouched) if fewer than `bytes`
    /// tokens are available.
    pub fn try_consume(&self, bytes: u64) -> bool {
        if !self.enabled.load(Ordering::Relaxed) {
            return true;
        }

        self.refill_tokens();

        // Check and deduct in one atomic read-modify-write. A separate load
        // followed by `fetch_sub` lets two concurrent callers both pass the
        // check and wrap the balance below zero.
        self.tokens
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |available| {
                available.checked_sub(bytes)
            })
            .is_ok()
    }

    /// Returns how long to wait, at the current rate, until `bytes` tokens
    /// have been earned.
    ///
    /// Returns zero when the pacer is disabled or enough tokens are already
    /// available. The result is rounded up to a whole nanosecond and
    /// saturates at `u64::MAX` nanoseconds.
    ///
    /// The bucket never holds more than its burst capacity, so for a request
    /// larger than that the returned duration is only a lower bound:
    /// `try_consume` cannot satisfy such a request.
    pub fn time_until_available(&self, bytes: u64) -> Duration {
        if !self.enabled.load(Ordering::Relaxed) {
            return Duration::ZERO;
        }

        self.refill_tokens();

        let current = self.tokens.load(Ordering::Relaxed);
        if current >= bytes {
            return Duration::ZERO;
        }

        // Widen to u128: `deficit * 1e9` overflows u64 once the deficit
        // exceeds roughly 18.4 billion bytes.
        let deficit = u128::from(bytes - current);
        let rate = u128::from(self.rate_bps.load(Ordering::Relaxed).max(1));
        // Round up so that waiting the returned time earns at least `deficit`.
        let wait_ns = (deficit * Self::NANOS_PER_SEC + rate - 1) / rate;
        Duration::from_nanos(Self::saturate_u64(wait_ns))
    }

    /// Sets the refill rate in bytes per second; values below
    /// `MIN_PACING_RATE` are raised to it.
    pub fn set_rate(&self, rate_bps: u64) {
        // Credit time that has already elapsed at the outgoing rate, so the
        // new rate is not applied retroactively. An interval too short to
        // earn a whole token stays pending and is credited at the new rate.
        self.refill_tokens();
        self.rate_bps
            .store(rate_bps.max(MIN_PACING_RATE), Ordering::Relaxed);
    }

    /// Returns the current refill rate in bytes per second.
    pub fn rate(&self) -> u64 {
        self.rate_bps.load(Ordering::Relaxed)
    }

    /// Enables or disables pacing.
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, Ordering::Relaxed);
    }

    /// Returns whether pacing is currently enabled.
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::Relaxed)
    }

    /// Refills the bucket and returns the number of tokens available.
    ///
    /// The value is a snapshot; other threads may change it immediately.
    pub fn available_tokens(&self) -> u64 {
        self.refill_tokens();
        self.tokens.load(Ordering::Relaxed)
    }

    /// Credits the tokens earned since the last refill, clamped to
    /// `max_burst`.
    ///
    /// If the elapsed time is too short to earn a whole token, nothing is
    /// recorded, so the time keeps accumulating toward the next call.
    fn refill_tokens(&self) {
        let now_ns = Self::saturate_u64(self.epoch.elapsed().as_nanos());
        let last_ns = self.last_refill_ns.load(Ordering::Relaxed);
        let elapsed_ns = now_ns.saturating_sub(last_ns);

        if elapsed_ns == 0 {
            return;
        }

        let rate = self.rate_bps.load(Ordering::Relaxed);
        let earned = u128::from(rate) * u128::from(elapsed_ns) / Self::NANOS_PER_SEC;
        if earned == 0 {
            return;
        }
        // Saturate rather than truncate: with rates near `u64::MAX` the
        // product can exceed 64 bits after about a second.
        let new_tokens = Self::saturate_u64(earned);

        // Claim the interval `last_ns..now_ns` before crediting it. Only the
        // thread whose exchange succeeds adds tokens, so concurrent callers
        // cannot credit the same elapsed time twice. A caller that loses the
        // race may briefly see a balance without the winner's credit, which
        // errs on the side of pacing too strictly.
        if self
            .last_refill_ns
            .compare_exchange(last_ns, now_ns, Ordering::Relaxed, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        let cap = self.max_burst;
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .tokens
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_add(new_tokens).min(cap))
            });
    }
}
160
161impl std::fmt::Debug for Pacer {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 f.debug_struct("Pacer")
164 .field("rate_bps", &self.rate_bps.load(Ordering::Relaxed))
165 .field("tokens", &self.tokens.load(Ordering::Relaxed))
166 .field("enabled", &self.enabled.load(Ordering::Relaxed))
167 .finish()
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A new pacer starts with a full bucket, so two 1400-byte sends pass
    /// back to back.
    #[test]
    fn test_pacer_allows_burst() {
        let limiter = Pacer::new(1_000_000);

        let first = limiter.try_consume(1400);
        assert!(first);

        let second = limiter.try_consume(1400);
        assert!(second);
    }

    /// Once the whole burst has been spent, even a one-byte request is
    /// refused.
    #[test]
    fn test_pacer_blocks_when_empty() {
        let limiter = Pacer::new(1_024);

        let drained = limiter.try_consume(MAX_BURST_BYTES);
        assert!(drained);

        let extra = limiter.try_consume(1);
        assert!(!extra);
    }

    /// After draining the bucket and sleeping, tokens are available again.
    #[test]
    fn test_pacer_refills_over_time() {
        let limiter = Pacer::new(100_000);

        assert!(limiter.try_consume(MAX_BURST_BYTES));
        assert!(!limiter.try_consume(1));

        let pause = Duration::from_millis(50);
        thread::sleep(pause);

        let refilled = limiter.available_tokens();
        assert!(
            refilled > 0,
            "expected tokens after sleep, got {}",
            refilled
        );
    }

    /// `set_rate` is reflected by `rate`.
    #[test]
    fn test_pacer_rate_update() {
        let limiter = Pacer::new(1_000_000);
        assert_eq!(limiter.rate(), 1_000_000);

        limiter.set_rate(2_000_000);
        assert_eq!(limiter.rate(), 2_000_000);
    }

    /// A requested rate of zero is raised to the minimum pacing rate.
    #[test]
    fn test_pacer_min_rate() {
        let limiter = Pacer::new(0);
        assert_eq!(limiter.rate(), MIN_PACING_RATE);
    }

    /// An unlimited pacer is disabled, accepts any request, and never asks
    /// the caller to wait.
    #[test]
    fn test_pacer_unlimited() {
        let limiter = Pacer::unlimited();

        assert!(!limiter.is_enabled());
        assert!(limiter.try_consume(u64::MAX));
        assert_eq!(limiter.time_until_available(1), Duration::ZERO);
    }

    /// With the bucket drained at 1 MB/s, the wait for 10 000 bytes is
    /// non-zero and below 50 ms.
    #[test]
    fn test_pacer_time_until_available() {
        let limiter = Pacer::new(1_000_000);
        // Drain the bucket; the result is deliberately ignored.
        let _ = limiter.try_consume(MAX_BURST_BYTES);

        let delay = limiter.time_until_available(10_000);
        assert!(delay > Duration::ZERO);
        assert!(delay < Duration::from_millis(50), "wait was {:?}", delay);
    }
}