shadow_protocols/packet_shaper.rs

use std::collections::VecDeque;
use std::time::{Duration, Instant};
use bytes::Bytes;
use rand::Rng;

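/// Traffic-shaping strategies supported by `PacketShaper`.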
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShapingStrategy {
    /// Pass packets through unmodified.
    None,
    /// Pace output to a fixed byte rate, padding each packet to a constant size.
    ConstantRate { bytes_per_sec: u64 },
    /// Round packet sizes up to the nearest common bucket size.
    SizeDistribution,
    /// Emit packets at a fixed rate, filling gaps with dummy packets when enabled.
    ConstantPacketRate { packets_per_sec: u64 },
    /// Mimic another protocol's traffic profile (currently delegates to size-distribution shaping).
    ProtocolMimic,
}

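/// Shapes queued packets according to the configured [`ShapingStrategy`],
/// optionally injecting dummy traffic when the queue runs dry.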
pub struct PacketShaper {
    strategy: ShapingStrategy,
    queue: VecDeque<QueuedPacket>,
    dummy_enabled: bool,
    last_send: Instant,
    stats: ShapingStats,
}

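/// A packet waiting in the shaper's queue, together with bookkeeping metadata.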
#[derive(Debug, Clone)]
struct QueuedPacket {
    data: Bytes,
    enqueue_time: Instant,
    priority: u8,
    is_dummy: bool,
}

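/// Counters describing the shaper's activity, exposed via [`PacketShaper::stats`].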
#[derive(Debug, Clone, Default)]
pub struct ShapingStats {
    pub packets_shaped: u64,
    pub dummy_packets_sent: u64,
    pub bytes_padded: u64,
    pub avg_queue_delay_ms: f64,
}

impl PacketShaper {
    pub fn new(strategy: ShapingStrategy) -> Self {
        Self {
            strategy,
            queue: VecDeque::new(),
            dummy_enabled: false,
            last_send: Instant::now(),
            stats: ShapingStats::default(),
        }
    }

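    /// Adds an outgoing packet to the shaping queue.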
    pub fn enqueue(&mut self, data: Bytes, priority: u8) {
        let packet = QueuedPacket {
            data,
            enqueue_time: Instant::now(),
            priority,
            is_dummy: false,
        };

        self.queue.push_back(packet);
    }

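    /// Returns the next packet to send, applying the configured strategy.
    /// Depending on the strategy this may sleep to pace output, pad the
    /// packet, or return a dummy packet when the queue is empty.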
    pub async fn next_packet(&mut self) -> Option<Bytes> {
        match self.strategy {
            ShapingStrategy::None => {
                self.queue.pop_front().map(|p| p.data)
            }

            ShapingStrategy::ConstantRate { bytes_per_sec } => {
                self.constant_rate_shaping(bytes_per_sec).await
            }

            ShapingStrategy::ConstantPacketRate { packets_per_sec } => {
                self.constant_packet_rate_shaping(packets_per_sec).await
            }

            ShapingStrategy::SizeDistribution => {
                self.size_distribution_shaping().await
            }

            ShapingStrategy::ProtocolMimic => {
                self.protocol_mimic_shaping().await
            }
        }
    }

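    /// Paces output so that roughly `bytes_per_sec` bytes leave per second,
    /// assuming fixed 1200-byte packets, and pads shorter packets up to that size.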
    async fn constant_rate_shaping(&mut self, bytes_per_sec: u64) -> Option<Bytes> {
        // Send one 1200-byte packet per interval so the average rate matches `bytes_per_sec`.
        let target_interval = Duration::from_secs_f64(1.0 / (bytes_per_sec as f64 / 1200.0));
        let elapsed = self.last_send.elapsed();

        if elapsed < target_interval {
            tokio::time::sleep(target_interval - elapsed).await;
        }

        self.last_send = Instant::now();

        if let Some(packet) = self.queue.pop_front() {
            // Exponentially weighted moving average of queueing delay.
            let delay = packet.enqueue_time.elapsed();
            self.stats.avg_queue_delay_ms =
                (self.stats.avg_queue_delay_ms * 0.9) + (delay.as_millis() as f64 * 0.1);
            self.stats.packets_shaped += 1;

            let target_size = 1200;
            if packet.data.len() < target_size {
                // Pad short packets with zero bytes up to the constant size.
                let padding_size = target_size - packet.data.len();
                self.stats.bytes_padded += padding_size as u64;

                let mut padded = packet.data.to_vec();
                padded.resize(target_size, 0);
                Some(Bytes::from(padded))
            } else {
                Some(packet.data)
            }
        } else if self.dummy_enabled {
            self.stats.dummy_packets_sent += 1;
            Some(self.generate_dummy_packet(1200))
        } else {
            None
        }
    }

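    /// Emits at most `packets_per_sec` packets per second, sending a randomly
    /// sized dummy packet when the queue is empty and dummy traffic is enabled.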
    async fn constant_packet_rate_shaping(&mut self, packets_per_sec: u64) -> Option<Bytes> {
        let target_interval = Duration::from_secs_f64(1.0 / packets_per_sec as f64);
        let elapsed = self.last_send.elapsed();

        if elapsed < target_interval {
            tokio::time::sleep(target_interval - elapsed).await;
        }

        self.last_send = Instant::now();

        if let Some(packet) = self.queue.pop_front() {
            self.stats.packets_shaped += 1;
            Some(packet.data)
        } else if self.dummy_enabled {
            self.stats.dummy_packets_sent += 1;
            Some(self.generate_dummy_packet(self.random_packet_size()))
        } else {
            None
        }
    }

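    /// Pads each packet up to the nearest size bucket so observed sizes
    /// cluster around a small set of common values.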
    async fn size_distribution_shaping(&mut self) -> Option<Bytes> {
        if let Some(packet) = self.queue.pop_front() {
            self.stats.packets_shaped += 1;

            let target_size = self.round_to_distribution(packet.data.len());

            if packet.data.len() < target_size {
                let mut padded = packet.data.to_vec();
                padded.resize(target_size, 0);
                self.stats.bytes_padded += (target_size - packet.data.len()) as u64;
                Some(Bytes::from(padded))
            } else {
                Some(packet.data)
            }
        } else {
            None
        }
    }

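    /// Mimics another protocol's traffic profile; currently this just
    /// delegates to size-distribution shaping.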
    async fn protocol_mimic_shaping(&mut self) -> Option<Bytes> {
        self.size_distribution_shaping().await
    }

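    /// Builds a dummy packet of `size` random bytes.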
    fn generate_dummy_packet(&self, size: usize) -> Bytes {
        let mut rng = rand::thread_rng();
        let data: Vec<u8> = (0..size).map(|_| rng.gen()).collect();
        Bytes::from(data)
    }

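    /// Picks a dummy packet size: ~10% small (64-255 bytes), ~60% medium
    /// (256-1199 bytes), ~30% large (1200-1499 bytes).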
    fn random_packet_size(&self) -> usize {
        let mut rng = rand::thread_rng();
        let r: f64 = rng.gen();

        if r < 0.1 {
            rng.gen_range(64..256)
        } else if r < 0.7 {
            rng.gen_range(256..1200)
        } else {
            rng.gen_range(1200..1500)
        }
    }

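    /// Rounds a payload size up to the smallest bucket that fits it,
    /// falling back to 1500 for anything larger than the biggest bucket.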
    fn round_to_distribution(&self, size: usize) -> usize {
        const BUCKETS: &[usize] = &[64, 128, 256, 512, 1024, 1200, 1400];

        for &bucket in BUCKETS {
            if size <= bucket {
                return bucket;
            }
        }

        1500
    }

    pub fn set_dummy_traffic(&mut self, enabled: bool) {
        self.dummy_enabled = enabled;
    }

    pub fn set_strategy(&mut self, strategy: ShapingStrategy) {
        self.strategy = strategy;
    }

    pub fn stats(&self) -> &ShapingStats {
        &self.stats
    }

    pub fn clear_queue(&mut self) {
        self.queue.clear();
    }

    pub fn queue_len(&self) -> usize {
        self.queue.len()
    }
}

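/// Mixes packets from multiple priority queues using weighted random selection;
/// lower priority values receive larger weights and are drained more often.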
pub struct TrafficMixer {
    queues: Vec<VecDeque<Bytes>>,
    weights: Vec<u8>,
}

impl TrafficMixer {
    pub fn new(priority_levels: usize) -> Self {
        let mut queues = Vec::new();
        let mut weights = Vec::new();

        for i in 0..priority_levels {
            queues.push(VecDeque::new());
            weights.push((priority_levels - i) as u8);
        }

        Self { queues, weights }
    }

    pub fn enqueue(&mut self, data: Bytes, priority: u8) {
        let idx = priority as usize % self.queues.len();
        self.queues[idx].push_back(data);
    }

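    /// Picks a queue by weighted random choice and pops its front packet.
    /// Returns `None` if the selected queue happens to be empty, even when
    /// other queues still hold packets.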
    pub fn next_packet(&mut self) -> Option<Bytes> {
        let mut rng = rand::thread_rng();
        let total_weight: u32 = self.weights.iter().map(|&w| w as u32).sum();
        let mut choice: u32 = rng.gen_range(0..total_weight);

        for (idx, &weight) in self.weights.iter().enumerate() {
            if choice < weight as u32 {
                return self.queues[idx].pop_front();
            }
            choice -= weight as u32;
        }

        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_packet_shaper_enqueue() {
        let mut shaper = PacketShaper::new(ShapingStrategy::None);

        shaper.enqueue(Bytes::from(vec![1, 2, 3]), 0);
        shaper.enqueue(Bytes::from(vec![4, 5, 6]), 1);

        assert_eq!(shaper.queue_len(), 2);
    }

    #[tokio::test]
    async fn test_no_shaping() {
        let mut shaper = PacketShaper::new(ShapingStrategy::None);

        shaper.enqueue(Bytes::from(vec![1, 2, 3]), 0);

        let packet = shaper.next_packet().await.unwrap();
        assert_eq!(packet.as_ref(), &[1, 2, 3]);
    }

    #[test]
    fn test_size_rounding() {
        let shaper = PacketShaper::new(ShapingStrategy::SizeDistribution);

        assert_eq!(shaper.round_to_distribution(100), 128);
        assert_eq!(shaper.round_to_distribution(500), 512);
        assert_eq!(shaper.round_to_distribution(1300), 1400);
    }

    #[test]
    fn test_traffic_mixer() {
        let mut mixer = TrafficMixer::new(3);

        mixer.enqueue(Bytes::from(vec![1]), 0);
        mixer.enqueue(Bytes::from(vec![2]), 1);
        mixer.enqueue(Bytes::from(vec![3]), 2);

        assert!(mixer.next_packet().is_some());
    }

    #[tokio::test]
    async fn test_dummy_packet_generation() {
        let mut shaper = PacketShaper::new(ShapingStrategy::ConstantPacketRate { packets_per_sec: 10 });
        shaper.set_dummy_traffic(true);

        let packet = shaper.next_packet().await;
        assert!(packet.is_some());
        assert_eq!(shaper.stats().dummy_packets_sent, 1);
    }
}