1use qudag_crypto::kem::{PublicKey as KEMPublicKey, SecretKey as KEMSecretKey};
2use qudag_crypto::ml_kem::MlKem768;
3use rand::{thread_rng, Rng, RngCore};
4use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
5use ring::rand::{SecureRandom, SystemRandom};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10use thiserror::Error;
11use tokio::sync::Mutex as TokioMutex;
12
/// Errors reported by the onion-routing, mixing, circuit and directory code in this file.
///
/// Every variant carries a free-form, human-readable detail string.
#[derive(Error, Debug)]
pub enum OnionError {
    /// Sealing a layer failed; also used for key-derivation and layer-serialization failures.
    #[error("layer encryption failed: {0}")]
    EncryptionError(String),

    /// Opening (authenticating and decrypting) a layer failed.
    #[error("layer decryption failed: {0}")]
    DecryptionError(String),

    /// A layer is structurally invalid or its metadata could not be serialized.
    #[error("invalid layer format: {0}")]
    InvalidFormat(String),

    /// ML-KEM key generation, encapsulation or decapsulation failed.
    #[error("ML-KEM error: {0}")]
    MLKEMError(String),

    /// The secure random number generator failed to produce bytes.
    #[error("RNG error: {0}")]
    RngError(String),

    /// A route or circuit could not be built, looked up or used.
    #[error("route construction failed: {0}")]
    RouteError(String),

    /// A timing rule was violated (for example, a layer is too old).
    #[error("timing constraint violated: {0}")]
    TimingError(String),
}
44
/// One layer of an onion-encrypted message, addressed to a single hop.
///
/// The struct is (de)serialized with serde; an inner layer travels as the encrypted
/// payload of the layer wrapped around it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnionLayer {
    /// Identifier of the hop that follows this one; `encrypt_layers` leaves it empty
    /// for the last hop of the route.
    pub next_hop: Vec<u8>,
    /// Layer payload; after `encrypt_layers` this is the AEAD ciphertext with the tag appended.
    pub payload: Vec<u8>,
    /// Opaque routing metadata attached to this layer.
    pub metadata: Vec<u8>,
    /// ML-KEM ciphertext from which the receiving hop recovers the shared secret.
    pub kem_ciphertext: Vec<u8>,
    /// 12-byte AEAD nonce used to encrypt `payload`.
    pub nonce: [u8; 12],
    /// Initialised empty by `OnionLayer::new`; not populated by the code visible in this file.
    pub auth_tag: Vec<u8>,
    /// Creation time in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Random padding bytes used to obscure / normalize the layer size.
    pub padding: Vec<u8>,
}
65
66impl OnionLayer {
67 pub fn new(next_hop: Vec<u8>, payload: Vec<u8>, metadata: Vec<u8>) -> Self {
69 let rng = SystemRandom::new();
70 let mut nonce = [0u8; 12];
71 rng.fill(&mut nonce).expect("RNG failure");
72
73 let timestamp = SystemTime::now()
74 .duration_since(UNIX_EPOCH)
75 .unwrap()
76 .as_millis() as u64;
77
78 let mut padding = vec![0u8; thread_rng().next_u32() as usize % 256];
80 thread_rng().fill_bytes(&mut padding);
81
82 Self {
83 next_hop,
84 payload,
85 metadata,
86 kem_ciphertext: Vec::new(),
87 nonce,
88 auth_tag: Vec::new(),
89 timestamp,
90 padding,
91 }
92 }
93
94 pub fn validate(&self) -> Result<(), OnionError> {
96 if self.next_hop.is_empty() {
97 return Err(OnionError::InvalidFormat("empty next hop key".into()));
98 }
99 if self.payload.is_empty() {
100 return Err(OnionError::InvalidFormat("empty payload".into()));
101 }
102 if self.kem_ciphertext.is_empty() {
103 return Err(OnionError::InvalidFormat("missing KEM ciphertext".into()));
104 }
105
106 let now = SystemTime::now()
108 .duration_since(UNIX_EPOCH)
109 .unwrap()
110 .as_millis() as u64;
111
112 if now.saturating_sub(self.timestamp) > 300_000 {
113 return Err(OnionError::TimingError("layer too old".into()));
115 }
116
117 Ok(())
118 }
119
120 pub fn total_size(&self) -> usize {
122 self.next_hop.len()
123 + self.payload.len()
124 + self.metadata.len()
125 + self.kem_ciphertext.len()
126 + self.auth_tag.len()
127 + self.padding.len()
128 + 12
129 + 8 }
131
132 pub fn normalize_size(&mut self, target_size: usize) {
134 let current_size = self.total_size();
135 if current_size < target_size {
136 let padding_needed = target_size - current_size;
137 let mut additional_padding = vec![0u8; padding_needed];
138 thread_rng().fill_bytes(&mut additional_padding);
139 self.padding.extend(additional_padding);
140 }
141 }
142}
143
/// Synchronous interface for building and peeling onion layers.
pub trait OnionRouter: Send + Sync {
    /// Wraps `message` in one encrypted layer per entry of `route` and returns the layers.
    fn encrypt_layers(
        &self,
        message: Vec<u8>,
        route: Vec<Vec<u8>>,
    ) -> Result<Vec<OnionLayer>, OnionError>;

    /// Decrypts a single layer, returning the decrypted bytes and, when the payload
    /// contains one, the next inner layer.
    fn decrypt_layer(&self, layer: OnionLayer)
        -> Result<(Vec<u8>, Option<OnionLayer>), OnionError>;

