1use crate::onion::{MixMessage, MixMessageType, MixNode, TrafficAnalysisResistance};
12use crate::types::{MessagePriority, NetworkError, NetworkMessage};
13use base64::{engine::general_purpose, Engine as _};
14use rand::{thread_rng, Rng, RngCore};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18use tokio::sync::{Mutex, RwLock};
19use tokio::time::{interval, sleep};
20use tracing::{info, warn};
21
/// Payload size buckets, in bytes, used when padding messages and sizing dummy traffic.
///
/// Listed in ascending order; each entry is twice the previous one.
pub const STANDARD_MESSAGE_SIZES: [usize; 8] = [
    512,    // 0.5 KiB
    1_024,  // 1 KiB
    2_048,  // 2 KiB
    4_096,  // 4 KiB
    8_192,  // 8 KiB
    16_384, // 16 KiB
    32_768, // 32 KiB
    65_536, // 64 KiB
];
33
/// Payload size, in bytes, that small messages are padded to by default (4 KiB).
pub const DEFAULT_MESSAGE_SIZE: usize = 4 * 1024;
36
/// Settings that control which obfuscation stages [`TrafficObfuscator`] applies and how.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficObfuscationConfig {
    /// Pad message payloads up to a standard size before they are sent.
    pub enable_size_normalization: bool,
    /// Size in bytes that smaller payloads are padded to.
    pub standard_message_size: usize,
    /// Spawn the background dummy-message generator when the obfuscator starts.
    pub enable_dummy_traffic: bool,
    /// Probability that a dummy message is produced on each generator tick
    /// (the generator clamps it to `0.0..=1.0`).
    pub dummy_traffic_ratio: f64,
    /// Sleep for a random delay before each outgoing message.
    pub enable_traffic_shaping: bool,
    /// Inclusive `(min, max)` bounds of that random delay, in milliseconds.
    pub traffic_delay_range: (u64, u64),
    /// Queue messages in the mix node and flush them in batches instead of returning them
    /// directly from `obfuscate_message`.
    pub enable_mix_batching: bool,
    /// Batch size forwarded to the mix node's configuration.
    pub mix_batch_size: usize,
    /// Batch timeout forwarded to the mix node's configuration.
    pub mix_batch_timeout: Duration,
    /// Wrap serialized messages in a protocol-lookalike framing.
    pub enable_protocol_obfuscation: bool,
    /// Framings to choose from; one is picked at random per message.
    pub obfuscation_patterns: Vec<ObfuscationPattern>,
    /// NOTE(review): this flag and the two burst settings below are not read by the code
    /// reviewed here; confirm where (or whether) they take effect.
    pub enable_burst_prevention: bool,
    /// Presumably the number of messages tolerated per burst window; verify against its user.
    pub max_burst_size: usize,
    /// Presumably a delay applied once a burst is detected; unit not established here.
    pub burst_prevention_delay: u64,
}
69
70impl Default for TrafficObfuscationConfig {
71 fn default() -> Self {
72 Self {
73 enable_size_normalization: true,
74 standard_message_size: DEFAULT_MESSAGE_SIZE,
75 enable_dummy_traffic: true,
76 dummy_traffic_ratio: 0.15, enable_traffic_shaping: true,
78 traffic_delay_range: (10, 100), enable_mix_batching: true,
80 mix_batch_size: 50,
81 mix_batch_timeout: Duration::from_millis(500),
82 enable_protocol_obfuscation: true,
83 obfuscation_patterns: vec![
84 ObfuscationPattern::Http,
85 ObfuscationPattern::Https,
86 ObfuscationPattern::WebSocket,
87 ObfuscationPattern::Custom(vec![0x00, 0x01, 0x02, 0x03]),
88 ],
89 enable_burst_prevention: true,
90 max_burst_size: 100,
91 burst_prevention_delay: 50,
92 }
93 }
94}
95
/// Wire framing that an obfuscated message is dressed up as.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ObfuscationPattern {
    /// Base64 body inside an HTTP/1.1 `POST` request.
    Http,
    /// Payload behind a TLS-record-style header (`0x17 0x03 0x03` plus a 16-bit length).
    Https,
    /// Payload as a masked binary WebSocket frame.
    WebSocket,
    /// Payload encoded into the labels of a DNS query name.
    Dns,
    /// Payload prefixed with the given raw header bytes.
    Custom(Vec<u8>),
}
110
/// Applies padding, timing delays, mix batching and protocol mimicry to outgoing messages.
///
/// Every field is behind an `Arc` (or is a cloneable channel sender), so clones of this
/// struct share the same underlying state.
pub struct TrafficObfuscator {
    /// Current settings; replaced wholesale by `update_config`.
    config: Arc<RwLock<TrafficObfuscationConfig>>,
    /// Mix node that buffers messages until a batch is flushed.
    mix_node: Arc<Mutex<MixNode>>,
    /// Applied to each flushed batch before it is serialized.
    traffic_resistance: Arc<TrafficAnalysisResistance>,
    /// Source of dummy messages for the background generator task.
    dummy_generator: Arc<DummyTrafficGenerator>,
    /// Inserts the random per-message delay.
    traffic_shaper: Arc<Mutex<TrafficShaper>>,
    /// Wraps serialized messages in a protocol-lookalike framing.
    protocol_obfuscator: Arc<ProtocolObfuscator>,
    /// Running counters exposed through `get_stats`.
    stats: Arc<RwLock<ObfuscationStats>>,
    /// Broadcasts the shutdown signal to the background tasks.
    shutdown_tx: tokio::sync::broadcast::Sender<()>,
}
130
/// Counters describing the work done by a [`TrafficObfuscator`].
#[derive(Debug, Clone, Default)]
pub struct ObfuscationStats {
    /// Messages passed to `obfuscate_message`.
    pub total_messages: u64,
    /// Dummy messages successfully queued in the mix node.
    pub dummy_messages: u64,
    /// Messages whose payload received padding.
    pub normalized_messages: u64,
    /// Non-empty batches flushed by `process_batch`.
    pub batches_processed: u64,
    /// Running mean of the number of messages per flushed batch.
    pub avg_batch_size: f64,
    /// Total number of padding bytes appended to payloads.
    pub total_padding_bytes: u64,
    /// NOTE(review): never incremented by the code reviewed here; confirm its writer.
    pub protocol_obfuscations: u64,
    /// NOTE(review): never incremented by the code reviewed here; confirm its writer.
    pub bursts_prevented: u64,
}
151
// SAFETY: NOTE(review) — unverified. Every field is an `Arc` around a Tokio lock or a project
// type, or a broadcast sender. These impls are therefore only sound if `MixNode` and
// `TrafficAnalysisResistance` (defined outside this file) are themselves safe to move and
// share across threads. If they are, the compiler already derives `Send`/`Sync` and these
// impls are redundant; if they are not, these impls hide a potential data race. Confirm,
// and preferably delete them.
unsafe impl Send for TrafficObfuscator {}
unsafe impl Sync for TrafficObfuscator {}
155
156impl TrafficObfuscator {
157 pub fn new(config: TrafficObfuscationConfig) -> Self {
159 let (shutdown_tx, _shutdown_rx) = tokio::sync::broadcast::channel(1);
160
161 let mix_config = crate::onion::MixConfig {
162 batch_size: config.mix_batch_size,
163 batch_timeout: config.mix_batch_timeout,
164 target_rate: 50.0,
165 dummy_probability: config.dummy_traffic_ratio,
166 timing_obfuscation: config.enable_traffic_shaping,
167 };
168
169 Self {
170 config: Arc::new(RwLock::new(config.clone())),
171 mix_node: Arc::new(Mutex::new(MixNode::with_config(
172 vec![0u8; 32], mix_config,
174 ))),
175 traffic_resistance: Arc::new(TrafficAnalysisResistance::new()),
176 dummy_generator: Arc::new(DummyTrafficGenerator::new(config.dummy_traffic_ratio)),
177 traffic_shaper: Arc::new(Mutex::new(TrafficShaper::new(config.traffic_delay_range))),
178 protocol_obfuscator: Arc::new(ProtocolObfuscator::new(config.obfuscation_patterns)),
179 stats: Arc::new(RwLock::new(ObfuscationStats::default())),
180 shutdown_tx,
181 }
182 }
183
184 pub async fn start(&self) {
186 info!("Starting traffic obfuscator");
187
188 if self.config.read().await.enable_dummy_traffic {
190 self.start_dummy_traffic_generation().await;
191 }
192
193 if self.config.read().await.enable_mix_batching {
195 self.start_batch_flushing().await;
196 }
197 }
198
199 pub async fn stop(&self) {
201 info!("Stopping traffic obfuscator");
202 let _ = self.shutdown_tx.send(());
203 }
204
205 pub async fn obfuscate_message(
207 &self,
208 mut message: NetworkMessage,
209 ) -> Result<Vec<u8>, NetworkError> {
210 let config = self.config.read().await;
211
212 {
214 let mut stats = self.stats.write().await;
215 stats.total_messages += 1;
216 }
217
218 if config.enable_size_normalization {
220 message = self.normalize_message_size(message).await?;
221 }
222
223 if config.enable_traffic_shaping {
225 self.apply_traffic_shaping().await?;
226 }
227
228 let mix_message = self.to_mix_message(message).await?;
230
231 if config.enable_mix_batching {
233 self.mix_node
234 .lock()
235 .await
236 .add_message(mix_message)
237 .await
238 .map_err(|e| NetworkError::Internal(format!("Mix batching failed: {}", e)))?;
239
240 return Ok(Vec::new());
242 }
243
244 let serialized = bincode::serialize(&mix_message)
246 .map_err(|e| NetworkError::Internal(format!("Serialization failed: {}", e)))?;
247
248 if config.enable_protocol_obfuscation {
249 Ok(self.protocol_obfuscator.obfuscate(serialized).await?)
250 } else {
251 Ok(serialized)
252 }
253 }
254
255 pub async fn process_batch(&self) -> Result<Vec<Vec<u8>>, NetworkError> {
257 let config = self.config.read().await;
258 let mut mix_node = self.mix_node.lock().await;
259
260 let batch = mix_node
262 .flush_batch()
263 .await
264 .map_err(|e| NetworkError::Internal(format!("Batch flush failed: {}", e)))?;
265
266 if batch.is_empty() {
267 return Ok(Vec::new());
268 }
269
270 {
272 let mut stats = self.stats.write().await;
273 stats.batches_processed += 1;
274 stats.avg_batch_size = ((stats.avg_batch_size * (stats.batches_processed - 1) as f64)
275 + batch.len() as f64)
276 / stats.batches_processed as f64;
277 }
278
279 let mut batch_messages = batch;
281 self.traffic_resistance
282 .apply_resistance(&mut batch_messages)
283 .await
284 .map_err(|e| NetworkError::Internal(format!("Traffic resistance failed: {}", e)))?;
285
286 let mut obfuscated_messages = Vec::new();
288 for msg in batch_messages {
289 let serialized = bincode::serialize(&msg)
290 .map_err(|e| NetworkError::Internal(format!("Serialization failed: {}", e)))?;
291
292 if config.enable_protocol_obfuscation {
293 obfuscated_messages.push(self.protocol_obfuscator.obfuscate(serialized).await?);
294 } else {
295 obfuscated_messages.push(serialized);
296 }
297 }
298
299 Ok(obfuscated_messages)
300 }
301
302 async fn normalize_message_size(
304 &self,
305 mut message: NetworkMessage,
306 ) -> Result<NetworkMessage, NetworkError> {
307 let config = self.config.read().await;
308 let target_size = config.standard_message_size;
309 let current_size = message.payload.len();
310
311 if current_size < target_size {
312 let padding_size = target_size - current_size;
314 let mut padding = vec![0u8; padding_size];
315 thread_rng().fill_bytes(&mut padding);
316 message.payload.extend(padding);
317
318 let mut stats = self.stats.write().await;
320 stats.normalized_messages += 1;
321 stats.total_padding_bytes += padding_size as u64;
322 } else if current_size > target_size {
323 let next_size = STANDARD_MESSAGE_SIZES
325 .iter()
326 .find(|&&size| size >= current_size)
327 .copied()
328 .unwrap_or_else(|| {
329 let largest = STANDARD_MESSAGE_SIZES.last().unwrap();
331 current_size.div_ceil(*largest) * largest
332 });
333
334 if next_size > current_size {
335 let padding_size = next_size - current_size;
336 let mut padding = vec![0u8; padding_size];
337 thread_rng().fill_bytes(&mut padding);
338 message.payload.extend(padding);
339
340 let mut stats = self.stats.write().await;
342 stats.normalized_messages += 1;
343 stats.total_padding_bytes += padding_size as u64;
344 }
345 }
346
347 Ok(message)
348 }
349
350 async fn apply_traffic_shaping(&self) -> Result<(), NetworkError> {
352 self.traffic_shaper.lock().await.apply_delay().await;
353 Ok(())
354 }
355
356 async fn to_mix_message(&self, message: NetworkMessage) -> Result<MixMessage, NetworkError> {
358 let content = bincode::serialize(&message)
359 .map_err(|e| NetworkError::Internal(format!("Serialization failed: {}", e)))?;
360
361 let priority = match message.priority {
362 MessagePriority::High => 2,
363 MessagePriority::Normal => 1,
364 MessagePriority::Low => 0,
365 };
366
367 Ok(MixMessage {
368 content,
369 priority,
370 timestamp: SystemTime::now()
371 .duration_since(UNIX_EPOCH)
372 .unwrap()
373 .as_millis() as u64,
374 message_type: MixMessageType::Real,
375 normalized_size: 0, })
377 }
378
379 async fn start_dummy_traffic_generation(&self) {
381 let dummy_generator = self.dummy_generator.clone();
382 let mix_node = self.mix_node.clone();
383 let stats = self.stats.clone();
384 let mut shutdown_rx = self.shutdown_tx.subscribe();
385
386 tokio::spawn(async move {
387 let mut interval = interval(Duration::from_millis(100));
388
389 loop {
390 tokio::select! {
391 _ = interval.tick() => {
392 if let Some(dummy_msg) = dummy_generator.generate().await {
393 if let Err(e) = mix_node.lock().await.add_message(dummy_msg).await {
394 warn!("Failed to add dummy message: {}", e);
395 } else {
396 let mut stats = stats.write().await;
397 stats.dummy_messages += 1;
398 }
399 }
400 }
401 _ = shutdown_rx.recv() => {
402 info!("Stopping dummy traffic generation");
403 break;
404 }
405 }
406 }
407 });
408 }
409
410 async fn start_batch_flushing(&self) {
412 let obfuscator = self.clone();
413 let mut shutdown_rx = self.shutdown_tx.subscribe();
414
415 tokio::spawn(async move {
416 let mut interval = interval(Duration::from_millis(100));
417
418 loop {
419 tokio::select! {
420 _ = interval.tick() => {
421 let should_flush = {
423 let mix_node = obfuscator.mix_node.lock().await;
424 mix_node.get_stats().buffer_size > 0
425 };
426
427 if should_flush {
428 if let Err(e) = obfuscator.process_batch().await {
429 warn!("Failed to process batch: {}", e);
430 }
431 }
432 }
433 _ = shutdown_rx.recv() => {
434 info!("Stopping batch flushing");
435 break;
436 }
437 }
438 }
439 });
440 }
441
442 pub async fn get_stats(&self) -> ObfuscationStats {
444 self.stats.read().await.clone()
445 }
446
447 pub async fn update_config(&self, config: TrafficObfuscationConfig) {
449 *self.config.write().await = config;
450 }
451}
452
/// Produces random-content dummy messages with a fixed per-call probability.
struct DummyTrafficGenerator {
    /// Probability, within `0.0..=1.0`, that a call to `generate` yields a message.
    ratio: f64,
    /// Number of `generate` calls so far; only ever incremented in the code reviewed here.
    message_counter: Arc<Mutex<u64>>,
}
460
461unsafe impl Send for DummyTrafficGenerator {}
463unsafe impl Sync for DummyTrafficGenerator {}
464
465impl DummyTrafficGenerator {
466 fn new(ratio: f64) -> Self {
467 Self {
468 ratio: ratio.clamp(0.0, 1.0),
469 message_counter: Arc::new(Mutex::new(0)),
470 }
471 }
472
473 async fn generate(&self) -> Option<MixMessage> {
474 let mut counter = self.message_counter.lock().await;
475 *counter += 1;
476
477 if thread_rng().gen::<f64>() < self.ratio {
479 let size =
481 STANDARD_MESSAGE_SIZES[thread_rng().gen_range(0..STANDARD_MESSAGE_SIZES.len())];
482 let mut content = vec![0u8; size];
483 thread_rng().fill_bytes(&mut content);
484
485 Some(MixMessage {
486 content,
487 priority: 0,
488 timestamp: SystemTime::now()
489 .duration_since(UNIX_EPOCH)
490 .unwrap()
491 .as_millis() as u64,
492 message_type: MixMessageType::Dummy,
493 normalized_size: size,
494 })
495 } else {
496 None
497 }
498 }
499}
500
/// Inserts a randomized delay before each message and slows down further during bursts.
struct TrafficShaper {
    /// Inclusive `(min, max)` delay bounds in milliseconds.
    delay_range: (u64, u64),
    /// When the most recent delay finished; written but not read in the code reviewed here.
    last_message_time: Arc<Mutex<SystemTime>>,
    /// Number of delays applied since the burst window last restarted.
    burst_counter: Arc<Mutex<usize>>,
    /// Start of the current burst-counting window.
    burst_reset_time: Arc<Mutex<SystemTime>>,
}
511
512unsafe impl Send for TrafficShaper {}
514unsafe impl Sync for TrafficShaper {}
515
516impl TrafficShaper {
517 fn new(delay_range: (u64, u64)) -> Self {
518 Self {
519 delay_range,
520 last_message_time: Arc::new(Mutex::new(SystemTime::now())),
521 burst_counter: Arc::new(Mutex::new(0)),
522 burst_reset_time: Arc::new(Mutex::new(SystemTime::now())),
523 }
524 }
525
526 async fn apply_delay(&self) {
527 let delay_ms = thread_rng().gen_range(self.delay_range.0..=self.delay_range.1);
529
530 let mut burst_counter = self.burst_counter.lock().await;
532 let mut burst_reset_time = self.burst_reset_time.lock().await;
533
534 let now = SystemTime::now();
535 if now
536 .duration_since(*burst_reset_time)
537 .unwrap_or(Duration::ZERO)
538 > Duration::from_secs(1)
539 {
540 *burst_counter = 0;
542 *burst_reset_time = now;
543 }
544
545 *burst_counter += 1;
546 if *burst_counter > 100 {
547 sleep(Duration::from_millis(delay_ms * 2)).await;
549 } else {
550 sleep(Duration::from_millis(delay_ms)).await;
551 }
552
553 *self.last_message_time.lock().await = SystemTime::now();
555 }
556}
557
/// Wraps raw bytes in framing that imitates a common network protocol.
struct ProtocolObfuscator {
    /// Framings to choose from; one is picked at random for each call to `obfuscate`.
    patterns: Vec<ObfuscationPattern>,
}
563
564unsafe impl Send for ProtocolObfuscator {}
566unsafe impl Sync for ProtocolObfuscator {}
567
568impl ProtocolObfuscator {
569 fn new(patterns: Vec<ObfuscationPattern>) -> Self {
570 Self { patterns }
571 }
572
573 async fn obfuscate(&self, data: Vec<u8>) -> Result<Vec<u8>, NetworkError> {
574 let pattern = &self.patterns[thread_rng().gen_range(0..self.patterns.len())];
576
577 match pattern {
578 ObfuscationPattern::Http => self.obfuscate_as_http(data),
579 ObfuscationPattern::Https => self.obfuscate_as_https(data),
580 ObfuscationPattern::WebSocket => self.obfuscate_as_websocket(data),
581 ObfuscationPattern::Dns => self.obfuscate_as_dns(data),
582 ObfuscationPattern::Custom(header) => self.obfuscate_with_custom(data, header),
583 }
584 }
585
586 fn obfuscate_as_http(&self, data: Vec<u8>) -> Result<Vec<u8>, NetworkError> {
587 let encoded = general_purpose::STANDARD.encode(&data);
589 let http_request = format!(
590 "POST /api/v1/data HTTP/1.1\r\n\
591 Host: example.com\r\n\
592 User-Agent: Mozilla/5.0\r\n\
593 Content-Type: application/octet-stream\r\n\
594 Content-Length: {}\r\n\
595 X-Request-ID: {}\r\n\
596 \r\n\
597 {}",
598 encoded.len(),
599 uuid::Uuid::new_v4(),
600 encoded
601 );
602
603 Ok(http_request.into_bytes())
604 }
605
606 fn obfuscate_as_https(&self, data: Vec<u8>) -> Result<Vec<u8>, NetworkError> {
607 let mut obfuscated = Vec::new();
609
610 obfuscated.push(0x17); obfuscated.push(0x03); obfuscated.push(0x03);
614 obfuscated.extend_from_slice(&(data.len() as u16).to_be_bytes());
615
616 obfuscated.extend_from_slice(&data);
618
619 Ok(obfuscated)
620 }
621
622 fn obfuscate_as_websocket(&self, data: Vec<u8>) -> Result<Vec<u8>, NetworkError> {
623 let mut frame = Vec::new();
625
626 frame.push(0x82);
628
629 if data.len() < 126 {
631 frame.push(data.len() as u8 | 0x80); } else if data.len() < 65536 {
633 frame.push(126 | 0x80);
634 frame.extend_from_slice(&(data.len() as u16).to_be_bytes());
635 } else {
636 frame.push(127 | 0x80);
637 frame.extend_from_slice(&(data.len() as u64).to_be_bytes());
638 }
639
640 let mut mask = [0u8; 4];
642 thread_rng().fill_bytes(&mut mask);
643 frame.extend_from_slice(&mask);
644
645 for (i, &byte) in data.iter().enumerate() {
647 frame.push(byte ^ mask[i % 4]);
648 }
649
650 Ok(frame)
651 }
652
653 fn obfuscate_as_dns(&self, data: Vec<u8>) -> Result<Vec<u8>, NetworkError> {
654 let encoded = general_purpose::STANDARD
656 .encode(&data)
657 .chars()
658 .filter(|c| c.is_alphanumeric())
659 .collect::<String>();
660
661 let labels: Vec<String> = encoded
663 .chars()
664 .collect::<Vec<char>>()
665 .chunks(63)
666 .map(|chunk| chunk.iter().collect())
667 .collect();
668
669 let mut dns_query = Vec::new();
671
672 dns_query.extend_from_slice(&thread_rng().next_u32().to_be_bytes()[..2]); dns_query.extend_from_slice(&[0x01, 0x00]); dns_query.extend_from_slice(&[0x00, 0x01]); dns_query.extend_from_slice(&[0x00, 0x00]); dns_query.extend_from_slice(&[0x00, 0x00]); dns_query.extend_from_slice(&[0x00, 0x00]); for label in labels.iter().take(4) {
682 dns_query.push(label.len() as u8);
684 dns_query.extend_from_slice(label.as_bytes());
685 }
686 dns_query.push(0); dns_query.extend_from_slice(&[0x00, 0x01]); dns_query.extend_from_slice(&[0x00, 0x01]); Ok(dns_query)
693 }
694
695 fn obfuscate_with_custom(
696 &self,
697 mut data: Vec<u8>,
698 header: &[u8],
699 ) -> Result<Vec<u8>, NetworkError> {
700 let mut obfuscated = header.to_vec();
702 obfuscated.append(&mut data);
703 Ok(obfuscated)
704 }
705}
706
707impl Clone for TrafficObfuscator {
708 fn clone(&self) -> Self {
709 Self {
710 config: self.config.clone(),
711 mix_node: self.mix_node.clone(),
712 traffic_resistance: self.traffic_resistance.clone(),
713 dummy_generator: self.dummy_generator.clone(),
714 traffic_shaper: self.traffic_shaper.clone(),
715 protocol_obfuscator: self.protocol_obfuscator.clone(),
716 stats: self.stats.clone(),
717 shutdown_tx: self.shutdown_tx.clone(),
718 }
719 }
720}
721
722#[cfg(test)]
723mod tests {
724 use super::*;
725
726 #[tokio::test]
727 async fn test_message_normalization() {
728 let config = TrafficObfuscationConfig {
729 enable_size_normalization: true,
730 standard_message_size: 4096,
731 ..Default::default()
732 };
733
734 let obfuscator = TrafficObfuscator::new(config);
735
736 let small_msg = NetworkMessage {
738 id: "test1".to_string(),
739 source: vec![1, 2, 3],
740 destination: vec![4, 5, 6],
741 payload: vec![0u8; 100], priority: MessagePriority::Normal,
743 ttl: Duration::from_secs(60),
744 };
745
746 let normalized = obfuscator.normalize_message_size(small_msg).await.unwrap();
747 assert_eq!(normalized.payload.len(), 4096);
748
749 let large_msg = NetworkMessage {
751 id: "test2".to_string(),
752 source: vec![1, 2, 3],
753 destination: vec![4, 5, 6],
754 payload: vec![0u8; 5000], priority: MessagePriority::Normal,
756 ttl: Duration::from_secs(60),
757 };
758
759 let normalized = obfuscator.normalize_message_size(large_msg).await.unwrap();
760 assert_eq!(normalized.payload.len(), 8192); }
762
763 #[tokio::test]
764 async fn test_dummy_traffic_generation() {
765 let generator = DummyTrafficGenerator::new(0.5); let mut dummy_count = 0;
768 for _ in 0..100 {
769 if generator.generate().await.is_some() {
770 dummy_count += 1;
771 }
772 }
773
774 assert!(dummy_count > 30 && dummy_count < 70);
776 }
777
778 #[tokio::test]
779 async fn test_protocol_obfuscation() {
780 let obfuscator = ProtocolObfuscator::new(vec![
781 ObfuscationPattern::Http,
782 ObfuscationPattern::Https,
783 ObfuscationPattern::WebSocket,
784 ObfuscationPattern::Dns,
785 ]);
786
787 let data = vec![1, 2, 3, 4, 5];
788
789 let http_result = obfuscator.obfuscate_as_http(data.clone()).unwrap();
791 let http_str = String::from_utf8_lossy(&http_result);
792 assert!(http_str.contains("HTTP/1.1"));
793 assert!(http_str.contains("Content-Type: application/octet-stream"));
794
795 let https_result = obfuscator.obfuscate_as_https(data.clone()).unwrap();
797 assert_eq!(https_result[0], 0x17); assert_eq!(https_result[1], 0x03); assert_eq!(https_result[2], 0x03);
800
801 let ws_result = obfuscator.obfuscate_as_websocket(data.clone()).unwrap();
803 assert_eq!(ws_result[0], 0x82); let dns_result = obfuscator.obfuscate_as_dns(data).unwrap();
807 assert!(dns_result.len() > 12); }
809
810 #[tokio::test]
811 async fn test_traffic_shaping() {
812 let shaper = TrafficShaper::new((10, 50));
813
814 let start = SystemTime::now();
815 shaper.apply_delay().await;
816 let elapsed = start.elapsed().unwrap();
817
818 assert!(elapsed >= Duration::from_millis(10));
820 assert!(elapsed <= Duration::from_millis(60)); }
822}