    /// Builds the metadata blob for a layer from already-serialized route information.
    fn create_metadata(&self, route_info: Vec<u8>) -> Result<Vec<u8>, OnionError>;
}
160
/// Onion router that uses ML-KEM-768 for per-hop key agreement and
/// ChaCha20-Poly1305 for layer encryption.
pub struct MLKEMOnionRouter {
    /// This router's ML-KEM secret key, used to decapsulate incoming layers.
    ml_kem_secret_key: KEMSecretKey,
    /// This router's ML-KEM public key, exposed via `get_public_key`.
    ml_kem_public_key: KEMPublicKey,
    /// System RNG used for nonces.
    rng: SystemRandom,
    /// Size each layer is padded up to by `OnionLayer::normalize_size`.
    standard_layer_size: usize,
    /// Circuit bookkeeping; not read by the code visible in this file.
    #[allow(dead_code)]
    circuit_manager: Arc<TokioMutex<CircuitManager>>,
    /// Directory used to look up hop public keys.
    directory_client: Arc<DirectoryClient>,
}
177
178impl MLKEMOnionRouter {
179 pub async fn new() -> Result<Self, OnionError> {
181 let (public_key, secret_key) = MlKem768::keygen()
183 .map_err(|e| OnionError::MLKEMError(format!("Key generation failed: {:?}", e)))?;
184
185 let circuit_manager = Arc::new(TokioMutex::new(CircuitManager::new()));
186 let directory_client = Arc::new(DirectoryClient::new());
187
188 Ok(Self {
189 ml_kem_secret_key: secret_key,
190 ml_kem_public_key: public_key,
191 rng: SystemRandom::new(),
192 standard_layer_size: 4096, circuit_manager,
194 directory_client,
195 })
196 }
197
198 pub async fn with_layer_size(layer_size: usize) -> Result<Self, OnionError> {
200 let mut router = Self::new().await?;
201 router.standard_layer_size = layer_size;
202 Ok(router)
203 }
204
205 pub fn get_public_key(&self) -> &KEMPublicKey {
207 &self.ml_kem_public_key
208 }
209
210 fn derive_symmetric_key(&self, shared_secret: &[u8]) -> Result<[u8; 32], OnionError> {
212 use ring::hkdf;
213
214 let salt = ring::hkdf::Salt::new(hkdf::HKDF_SHA256, b"QuDAG-Onion-v1");
215 let prk = salt.extract(shared_secret);
216
217 let mut key = [0u8; 32];
218 let info = [&b"symmetric-key"[..]];
219 prk.expand(&info[..], hkdf::HKDF_SHA256)
220 .map_err(|_| OnionError::EncryptionError("Key derivation failed".into()))?
221 .fill(&mut key)
222 .map_err(|_| OnionError::EncryptionError("Key derivation failed".into()))?;
223
224 Ok(key)
225 }
226
227 fn encrypt_aead(
229 &self,
230 key: &[u8; 32],
231 nonce: &[u8; 12],
232 data: &[u8],
233 ) -> Result<Vec<u8>, OnionError> {
234 let unbound_key = UnboundKey::new(&CHACHA20_POLY1305, key)
235 .map_err(|e| OnionError::EncryptionError(e.to_string()))?;
236 let sealing_key = LessSafeKey::new(unbound_key);
237
238 let mut encrypted_data = data.to_vec();
239 sealing_key
240 .seal_in_place_append_tag(
241 Nonce::assume_unique_for_key(*nonce),
242 Aad::empty(),
243 &mut encrypted_data,
244 )
245 .map_err(|e| OnionError::EncryptionError(e.to_string()))?;
246
247 Ok(encrypted_data)
248 }
249
250 fn decrypt_aead(
252 &self,
253 key: &[u8; 32],
254 nonce: &[u8; 12],
255 encrypted_data: &mut [u8],
256 ) -> Result<Vec<u8>, OnionError> {
257 let unbound_key = UnboundKey::new(&CHACHA20_POLY1305, key)
258 .map_err(|e| OnionError::DecryptionError(e.to_string()))?;
259 let opening_key = LessSafeKey::new(unbound_key);
260
261 let decrypted = opening_key
262 .open_in_place(
263 Nonce::assume_unique_for_key(*nonce),
264 Aad::empty(),
265 encrypted_data,
266 )
267 .map_err(|e| OnionError::DecryptionError(e.to_string()))?;
268
269 Ok(decrypted.to_vec())
270 }
271
272 #[allow(dead_code)]
274 async fn add_timing_obfuscation(&self) {
275 let delay_ms = (thread_rng().next_u32() % 90) + 10;
277 tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
278 }
279}
280
281impl MLKEMOnionRouter {
282 pub async fn encrypt_layers(
284 &self,
285 message: Vec<u8>,
286 route: Vec<Vec<u8>>,
287 ) -> Result<Vec<OnionLayer>, OnionError> {
288 if route.is_empty() {
289 return Err(OnionError::RouteError("empty route".into()));
290 }
291
292 let mut layers = Vec::new();
293 let mut current_payload = message;
294
295 for (i, _hop_pubkey) in route.iter().rev().enumerate() {
297 let mut nonce = [0u8; 12];
299 self.rng
300 .fill(&mut nonce)
301 .map_err(|e| OnionError::RngError(e.to_string()))?;
302
303 let hop_public_key = self
305 .directory_client
306 .get_public_key(&route[route.len() - i - 1])
307 .await
308 .map_err(|e| OnionError::RouteError(format!("Failed to get public key: {}", e)))?;
309
310 let (kem_ciphertext, shared_secret) = MlKem768::encapsulate(&hop_public_key)
312 .map_err(|e| OnionError::MLKEMError(format!("Encapsulation failed: {:?}", e)))?;
313
314 let symmetric_key = self.derive_symmetric_key(shared_secret.as_bytes())?;
316
317 let circuit_id = thread_rng().next_u64();
319 let hop_info = HopMetadata {
320 circuit_id,
321 hop_number: i as u8,
322 next_hop: if i == 0 {
323 None
324 } else {
325 Some(route[route.len() - i].clone())
326 },
327 flags: LayerFlags::default(),
328 };
329 let metadata = self.create_metadata(
330 bincode::serialize(&hop_info)
331 .map_err(|e| OnionError::InvalidFormat(e.to_string()))?,
332 )?;
333
334 let next_hop = if i == 0 {
336 Vec::new() } else {
338 route[route.len() - i].clone()
339 };
340
341 let mut layer = OnionLayer::new(next_hop, current_payload.clone(), metadata);
343 layer.kem_ciphertext = kem_ciphertext.as_bytes().to_vec();
344 layer.nonce = nonce;
345
346 let encrypted_payload = self.encrypt_aead(&symmetric_key, &nonce, ¤t_payload)?;
348 layer.payload = encrypted_payload;
349
350 layer.normalize_size(self.standard_layer_size);
352
353 layer.validate()?;
355
356 current_payload = bincode::serialize(&layer)
358 .map_err(|e| OnionError::EncryptionError(e.to_string()))?;
359
360 layers.push(layer);
361 }
362
363 layers.reverse();
365 Ok(layers)
366 }
367
368 pub fn decrypt_layer(
370 &self,
371 layer: OnionLayer,
372 ) -> Result<(Vec<u8>, Option<OnionLayer>), OnionError> {
373 layer.validate()?;
375
376 let kem_ciphertext = qudag_crypto::kem::Ciphertext::from_bytes(&layer.kem_ciphertext)
378 .map_err(|_| OnionError::MLKEMError("Invalid KEM ciphertext".into()))?;
379
380 let shared_secret = MlKem768::decapsulate(&self.ml_kem_secret_key, &kem_ciphertext)
381 .map_err(|e| OnionError::MLKEMError(format!("Decapsulation failed: {:?}", e)))?;
382
383 let symmetric_key = self.derive_symmetric_key(shared_secret.as_bytes())?;
385
386 let mut encrypted_payload = layer.payload.clone();
388 let decrypted_payload =
389 self.decrypt_aead(&symmetric_key, &layer.nonce, &mut encrypted_payload)?;
390
391 if !layer.next_hop.is_empty() {
393 match bincode::deserialize::<OnionLayer>(&decrypted_payload) {
394 Ok(next_layer) => Ok((decrypted_payload, Some(next_layer))),
395 Err(_) => {
396 Ok((decrypted_payload, None))
398 }
399 }
400 } else {
401 Ok((decrypted_payload, None))
403 }
404 }
405
406 fn create_metadata(&self, route_info: Vec<u8>) -> Result<Vec<u8>, OnionError> {
407 let timestamp = SystemTime::now()
409 .duration_since(UNIX_EPOCH)
410 .unwrap()
411 .as_millis() as u64;
412
413 let mut metadata = Vec::new();
414 metadata.extend_from_slice(×tamp.to_le_bytes());
415 metadata.extend_from_slice(&route_info);
416
417 let mut padding = vec![0u8; thread_rng().next_u32() as usize % 128];
419 thread_rng().fill_bytes(&mut padding);
420 metadata.extend(padding);
421
422 Ok(metadata)
423 }
424}
425
426impl OnionRouter for MLKEMOnionRouter {
427 fn encrypt_layers(
428 &self,
429 message: Vec<u8>,
430 route: Vec<Vec<u8>>,
431 ) -> Result<Vec<OnionLayer>, OnionError> {
432 tokio::task::block_in_place(|| {
434 tokio::runtime::Handle::current().block_on(self.encrypt_layers(message, route))
435 })
436 }
437
438 fn decrypt_layer(
439 &self,
440 layer: OnionLayer,
441 ) -> Result<(Vec<u8>, Option<OnionLayer>), OnionError> {
442 self.decrypt_layer(layer)
443 }
444
445 fn create_metadata(&self, route_info: Vec<u8>) -> Result<Vec<u8>, OnionError> {
446 self.create_metadata(route_info)
447 }
448}
449
/// A mix node that buffers messages and releases them in shuffled batches.
#[derive(Debug)]
pub struct MixNode {
    /// Identifier of this node.
    pub id: Vec<u8>,
    /// Batching and obfuscation settings.
    config: MixConfig,
    /// Messages waiting for the next flush.
    message_buffer: Vec<MixMessage>,
    /// Wall-clock time of the last flush (or of construction).
    last_flush: SystemTime,
    /// Source of dummy (cover) messages.
    dummy_generator: DummyTrafficGenerator,
    /// Rate limiter applied before each batch is released.
    traffic_shaper: TrafficShaper,
}
466
/// Configuration for a [`MixNode`].
#[derive(Debug, Clone)]
pub struct MixConfig {
    /// Buffer length at which a flush is triggered; batches are also topped up with
    /// dummies towards this size.
    pub batch_size: usize,
    /// Time since the last flush after which a flush is triggered regardless of buffer length.
    pub batch_timeout: Duration,
    /// Target output rate handed to the traffic shaper (its interval is `1 / target_rate` seconds).
    pub target_rate: f64,
    /// Probability handed to the dummy traffic generator for each generation attempt.
    pub dummy_probability: f64,
    /// When `true`, each flush is delayed by an additional random amount.
    pub timing_obfuscation: bool,
}
481
impl Default for MixConfig {
    /// Defaults: batches of 100, a 500 ms batch timeout, target rate 50.0,
    /// dummy probability 0.1, timing obfuscation enabled.
    fn default() -> Self {
        Self {
            batch_size: 100,
            batch_timeout: Duration::from_millis(500),
            target_rate: 50.0,
            dummy_probability: 0.1,
            timing_obfuscation: true,
        }
    }
}
493
/// A message travelling through a mix node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MixMessage {
    /// Raw message bytes.
    pub content: Vec<u8>,
    /// Message priority; the dummy generator uses 0.
    pub priority: u8,
    /// Creation time; the dummy generator uses milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Whether this is real traffic, a dummy or a heartbeat.
    pub message_type: MixMessageType,
    /// Size bucket assigned to the message by the mix node.
    pub normalized_size: usize,
}
508
/// Kind of a [`MixMessage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MixMessageType {
    /// Genuine user traffic.
    Real,
    /// Cover traffic produced by the dummy generator.
    Dummy,
    /// Heartbeat message (not produced by the code visible in this file).
    Heartbeat,
}
519
520impl MixNode {
521 pub fn new(id: Vec<u8>) -> Self {
523 Self::with_config(id, MixConfig::default())
524 }
525
526 pub fn with_config(id: Vec<u8>, config: MixConfig) -> Self {
528 Self {
529 id,
530 config: config.clone(),
531 message_buffer: Vec::with_capacity(config.batch_size * 2),
532 last_flush: SystemTime::now(),
533 dummy_generator: DummyTrafficGenerator::new(config.dummy_probability),
534 traffic_shaper: TrafficShaper::new(config.target_rate),
535 }
536 }
537
538 pub async fn add_message(&mut self, mut message: MixMessage) -> Result<(), OnionError> {
540 message.normalized_size = self.normalize_message_size(&message);
542
543 self.message_buffer.push(message);
544
545 if self.should_flush() {
547 self.flush_batch().await?;
548 }
549
550 Ok(())
551 }
552
553 fn should_flush(&self) -> bool {
555 self.message_buffer.len() >= self.config.batch_size
556 || self.last_flush.elapsed().unwrap_or(Duration::ZERO) >= self.config.batch_timeout
557 }
558
559 pub async fn flush_batch(&mut self) -> Result<Vec<MixMessage>, OnionError> {
561 if self.message_buffer.is_empty() {
562 return Ok(Vec::new());
563 }
564
565 self.add_dummy_messages();
567
568 use rand::seq::SliceRandom;
570 self.message_buffer.shuffle(&mut thread_rng());
571
572 self.traffic_shaper.apply_shaping().await;
574
575 if self.config.timing_obfuscation {
577 self.apply_timing_obfuscation().await;
578 }
579
580 let batch = std::mem::take(&mut self.message_buffer);
582 self.last_flush = SystemTime::now();
583
584 Ok(batch)
585 }
586
587 fn add_dummy_messages(&mut self) {
589 while self.message_buffer.len() < self.config.batch_size {
590 if let Some(dummy_msg) = self.dummy_generator.generate_dummy() {
591 self.message_buffer.push(dummy_msg);
592 } else {
593 break; }
595 }
596 }
597
598 fn normalize_message_size(&self, message: &MixMessage) -> usize {
600 let standard_sizes = [512, 1024, 2048, 4096, 8192];
602 let content_size = message.content.len();
603
604 for &size in &standard_sizes {
606 if content_size <= size {
607 return size;
608 }
609 }
610
611 content_size.div_ceil(1024) * 1024
613 }
614
615 async fn apply_timing_obfuscation(&self) {
617 let delay_ms = (thread_rng().next_u32() % 100) + 50;
619 tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
620 }
621
622 pub fn get_stats(&self) -> MixNodeStats {
624 MixNodeStats {
625 buffer_size: self.message_buffer.len(),
626 last_flush_elapsed: self.last_flush.elapsed().unwrap_or(Duration::ZERO),
627 dummy_ratio: self.dummy_generator.get_dummy_ratio(),
628 target_rate: self.config.target_rate,
629 }
630 }
631}
632
/// Snapshot of a [`MixNode`]'s state as returned by `MixNode::get_stats`.
#[derive(Debug, Clone)]
pub struct MixNodeStats {
    /// Number of messages currently buffered.
    pub buffer_size: usize,
    /// Time since the last flush (zero if the clock went backwards).
    pub last_flush_elapsed: Duration,
    /// Fraction of dummy-generation attempts that produced a dummy.
    pub dummy_ratio: f64,
    /// Configured target rate.
    pub target_rate: f64,
}
645
/// Probabilistic generator of dummy (cover) messages.
#[derive(Debug)]
struct DummyTrafficGenerator {
    /// Probability, clamped to `[0, 1]`, that one attempt yields a dummy.
    probability: f64,
    /// Number of dummies produced so far.
    dummy_count: usize,
    /// Number of generation attempts so far.
    total_count: usize,
}
656
657impl DummyTrafficGenerator {
658 fn new(probability: f64) -> Self {
659 Self {
660 probability: probability.clamp(0.0, 1.0),
661 dummy_count: 0,
662 total_count: 0,
663 }
664 }
665
666 fn generate_dummy(&mut self) -> Option<MixMessage> {
668 self.total_count += 1;
669
670 if thread_rng().gen::<f64>() < self.probability {
671 self.dummy_count += 1;
672
673 let size = (thread_rng().next_u32() % 4096) + 256; let mut content = vec![0u8; size as usize];
676 thread_rng().fill_bytes(&mut content);
677
678 Some(MixMessage {
679 content,
680 priority: 0,
681 timestamp: SystemTime::now()
682 .duration_since(UNIX_EPOCH)
683 .unwrap()
684 .as_millis() as u64,
685 message_type: MixMessageType::Dummy,
686 normalized_size: 0, })
688 } else {
689 None
690 }
691 }
692
693 fn get_dummy_ratio(&self) -> f64 {
695 if self.total_count == 0 {
696 0.0
697 } else {
698 self.dummy_count as f64 / self.total_count as f64
699 }
700 }
701}
702
/// Enforces a minimum interval between shaped events.
#[derive(Debug)]
#[allow(dead_code)]
struct TrafficShaper {
    /// Target rate as configured; stored but not read after construction.
    target_rate: f64,
    /// Wall-clock time at which shaping last completed.
    last_message_time: SystemTime,
    /// Minimum interval between events, derived from the target rate.
    message_interval: Duration,
}
714
715impl TrafficShaper {
716 fn new(target_rate: f64) -> Self {
717 let message_interval = Duration::from_secs_f64(1.0 / target_rate.max(0.1));
718
719 Self {
720 target_rate,
721 last_message_time: SystemTime::now(),
722 message_interval,
723 }
724 }
725
726 async fn apply_shaping(&mut self) {
728 let now = SystemTime::now();
729 let elapsed = now
730 .duration_since(self.last_message_time)
731 .unwrap_or(Duration::ZERO);
732
733 if elapsed < self.message_interval {
734 let delay = self.message_interval - elapsed;
735 tokio::time::sleep(delay).await;
736 }
737
738 self.last_message_time = SystemTime::now();
739 }
740}
741
/// Produces obfuscated metadata (timestamps, headers, identifiers, padding) for packets.
pub struct MetadataProtector {
    /// System RNG used for jitter, header values, identifiers and padding.
    rng: SystemRandom,
    /// Which protections are enabled and how they are tuned.
    config: MetadataConfig,
}
749
/// Configuration for [`MetadataProtector`].
#[derive(Debug, Clone)]
pub struct MetadataConfig {
    /// Replace IP addresses with random ones in `anonymize_ip`.
    pub anonymize_ip: bool,
    /// Bucket and jitter timestamps instead of reporting the exact time.
    pub obfuscate_timing: bool,
    /// Round metadata sizes up to standard packet sizes.
    pub normalize_size: bool,
    /// Attach randomly generated headers.
    pub randomize_headers: bool,
    /// Width of a timestamp bucket in milliseconds.
    pub timing_bucket_ms: u64,
}
764
impl Default for MetadataConfig {
    /// Defaults: every protection enabled, 100 ms timestamp buckets.
    fn default() -> Self {
        Self {
            anonymize_ip: true,
            obfuscate_timing: true,
            normalize_size: true,
            randomize_headers: true,
            timing_bucket_ms: 100,
        }
    }
}
776
/// Result of `MetadataProtector::protect_metadata`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtectedMetadata {
    /// Timestamp in milliseconds since the Unix epoch, bucketed and jittered when
    /// timing obfuscation is enabled.
    pub obfuscated_timestamp: u64,
    /// Randomly generated `(name, value)` header pairs; empty when header
    /// randomization is disabled.
    pub random_headers: Vec<(String, Vec<u8>)>,
    /// Size the metadata is padded to.
    pub normalized_size: usize,
    /// Random identifiers.
    pub anonymous_ids: Vec<Vec<u8>>,
    /// Random bytes covering the gap between the original and the normalized size.
    pub padding: Vec<u8>,
}
791
impl Default for MetadataProtector {
    /// Equivalent to [`MetadataProtector::new`].
    fn default() -> Self {
        Self::new()
    }
}
797
798impl MetadataProtector {
799 pub fn new() -> Self {
801 Self::with_config(MetadataConfig::default())
802 }
803
804 pub fn with_config(config: MetadataConfig) -> Self {
806 Self {
807 rng: SystemRandom::new(),
808 config,
809 }
810 }
811
812 pub fn protect_metadata(
814 &self,
815 original_metadata: &[u8],
816 ) -> Result<ProtectedMetadata, OnionError> {
817 let timestamp = if self.config.obfuscate_timing {
818 self.obfuscate_timestamp()?
819 } else {
820 SystemTime::now()
821 .duration_since(UNIX_EPOCH)
822 .unwrap()
823 .as_millis() as u64
824 };
825
826 let random_headers = if self.config.randomize_headers {
827 self.generate_random_headers()?
828 } else {
829 Vec::new()
830 };
831
832 let normalized_size = if self.config.normalize_size {
833 self.normalize_packet_size(original_metadata.len())
834 } else {
835 original_metadata.len()
836 };
837
838 let anonymous_ids = self.generate_anonymous_ids()?;
839 let padding = self.generate_padding(normalized_size, original_metadata.len())?;
840
841 Ok(ProtectedMetadata {
842 obfuscated_timestamp: timestamp,
843 random_headers,
844 normalized_size,
845 anonymous_ids,
846 padding,
847 })
848 }
849
850 fn obfuscate_timestamp(&self) -> Result<u64, OnionError> {
852 let now = SystemTime::now()
853 .duration_since(UNIX_EPOCH)
854 .unwrap()
855 .as_millis() as u64;
856
857 let bucket_size = self.config.timing_bucket_ms;
859 let obfuscated = (now / bucket_size) * bucket_size;
860
861 let mut jitter_bytes = [0u8; 8];
863 self.rng
864 .fill(&mut jitter_bytes)
865 .map_err(|e| OnionError::RngError(e.to_string()))?;
866 let jitter = u64::from_le_bytes(jitter_bytes) % bucket_size;
867
868 Ok(obfuscated + jitter)
869 }
870
871 fn generate_random_headers(&self) -> Result<Vec<(String, Vec<u8>)>, OnionError> {
873 let header_names = [
874 "X-Request-ID",
875 "X-Correlation-ID",
876 "X-Session-ID",
877 "X-Trace-ID",
878 "X-Custom-Header",
879 "X-Client-ID",
880 ];
881
882 let mut headers = Vec::new();
883 let num_headers = (thread_rng().next_u32() % 4) + 2; for _ in 0..num_headers {
886 let name = header_names[thread_rng().next_u32() as usize % header_names.len()];
887 let mut value = vec![0u8; 16];
888 self.rng
889 .fill(&mut value)
890 .map_err(|e| OnionError::RngError(e.to_string()))?;
891 headers.push((name.to_string(), value));
892 }
893
894 Ok(headers)
895 }
896
897 fn normalize_packet_size(&self, original_size: usize) -> usize {
899 let standard_sizes = [512, 1024, 1536, 2048, 3072, 4096, 6144, 8192, 12288, 16384];
901
902 for &size in &standard_sizes {
904 if original_size <= size {
905 return size;
906 }
907 }
908
909 original_size.div_ceil(4096) * 4096
911 }
912
913 fn generate_anonymous_ids(&self) -> Result<Vec<Vec<u8>>, OnionError> {
915 let mut ids = Vec::new();
916 let num_ids = (thread_rng().next_u32() % 3) + 1; for _ in 0..num_ids {
919 let mut id = vec![0u8; 32]; self.rng
921 .fill(&mut id)
922 .map_err(|e| OnionError::RngError(e.to_string()))?;
923 ids.push(id);
924 }
925
926 Ok(ids)
927 }
928
929 fn generate_padding(
931 &self,
932 target_size: usize,
933 current_size: usize,
934 ) -> Result<Vec<u8>, OnionError> {
935 if target_size <= current_size {
936 return Ok(Vec::new());
937 }
938
939 let padding_size = target_size - current_size;
940 let mut padding = vec![0u8; padding_size];
941 self.rng
942 .fill(&mut padding)
943 .map_err(|e| OnionError::RngError(e.to_string()))?;
944
945 Ok(padding)
946 }
947
948 pub fn anonymize_ip(&self, original_ip: &str) -> Result<String, OnionError> {
950 if !self.config.anonymize_ip {
951 return Ok(original_ip.to_string());
952 }
953
954 let mut ip_bytes = [0u8; 4];
957 self.rng
958 .fill(&mut ip_bytes)
959 .map_err(|e| OnionError::RngError(e.to_string()))?;
960
961 ip_bytes[0] = 10; Ok(format!(
965 "{}.{}.{}.{}",
966 ip_bytes[0], ip_bytes[1], ip_bytes[2], ip_bytes[3]
967 ))
968 }
969
970 pub fn scrub_packet_headers(&self, packet: &mut Vec<u8>) -> Result<(), OnionError> {
972 let mut dummy_headers = vec![0u8; 20]; self.rng
981 .fill(&mut dummy_headers)
982 .map_err(|e| OnionError::RngError(e.to_string()))?;
983
984 let mut new_packet = dummy_headers;
986 new_packet.extend_from_slice(packet);
987 *packet = new_packet;
988
989 Ok(())
990 }
991}
992
/// Tracks circuits and enforces creation-rate, capacity and lifetime limits.
#[derive(Debug)]
pub struct CircuitManager {
    /// Known circuits keyed by circuit id.
    circuits: HashMap<u64, Circuit>,
    /// Maximum circuit creations per second.
    creation_rate: f64,
    /// Time of the most recent circuit creation (or of construction).
    last_creation: Instant,
    /// Maximum number of circuits tracked at once.
    max_circuits: usize,
    /// Age beyond which a circuit is no longer handed out as active.
    circuit_lifetime: Duration,
    /// Age at which an active circuit is reported as needing rotation.
    rotation_interval: Duration,
}
1009
/// A multi-hop circuit and its bookkeeping data.
#[derive(Debug, Clone)]
pub struct Circuit {
    /// Randomly chosen circuit id.
    pub id: u64,
    /// Hops of the circuit, in route order, as returned by the directory.
    pub hops: Vec<Vec<u8>>,
    /// Current lifecycle state.
    pub state: CircuitState,
    /// When the circuit was created.
    pub created_at: Instant,
    /// When the circuit was last activated or had its metrics updated.
    pub last_activity: Instant,
    /// Total bytes reported as sent over the circuit.
    pub bandwidth_used: u64,
    /// Smoothed success score in `[0.0, 1.0]`; starts at 1.0.
    pub quality_score: f64,
}
1028
/// Lifecycle state of a [`Circuit`].
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
    /// Created but not yet activated.
    Building,
    /// Ready for use.
    Active,
    /// Teardown in progress.
    TearingDown,
    /// Torn down; removed by the next cleanup.
    Closed,
    /// Failed, with a reason; removed by the next cleanup.
    Failed(String),
}
1043
1044impl CircuitManager {
1045 pub fn new() -> Self {
1047 Self {
1048 circuits: HashMap::new(),
1049 creation_rate: 1.0, last_creation: Instant::now(),
1051 max_circuits: 100,
1052 circuit_lifetime: Duration::from_secs(600), rotation_interval: Duration::from_secs(300), }
1055 }
1056
1057 pub async fn build_circuit(
1059 &mut self,
1060 hops: usize,
1061 directory: &DirectoryClient,
1062 ) -> Result<u64, OnionError> {
1063 let elapsed = self.last_creation.elapsed().as_secs_f64();
1065 if elapsed < 1.0 / self.creation_rate {
1066 return Err(OnionError::RouteError(
1067 "Circuit creation rate limit exceeded".into(),
1068 ));
1069 }
1070
1071 if self.circuits.len() >= self.max_circuits {
1073 self.cleanup_inactive_circuits();
1075 if self.circuits.len() >= self.max_circuits {
1076 return Err(OnionError::RouteError("Maximum circuits reached".into()));
1077 }
1078 }
1079
1080 let nodes = directory.select_random_nodes(hops).await?;
1082
1083 let circuit_id = thread_rng().next_u64();
1085 let circuit = Circuit {
1086 id: circuit_id,
1087 hops: nodes,
1088 state: CircuitState::Building,
1089 created_at: Instant::now(),
1090 last_activity: Instant::now(),
1091 bandwidth_used: 0,
1092 quality_score: 1.0,
1093 };
1094
1095 self.circuits.insert(circuit_id, circuit);
1096 self.last_creation = Instant::now();
1097
1098 Ok(circuit_id)
1099 }
1100
1101 pub fn activate_circuit(&mut self, circuit_id: u64) -> Result<(), OnionError> {
1103 let circuit = self
1104 .circuits
1105 .get_mut(&circuit_id)
1106 .ok_or_else(|| OnionError::RouteError("Circuit not found".into()))?;
1107
1108 if circuit.state != CircuitState::Building {
1109 return Err(OnionError::RouteError(
1110 "Circuit not in building state".into(),
1111 ));
1112 }
1113
1114 circuit.state = CircuitState::Active;
1115 circuit.last_activity = Instant::now();
1116 Ok(())
1117 }
1118
1119 pub async fn teardown_circuit(&mut self, circuit_id: u64) -> Result<(), OnionError> {
1121 let circuit = self
1122 .circuits
1123 .get_mut(&circuit_id)
1124 .ok_or_else(|| OnionError::RouteError("Circuit not found".into()))?;
1125
1126 circuit.state = CircuitState::TearingDown;
1127
1128 circuit.state = CircuitState::Closed;
1131
1132 Ok(())
1133 }
1134
1135 pub fn get_active_circuit(&mut self) -> Option<&mut Circuit> {
1137 self.circuits
1139 .values_mut()
1140 .filter(|c| c.state == CircuitState::Active)
1141 .filter(|c| c.created_at.elapsed() < self.circuit_lifetime)
1142 .max_by(|a, b| a.quality_score.partial_cmp(&b.quality_score).unwrap())
1143 }
1144
1145 pub fn update_circuit_metrics(&mut self, circuit_id: u64, bytes_sent: u64, success: bool) {
1147 if let Some(circuit) = self.circuits.get_mut(&circuit_id) {
1148 circuit.last_activity = Instant::now();
1149 circuit.bandwidth_used += bytes_sent;
1150
1151 if success {
1153 circuit.quality_score = (circuit.quality_score * 0.95 + 1.0 * 0.05).min(1.0);
1154 } else {
1155 circuit.quality_score = (circuit.quality_score * 0.95).max(0.0);
1156 }
1157 }
1158 }
1159
1160 pub fn needs_rotation(&self) -> bool {
1162 self.circuits
1163 .values()
1164 .filter(|c| c.state == CircuitState::Active)
1165 .any(|c| c.created_at.elapsed() >= self.rotation_interval)
1166 }
1167
1168 pub fn cleanup_inactive_circuits(&mut self) {
1170 let _now = Instant::now();
1171 self.circuits.retain(|_, circuit| match circuit.state {
1172 CircuitState::Closed | CircuitState::Failed(_) => false,
1173 _ => circuit.created_at.elapsed() < self.circuit_lifetime * 2,
1174 });
1175 }
1176
1177 pub fn get_stats(&self) -> CircuitStats {
1179 let active_count = self
1180 .circuits
1181 .values()
1182 .filter(|c| c.state == CircuitState::Active)
1183 .count();
1184
1185 let total_bandwidth = self.circuits.values().map(|c| c.bandwidth_used).sum();
1186
1187 let avg_quality = if active_count > 0 {
1188 self.circuits
1189 .values()
1190 .filter(|c| c.state == CircuitState::Active)
1191 .map(|c| c.quality_score)
1192 .sum::<f64>()
1193 / active_count as f64
1194 } else {
1195 0.0
1196 };
1197
1198 CircuitStats {
1199 total_circuits: self.circuits.len(),
1200 active_circuits: active_count,
1201 total_bandwidth,
1202 average_quality: avg_quality,
1203 }
1204 }
1205}
1206
/// Aggregate circuit statistics returned by `CircuitManager::get_stats`.
#[derive(Debug, Clone)]
pub struct CircuitStats {
    /// Number of circuits tracked, in any state.
    pub total_circuits: usize,
    /// Number of circuits in the `Active` state.
    pub active_circuits: usize,
    /// Sum of `bandwidth_used` over all tracked circuits.
    pub total_bandwidth: u64,
    /// Mean quality score of active circuits (0.0 when there are none).
    pub average_quality: f64,
}
1219
/// Client-side view of the node directory.
#[derive(Debug)]
pub struct DirectoryClient {
    /// Known nodes keyed by node id.
    nodes: Arc<TokioMutex<HashMap<Vec<u8>, NodeInfo>>>,
    /// Directory server addresses; not contacted by the code visible in this file.
    #[allow(dead_code)]
    directory_servers: Vec<String>,
    /// Time of the last directory refresh (initialised to construction time).
    last_update: Arc<TokioMutex<Instant>>,
    /// Minimum time between directory refreshes.
    update_interval: Duration,
}
1233
/// Directory entry describing one node.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// Node identifier; also the key of the directory map.
    pub id: Vec<u8>,
    /// The node's ML-KEM public key.
    pub public_key: KEMPublicKey,
    /// Network address in `host:port` form.
    pub address: String,
    /// Advertised bandwidth, used as the selection weight.
    pub bandwidth: u64,
    /// Reported uptime.
    pub uptime: Duration,
    /// Role and capability flags.
    pub flags: NodeFlags,
    /// When the node was last seen; nodes unseen for an hour are not selected.
    pub last_seen: Instant,
}
1252
/// Role and capability flags of a directory node. All flags default to `false`.
#[derive(Debug, Clone, Default)]
pub struct NodeFlags {
    /// Preferred as the first hop of a route.
    pub guard: bool,
    /// Eligible as the last hop of a route.
    pub exit: bool,
    /// Marked as a directory node (not consulted by the selection code in this file).
    pub directory: bool,
    /// Marked as fast (not consulted by the selection code in this file).
    pub fast: bool,
    /// Marked as stable; only stable nodes are selected.
    pub stable: bool,
}
1267
1268impl DirectoryClient {
1269 pub fn new() -> Self {
1271 Self {
1272 nodes: Arc::new(TokioMutex::new(HashMap::new())),
1273 directory_servers: vec![
1274 "dir1.qudag.net:9030".to_string(),
1275 "dir2.qudag.net:9030".to_string(),
1276 "dir3.qudag.net:9030".to_string(),
1277 ],
1278 last_update: Arc::new(TokioMutex::new(Instant::now())),
1279 update_interval: Duration::from_secs(3600), }
1281 }
1282
1283 pub async fn get_public_key(&self, node_id: &[u8]) -> Result<KEMPublicKey, String> {
1285 let nodes = self.nodes.lock().await;
1286 nodes
1287 .get(node_id)
1288 .map(|info| info.public_key.clone())
1289 .ok_or_else(|| "Node not found in directory".to_string())
1290 }
1291
1292 pub async fn select_random_nodes(&self, count: usize) -> Result<Vec<Vec<u8>>, OnionError> {
1294 self.update_directory_if_needed().await?;
1296
1297 let nodes = self.nodes.lock().await;
1298
1299 let active_nodes: Vec<_> = nodes
1301 .values()
1302 .filter(|n| n.last_seen.elapsed() < Duration::from_secs(3600))
1303 .filter(|n| n.flags.stable)
1304 .collect();
1305
1306 if active_nodes.len() < count {
1307 return Err(OnionError::RouteError("Not enough active nodes".into()));
1308 }
1309
1310 let mut selected = Vec::new();
1312 let mut available = active_nodes.clone();
1313
1314 for i in 0..count {
1315 if i == 0 {
1317 let guards: Vec<_> = available
1318 .iter()
1319 .filter(|n| n.flags.guard)
1320 .copied()
1321 .collect();
1322
1323 if !guards.is_empty() {
1324 available = guards;
1325 }
1326 }
1327
1328 if i == count - 1 {
1330 available.retain(|n| n.flags.exit);
1331 if available.is_empty() {
1332 return Err(OnionError::RouteError("No exit nodes available".into()));
1333 }
1334 }
1335
1336 let total_bandwidth: u64 = available.iter().map(|n| n.bandwidth).sum();
1338 let mut target = thread_rng().gen_range(0..total_bandwidth);
1339
1340 for (idx, node) in available.iter().enumerate() {
1341 if target < node.bandwidth {
1342 selected.push(node.public_key.as_bytes().to_vec());
1343 available.remove(idx);
1344 break;
1345 }
1346 target -= node.bandwidth;
1347 }
1348 }
1349
1350 Ok(selected)
1351 }
1352
1353 async fn update_directory_if_needed(&self) -> Result<(), OnionError> {
1355 let last_update = *self.last_update.lock().await;
1356
1357 if last_update.elapsed() < self.update_interval {
1358 return Ok(());
1359 }
1360
1361 self.populate_dummy_nodes().await;
1364
1365 *self.last_update.lock().await = Instant::now();
1366 Ok(())
1367 }
1368
1369 async fn populate_dummy_nodes(&self) {
1371 let mut nodes = self.nodes.lock().await;
1372
1373 for i in 0..20 {
1374 let (public_key, _) = MlKem768::keygen().unwrap();
1375 let node_id = vec![i as u8; 32];
1376
1377 let info = NodeInfo {
1378 id: node_id.clone(),
1379 public_key,
1380 address: format!("node{}.qudag.net:9001", i),
1381 bandwidth: 1_000_000 * (i as u64 + 1), uptime: Duration::from_secs(3600 * (i as u64 + 1)),
1383 flags: NodeFlags {
1384 guard: i < 5,
1385 exit: i >= 15,
1386 directory: i % 5 == 0,
1387 fast: i % 2 == 0,
1388 stable: true,
1389 },
1390 last_seen: Instant::now(),
1391 };
1392
1393 nodes.insert(node_id, info);
1394 }
1395 }
1396
1397 pub async fn measure_bandwidth(&self, node_id: &[u8]) -> Result<u64, OnionError> {
1399 let nodes = self.nodes.lock().await;
1402 nodes
1403 .get(node_id)
1404 .map(|info| info.bandwidth)
1405 .ok_or_else(|| OnionError::RouteError("Node not found".into()))
1406 }
1407
1408 pub async fn get_load_balancing_weights(&self) -> HashMap<Vec<u8>, f64> {
1410 let nodes = self.nodes.lock().await;
1411 let total_bandwidth: u64 = nodes.values().map(|n| n.bandwidth).sum();
1412
1413 nodes
1414 .iter()
1415 .map(|(id, info)| {
1416 let weight = info.bandwidth as f64 / total_bandwidth as f64;
1417 (id.clone(), weight)
1418 })
1419 .collect()
1420 }
1421}
1422
/// Per-hop routing information serialized into a layer's metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HopMetadata {
    /// Circuit identifier; `encrypt_layers` draws a fresh random value for every layer.
    pub circuit_id: u64,
    /// Hop index as assigned by `encrypt_layers`, counted from the last hop (0) backwards.
    pub hop_number: u8,
    /// The hop that follows this one, or `None` for the last hop.
    pub next_hop: Option<Vec<u8>>,
    /// Per-layer behaviour flags.
    pub flags: LayerFlags,
}
1435
/// Per-layer behaviour flags. All flags default to `false`.
///
/// None of these flags is read by the code visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LayerFlags {
    /// Marks the layer as addressed to an exit hop.
    pub is_exit: bool,
    /// Requests padding for this layer.
    pub padding_enabled: bool,
    /// Requests timing obfuscation for this layer.
    pub timing_obfuscation: bool,
    /// Requests an acknowledgement.
    pub ack_required: bool,
}
1448
/// Applies traffic-analysis countermeasures to batches of mix messages.
pub struct TrafficAnalysisResistance {
    /// Which countermeasures are enabled.
    config: TrafficAnalysisConfig,
    /// Traffic patterns used for pattern mimicking.
    pattern_db: TrafficPatternDatabase,
}
1456
/// Configuration for [`TrafficAnalysisResistance`].
#[derive(Debug, Clone)]
pub struct TrafficAnalysisConfig {
    /// Reshape message sizes to follow a stored traffic pattern.
    pub enable_pattern_mimicking: bool,
    /// Enable burst obfuscation.
    pub enable_burst_obfuscation: bool,
    /// Enable flow-correlation resistance.
    pub enable_flow_correlation_resistance: bool,
    /// Lower bound for inter-packet delays (presumably milliseconds — TODO confirm).
    pub min_inter_packet_delay: u64,
    /// Upper bound for inter-packet delays (presumably milliseconds — TODO confirm).
    pub max_inter_packet_delay: u64,
}
1471
impl Default for TrafficAnalysisConfig {
    /// Defaults: every countermeasure enabled, inter-packet delay bounds of 10 and 100.
    fn default() -> Self {
        Self {
            enable_pattern_mimicking: true,
            enable_burst_obfuscation: true,
            enable_flow_correlation_resistance: true,
            min_inter_packet_delay: 10,
            max_inter_packet_delay: 100,
        }
    }
}
1483
#[derive(Debug)]
/// Collection of traffic patterns from which one is picked at random, by weight.
struct TrafficPatternDatabase {
    /// Available patterns; selection walks them in this order.
    patterns: Vec<TrafficPattern>,
}
1490
#[derive(Debug, Clone)]
/// One traffic shape: a sequence of packet sizes and inter-packet delays, plus
/// a weight used for random selection.
struct TrafficPattern {
    /// Target content lengths that messages are padded or truncated to, applied cyclically.
    packet_sizes: Vec<usize>,
    /// Delays between packets, in milliseconds.
    inter_packet_delays: Vec<u64>,
    /// Relative selection weight; a larger weight makes the pattern more likely to be chosen.
    weight: f64,
}
1501
1502impl Default for TrafficAnalysisResistance {
1503 fn default() -> Self {
1504 Self::new()
1505 }
1506}
1507
1508impl TrafficAnalysisResistance {
1509 pub fn new() -> Self {
1511 Self::with_config(TrafficAnalysisConfig::default())
1512 }
1513
1514 pub fn with_config(config: TrafficAnalysisConfig) -> Self {
1516 Self {
1517 config,
1518 pattern_db: TrafficPatternDatabase::new(),
1519 }
1520 }
1521
1522 pub async fn apply_resistance(&self, messages: &mut [MixMessage]) -> Result<(), OnionError> {
1524 if self.config.enable_pattern_mimicking {
1525 self.apply_pattern_mimicking(messages).await?;
1526 }
1527
1528 if self.config.enable_burst_obfuscation {
1529 self.apply_burst_obfuscation(messages).await?;
1530 }
1531
1532 if self.config.enable_flow_correlation_resistance {
1533 self.apply_flow_correlation_resistance(messages).await?;
1534 }
1535
1536 Ok(())
1537 }
1538
1539 async fn apply_pattern_mimicking(&self, messages: &mut [MixMessage]) -> Result<(), OnionError> {
1541 let pattern = self.pattern_db.select_random_pattern();
1542
1543 for (i, message) in messages.iter_mut().enumerate() {
1545 if let Some(&target_size) = pattern.packet_sizes.get(i % pattern.packet_sizes.len()) {
1546 message.normalized_size = target_size;
1547
1548 if message.content.len() < target_size {
1550 let padding_size = target_size - message.content.len();
1551 let mut padding = vec![0u8; padding_size];
1552 thread_rng().fill_bytes(&mut padding);
1553 message.content.extend(padding);
1554 } else if message.content.len() > target_size {
1555 message.content.truncate(target_size);
1556 }
1557 }
1558 }
1559
1560 for (i, &delay) in pattern.inter_packet_delays.iter().enumerate() {
1562 if i > 0 && i <= messages.len() {
1563 tokio::time::sleep(Duration::from_millis(delay)).await;
1564 }
1565 }
1566
1567 Ok(())
1568 }
1569
1570 async fn apply_burst_obfuscation(
1572 &self,
1573 _messages: &mut [MixMessage],
1574 ) -> Result<(), OnionError> {
1575 let burst_delay = thread_rng().next_u64()
1577 % (self.config.max_inter_packet_delay - self.config.min_inter_packet_delay)
1578 + self.config.min_inter_packet_delay;
1579
1580 tokio::time::sleep(Duration::from_millis(burst_delay)).await;
1581 Ok(())
1582 }
1583
1584 async fn apply_flow_correlation_resistance(
1586 &self,
1587 messages: &mut [MixMessage],
1588 ) -> Result<(), OnionError> {
1589 use rand::seq::SliceRandom;
1591 messages.shuffle(&mut thread_rng());
1592
1593 for _ in 0..messages.len() {
1595 let delay = thread_rng().next_u64()
1596 % (self.config.max_inter_packet_delay - self.config.min_inter_packet_delay)
1597 + self.config.min_inter_packet_delay;
1598 tokio::time::sleep(Duration::from_millis(delay)).await;
1599 }
1600
1601 Ok(())
1602 }
1603}
1604
1605impl TrafficPatternDatabase {
1606 fn new() -> Self {
1607 let patterns = vec![
1609 TrafficPattern {
1610 packet_sizes: vec![1024, 1024, 512, 2048, 1024],
1611 inter_packet_delays: vec![50, 75, 25, 100, 30],
1612 weight: 1.0,
1613 },
1614 TrafficPattern {
1615 packet_sizes: vec![512, 512, 1024, 512, 4096],
1616 inter_packet_delays: vec![25, 25, 50, 25, 200],
1617 weight: 0.8,
1618 },
1619 TrafficPattern {
1620 packet_sizes: vec![2048, 1024, 1024, 1024, 2048],
1621 inter_packet_delays: vec![100, 50, 50, 50, 150],
1622 weight: 0.6,
1623 },
1624 ];
1625
1626 Self { patterns }
1627 }
1628
1629 fn select_random_pattern(&self) -> &TrafficPattern {
1630 let total_weight: f64 = self.patterns.iter().map(|p| p.weight).sum();
1632 let mut target = thread_rng().gen::<f64>() * total_weight;
1633
1634 for pattern in &self.patterns {
1635 target -= pattern.weight;
1636 if target <= 0.0 {
1637 return pattern;
1638 }
1639 }
1640
1641 &self.patterns[0]
1643 }
1644}