use qudag_crypto::kem::{PublicKey as KEMPublicKey, SecretKey as KEMSecretKey};
use qudag_crypto::ml_kem::MlKem768;
use rand::{thread_rng, Rng, RngCore};
use ring::aead::{Aad, LessSafeKey, Nonce, UnboundKey, CHACHA20_POLY1305};
use ring::rand::{SecureRandom, SystemRandom};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use thiserror::Error;
use tokio::sync::Mutex as TokioMutex;
#[derive(Error, Debug)]
pub enum OnionError {
#[error("layer encryption failed: {0}")]
EncryptionError(String),
#[error("layer decryption failed: {0}")]
DecryptionError(String),
#[error("invalid layer format: {0}")]
InvalidFormat(String),
#[error("ML-KEM error: {0}")]
MLKEMError(String),
#[error("RNG error: {0}")]
RngError(String),
#[error("route construction failed: {0}")]
RouteError(String),
#[error("timing constraint violated: {0}")]
TimingError(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnionLayer {
pub next_hop: Vec<u8>,
pub payload: Vec<u8>,
pub metadata: Vec<u8>,
pub kem_ciphertext: Vec<u8>,
pub nonce: [u8; 12],
pub auth_tag: Vec<u8>,
pub timestamp: u64,
pub padding: Vec<u8>,
}
impl OnionLayer {
pub fn new(next_hop: Vec<u8>, payload: Vec<u8>, metadata: Vec<u8>) -> Self {
let rng = SystemRandom::new();
let mut nonce = [0u8; 12];
rng.fill(&mut nonce).expect("RNG failure");
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let mut padding = vec![0u8; thread_rng().next_u32() as usize % 256];
thread_rng().fill_bytes(&mut padding);
Self {
next_hop,
payload,
metadata,
kem_ciphertext: Vec::new(),
nonce,
auth_tag: Vec::new(),
timestamp,
padding,
}
}
pub fn validate(&self) -> Result<(), OnionError> {
if self.next_hop.is_empty() {
return Err(OnionError::InvalidFormat("empty next hop key".into()));
}
if self.payload.is_empty() {
return Err(OnionError::InvalidFormat("empty payload".into()));
}
if self.kem_ciphertext.is_empty() {
return Err(OnionError::InvalidFormat("missing KEM ciphertext".into()));
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
if now.saturating_sub(self.timestamp) > 300_000 {
return Err(OnionError::TimingError("layer too old".into()));
}
Ok(())
}
pub fn total_size(&self) -> usize {
self.next_hop.len()
+ self.payload.len()
+ self.metadata.len()
+ self.kem_ciphertext.len()
+ self.auth_tag.len()
+ self.padding.len()
+ 12
+ 8 }
pub fn normalize_size(&mut self, target_size: usize) {
let current_size = self.total_size();
if current_size < target_size {
let padding_needed = target_size - current_size;
let mut additional_padding = vec![0u8; padding_needed];
thread_rng().fill_bytes(&mut additional_padding);
self.padding.extend(additional_padding);
}
}
}
pub trait OnionRouter: Send + Sync {
fn encrypt_layers(
&self,
message: Vec<u8>,
route: Vec<Vec<u8>>,
) -> Result<Vec<OnionLayer>, OnionError>;
fn decrypt_layer(&self, layer: OnionLayer)
-> Result<(Vec<u8>, Option<OnionLayer>), OnionError>;
fn create_metadata(&self, route_info: Vec<u8>) -> Result<Vec<u8>, OnionError>;
}
pub struct MLKEMOnionRouter {
ml_kem_secret_key: KEMSecretKey,
ml_kem_public_key: KEMPublicKey,
rng: SystemRandom,
standard_layer_size: usize,
#[allow(dead_code)]
circuit_manager: Arc<TokioMutex<CircuitManager>>,
directory_client: Arc<DirectoryClient>,
}
impl MLKEMOnionRouter {
pub async fn new() -> Result<Self, OnionError> {
let (public_key, secret_key) = MlKem768::keygen()
.map_err(|e| OnionError::MLKEMError(format!("Key generation failed: {:?}", e)))?;
let circuit_manager = Arc::new(TokioMutex::new(CircuitManager::new()));
let directory_client = Arc::new(DirectoryClient::new());
Ok(Self {
ml_kem_secret_key: secret_key,
ml_kem_public_key: public_key,
rng: SystemRandom::new(),
standard_layer_size: 4096, circuit_manager,
directory_client,
})
}
pub async fn with_layer_size(layer_size: usize) -> Result<Self, OnionError> {
let mut router = Self::new().await?;
router.standard_layer_size = layer_size;
Ok(router)
}
pub fn get_public_key(&self) -> &KEMPublicKey {
&self.ml_kem_public_key
}
fn derive_symmetric_key(&self, shared_secret: &[u8]) -> Result<[u8; 32], OnionError> {
use ring::hkdf;
let salt = ring::hkdf::Salt::new(hkdf::HKDF_SHA256, b"QuDAG-Onion-v1");
let prk = salt.extract(shared_secret);
let mut key = [0u8; 32];
let info = [&b"symmetric-key"[..]];
prk.expand(&info[..], hkdf::HKDF_SHA256)
.map_err(|_| OnionError::EncryptionError("Key derivation failed".into()))?
.fill(&mut key)
.map_err(|_| OnionError::EncryptionError("Key derivation failed".into()))?;
Ok(key)
}
fn encrypt_aead(
&self,
key: &[u8; 32],
nonce: &[u8; 12],
data: &[u8],
) -> Result<Vec<u8>, OnionError> {
let unbound_key = UnboundKey::new(&CHACHA20_POLY1305, key)
.map_err(|e| OnionError::EncryptionError(e.to_string()))?;
let sealing_key = LessSafeKey::new(unbound_key);
let mut encrypted_data = data.to_vec();
sealing_key
.seal_in_place_append_tag(
Nonce::assume_unique_for_key(*nonce),
Aad::empty(),
&mut encrypted_data,
)
.map_err(|e| OnionError::EncryptionError(e.to_string()))?;
Ok(encrypted_data)
}
fn decrypt_aead(
&self,
key: &[u8; 32],
nonce: &[u8; 12],
encrypted_data: &mut [u8],
) -> Result<Vec<u8>, OnionError> {
let unbound_key = UnboundKey::new(&CHACHA20_POLY1305, key)
.map_err(|e| OnionError::DecryptionError(e.to_string()))?;
let opening_key = LessSafeKey::new(unbound_key);
let decrypted = opening_key
.open_in_place(
Nonce::assume_unique_for_key(*nonce),
Aad::empty(),
encrypted_data,
)
.map_err(|e| OnionError::DecryptionError(e.to_string()))?;
Ok(decrypted.to_vec())
}
#[allow(dead_code)]
async fn add_timing_obfuscation(&self) {
let delay_ms = (thread_rng().next_u32() % 90) + 10;
tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
}
}
impl MLKEMOnionRouter {
pub async fn encrypt_layers(
&self,
message: Vec<u8>,
route: Vec<Vec<u8>>,
) -> Result<Vec<OnionLayer>, OnionError> {
if route.is_empty() {
return Err(OnionError::RouteError("empty route".into()));
}
let mut layers = Vec::new();
let mut current_payload = message;
for (i, _hop_pubkey) in route.iter().rev().enumerate() {
let mut nonce = [0u8; 12];
self.rng
.fill(&mut nonce)
.map_err(|e| OnionError::RngError(e.to_string()))?;
let hop_public_key = self
.directory_client
.get_public_key(&route[route.len() - i - 1])
.await
.map_err(|e| OnionError::RouteError(format!("Failed to get public key: {}", e)))?;
let (kem_ciphertext, shared_secret) = MlKem768::encapsulate(&hop_public_key)
.map_err(|e| OnionError::MLKEMError(format!("Encapsulation failed: {:?}", e)))?;
let symmetric_key = self.derive_symmetric_key(shared_secret.as_bytes())?;
let circuit_id = thread_rng().next_u64();
let hop_info = HopMetadata {
circuit_id,
hop_number: i as u8,
next_hop: if i == 0 {
None
} else {
Some(route[route.len() - i].clone())
},
flags: LayerFlags::default(),
};
let metadata = self.create_metadata(
bincode::serialize(&hop_info)
.map_err(|e| OnionError::InvalidFormat(e.to_string()))?,
)?;
let next_hop = if i == 0 {
Vec::new() } else {
route[route.len() - i].clone()
};
let mut layer = OnionLayer::new(next_hop, current_payload.clone(), metadata);
layer.kem_ciphertext = kem_ciphertext.as_bytes().to_vec();
layer.nonce = nonce;
let encrypted_payload = self.encrypt_aead(&symmetric_key, &nonce, ¤t_payload)?;
layer.payload = encrypted_payload;
layer.normalize_size(self.standard_layer_size);
layer.validate()?;
current_payload = bincode::serialize(&layer)
.map_err(|e| OnionError::EncryptionError(e.to_string()))?;
layers.push(layer);
}
layers.reverse();
Ok(layers)
}
pub fn decrypt_layer(
&self,
layer: OnionLayer,
) -> Result<(Vec<u8>, Option<OnionLayer>), OnionError> {
layer.validate()?;
let kem_ciphertext = qudag_crypto::kem::Ciphertext::from_bytes(&layer.kem_ciphertext)
.map_err(|_| OnionError::MLKEMError("Invalid KEM ciphertext".into()))?;
let shared_secret = MlKem768::decapsulate(&self.ml_kem_secret_key, &kem_ciphertext)
.map_err(|e| OnionError::MLKEMError(format!("Decapsulation failed: {:?}", e)))?;
let symmetric_key = self.derive_symmetric_key(shared_secret.as_bytes())?;
let mut encrypted_payload = layer.payload.clone();
let decrypted_payload =
self.decrypt_aead(&symmetric_key, &layer.nonce, &mut encrypted_payload)?;
if !layer.next_hop.is_empty() {
match bincode::deserialize::<OnionLayer>(&decrypted_payload) {
Ok(next_layer) => Ok((decrypted_payload, Some(next_layer))),
Err(_) => {
Ok((decrypted_payload, None))
}
}
} else {
Ok((decrypted_payload, None))
}
}
fn create_metadata(&self, route_info: Vec<u8>) -> Result<Vec<u8>, OnionError> {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let mut metadata = Vec::new();
metadata.extend_from_slice(×tamp.to_le_bytes());
metadata.extend_from_slice(&route_info);
let mut padding = vec![0u8; thread_rng().next_u32() as usize % 128];
thread_rng().fill_bytes(&mut padding);
metadata.extend(padding);
Ok(metadata)
}
}
impl OnionRouter for MLKEMOnionRouter {
fn encrypt_layers(
&self,
message: Vec<u8>,
route: Vec<Vec<u8>>,
) -> Result<Vec<OnionLayer>, OnionError> {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(self.encrypt_layers(message, route))
})
}
fn decrypt_layer(
&self,
layer: OnionLayer,
) -> Result<(Vec<u8>, Option<OnionLayer>), OnionError> {
self.decrypt_layer(layer)
}
fn create_metadata(&self, route_info: Vec<u8>) -> Result<Vec<u8>, OnionError> {
self.create_metadata(route_info)
}
}
#[derive(Debug)]
pub struct MixNode {
pub id: Vec<u8>,
config: MixConfig,
message_buffer: Vec<MixMessage>,
last_flush: SystemTime,
dummy_generator: DummyTrafficGenerator,
traffic_shaper: TrafficShaper,
}
#[derive(Debug, Clone)]
pub struct MixConfig {
pub batch_size: usize,
pub batch_timeout: Duration,
pub target_rate: f64,
pub dummy_probability: f64,
pub timing_obfuscation: bool,
}
impl Default for MixConfig {
fn default() -> Self {
Self {
batch_size: 100,
batch_timeout: Duration::from_millis(500),
target_rate: 50.0, dummy_probability: 0.1, timing_obfuscation: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MixMessage {
pub content: Vec<u8>,
pub priority: u8,
pub timestamp: u64,
pub message_type: MixMessageType,
pub normalized_size: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MixMessageType {
Real,
Dummy,
Heartbeat,
}
impl MixNode {
pub fn new(id: Vec<u8>) -> Self {
Self::with_config(id, MixConfig::default())
}
pub fn with_config(id: Vec<u8>, config: MixConfig) -> Self {
Self {
id,
config: config.clone(),
message_buffer: Vec::with_capacity(config.batch_size * 2),
last_flush: SystemTime::now(),
dummy_generator: DummyTrafficGenerator::new(config.dummy_probability),
traffic_shaper: TrafficShaper::new(config.target_rate),
}
}
pub async fn add_message(&mut self, mut message: MixMessage) -> Result<(), OnionError> {
message.normalized_size = self.normalize_message_size(&message);
self.message_buffer.push(message);
if self.should_flush() {
self.flush_batch().await?;
}
Ok(())
}
fn should_flush(&self) -> bool {
self.message_buffer.len() >= self.config.batch_size
|| self.last_flush.elapsed().unwrap_or(Duration::ZERO) >= self.config.batch_timeout
}
pub async fn flush_batch(&mut self) -> Result<Vec<MixMessage>, OnionError> {
if self.message_buffer.is_empty() {
return Ok(Vec::new());
}
self.add_dummy_messages();
use rand::seq::SliceRandom;
self.message_buffer.shuffle(&mut thread_rng());
self.traffic_shaper.apply_shaping().await;
if self.config.timing_obfuscation {
self.apply_timing_obfuscation().await;
}
let batch = std::mem::take(&mut self.message_buffer);
self.last_flush = SystemTime::now();
Ok(batch)
}
fn add_dummy_messages(&mut self) {
while self.message_buffer.len() < self.config.batch_size {
if let Some(dummy_msg) = self.dummy_generator.generate_dummy() {
self.message_buffer.push(dummy_msg);
} else {
break; }
}
}
fn normalize_message_size(&self, message: &MixMessage) -> usize {
let standard_sizes = [512, 1024, 2048, 4096, 8192];
let content_size = message.content.len();
for &size in &standard_sizes {
if content_size <= size {
return size;
}
}
content_size.div_ceil(1024) * 1024
}
async fn apply_timing_obfuscation(&self) {
let delay_ms = (thread_rng().next_u32() % 100) + 50;
tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
}
pub fn get_stats(&self) -> MixNodeStats {
MixNodeStats {
buffer_size: self.message_buffer.len(),
last_flush_elapsed: self.last_flush.elapsed().unwrap_or(Duration::ZERO),
dummy_ratio: self.dummy_generator.get_dummy_ratio(),
target_rate: self.config.target_rate,
}
}
}
#[derive(Debug, Clone)]
pub struct MixNodeStats {
pub buffer_size: usize,
pub last_flush_elapsed: Duration,
pub dummy_ratio: f64,
pub target_rate: f64,
}
#[derive(Debug)]
struct DummyTrafficGenerator {
probability: f64,
dummy_count: usize,
total_count: usize,
}
impl DummyTrafficGenerator {
fn new(probability: f64) -> Self {
Self {
probability: probability.clamp(0.0, 1.0),
dummy_count: 0,
total_count: 0,
}
}
fn generate_dummy(&mut self) -> Option<MixMessage> {
self.total_count += 1;
if thread_rng().gen::<f64>() < self.probability {
self.dummy_count += 1;
let size = (thread_rng().next_u32() % 4096) + 256; let mut content = vec![0u8; size as usize];
thread_rng().fill_bytes(&mut content);
Some(MixMessage {
content,
priority: 0,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
message_type: MixMessageType::Dummy,
normalized_size: 0, })
} else {
None
}
}
fn get_dummy_ratio(&self) -> f64 {
if self.total_count == 0 {
0.0
} else {
self.dummy_count as f64 / self.total_count as f64
}
}
}
#[derive(Debug)]
#[allow(dead_code)]
struct TrafficShaper {
target_rate: f64,
last_message_time: SystemTime,
message_interval: Duration,
}
impl TrafficShaper {
fn new(target_rate: f64) -> Self {
let message_interval = Duration::from_secs_f64(1.0 / target_rate.max(0.1));
Self {
target_rate,
last_message_time: SystemTime::now(),
message_interval,
}
}
async fn apply_shaping(&mut self) {
let now = SystemTime::now();
let elapsed = now
.duration_since(self.last_message_time)
.unwrap_or(Duration::ZERO);
if elapsed < self.message_interval {
let delay = self.message_interval - elapsed;
tokio::time::sleep(delay).await;
}
self.last_message_time = SystemTime::now();
}
}
pub struct MetadataProtector {
rng: SystemRandom,
config: MetadataConfig,
}
#[derive(Debug, Clone)]
pub struct MetadataConfig {
pub anonymize_ip: bool,
pub obfuscate_timing: bool,
pub normalize_size: bool,
pub randomize_headers: bool,
pub timing_bucket_ms: u64,
}
impl Default for MetadataConfig {
fn default() -> Self {
Self {
anonymize_ip: true,
obfuscate_timing: true,
normalize_size: true,
randomize_headers: true,
timing_bucket_ms: 100, }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtectedMetadata {
pub obfuscated_timestamp: u64,
pub random_headers: Vec<(String, Vec<u8>)>,
pub normalized_size: usize,
pub anonymous_ids: Vec<Vec<u8>>,
pub padding: Vec<u8>,
}
impl Default for MetadataProtector {
fn default() -> Self {
Self::new()
}
}
impl MetadataProtector {
pub fn new() -> Self {
Self::with_config(MetadataConfig::default())
}
pub fn with_config(config: MetadataConfig) -> Self {
Self {
rng: SystemRandom::new(),
config,
}
}
pub fn protect_metadata(
&self,
original_metadata: &[u8],
) -> Result<ProtectedMetadata, OnionError> {
let timestamp = if self.config.obfuscate_timing {
self.obfuscate_timestamp()?
} else {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64
};
let random_headers = if self.config.randomize_headers {
self.generate_random_headers()?
} else {
Vec::new()
};
let normalized_size = if self.config.normalize_size {
self.normalize_packet_size(original_metadata.len())
} else {
original_metadata.len()
};
let anonymous_ids = self.generate_anonymous_ids()?;
let padding = self.generate_padding(normalized_size, original_metadata.len())?;
Ok(ProtectedMetadata {
obfuscated_timestamp: timestamp,
random_headers,
normalized_size,
anonymous_ids,
padding,
})
}
fn obfuscate_timestamp(&self) -> Result<u64, OnionError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let bucket_size = self.config.timing_bucket_ms;
let obfuscated = (now / bucket_size) * bucket_size;
let mut jitter_bytes = [0u8; 8];
self.rng
.fill(&mut jitter_bytes)
.map_err(|e| OnionError::RngError(e.to_string()))?;
let jitter = u64::from_le_bytes(jitter_bytes) % bucket_size;
Ok(obfuscated + jitter)
}
fn generate_random_headers(&self) -> Result<Vec<(String, Vec<u8>)>, OnionError> {
let header_names = [
"X-Request-ID",
"X-Correlation-ID",
"X-Session-ID",
"X-Trace-ID",
"X-Custom-Header",
"X-Client-ID",
];
let mut headers = Vec::new();
let num_headers = (thread_rng().next_u32() % 4) + 2;
for _ in 0..num_headers {
let name = header_names[thread_rng().next_u32() as usize % header_names.len()];
let mut value = vec![0u8; 16];
self.rng
.fill(&mut value)
.map_err(|e| OnionError::RngError(e.to_string()))?;
headers.push((name.to_string(), value));
}
Ok(headers)
}
fn normalize_packet_size(&self, original_size: usize) -> usize {
let standard_sizes = [512, 1024, 1536, 2048, 3072, 4096, 6144, 8192, 12288, 16384];
for &size in &standard_sizes {
if original_size <= size {
return size;
}
}
original_size.div_ceil(4096) * 4096
}
fn generate_anonymous_ids(&self) -> Result<Vec<Vec<u8>>, OnionError> {
let mut ids = Vec::new();
let num_ids = (thread_rng().next_u32() % 3) + 1;
for _ in 0..num_ids {
let mut id = vec![0u8; 32]; self.rng
.fill(&mut id)
.map_err(|e| OnionError::RngError(e.to_string()))?;
ids.push(id);
}
Ok(ids)
}
fn generate_padding(
&self,
target_size: usize,
current_size: usize,
) -> Result<Vec<u8>, OnionError> {
if target_size <= current_size {
return Ok(Vec::new());
}
let padding_size = target_size - current_size;
let mut padding = vec![0u8; padding_size];
self.rng
.fill(&mut padding)
.map_err(|e| OnionError::RngError(e.to_string()))?;
Ok(padding)
}
pub fn anonymize_ip(&self, original_ip: &str) -> Result<String, OnionError> {
if !self.config.anonymize_ip {
return Ok(original_ip.to_string());
}
let mut ip_bytes = [0u8; 4];
self.rng
.fill(&mut ip_bytes)
.map_err(|e| OnionError::RngError(e.to_string()))?;
ip_bytes[0] = 10;
Ok(format!(
"{}.{}.{}.{}",
ip_bytes[0], ip_bytes[1], ip_bytes[2], ip_bytes[3]
))
}
pub fn scrub_packet_headers(&self, packet: &mut Vec<u8>) -> Result<(), OnionError> {
let mut dummy_headers = vec![0u8; 20]; self.rng
.fill(&mut dummy_headers)
.map_err(|e| OnionError::RngError(e.to_string()))?;
let mut new_packet = dummy_headers;
new_packet.extend_from_slice(packet);
*packet = new_packet;
Ok(())
}
}
#[derive(Debug)]
pub struct CircuitManager {
circuits: HashMap<u64, Circuit>,
creation_rate: f64,
last_creation: Instant,
max_circuits: usize,
circuit_lifetime: Duration,
rotation_interval: Duration,
}
#[derive(Debug, Clone)]
pub struct Circuit {
pub id: u64,
pub hops: Vec<Vec<u8>>,
pub state: CircuitState,
pub created_at: Instant,
pub last_activity: Instant,
pub bandwidth_used: u64,
pub quality_score: f64,
}
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
Building,
Active,
TearingDown,
Closed,
Failed(String),
}
impl CircuitManager {
pub fn new() -> Self {
Self {
circuits: HashMap::new(),
creation_rate: 1.0, last_creation: Instant::now(),
max_circuits: 100,
circuit_lifetime: Duration::from_secs(600), rotation_interval: Duration::from_secs(300), }
}
pub async fn build_circuit(
&mut self,
hops: usize,
directory: &DirectoryClient,
) -> Result<u64, OnionError> {
let elapsed = self.last_creation.elapsed().as_secs_f64();
if elapsed < 1.0 / self.creation_rate {
return Err(OnionError::RouteError(
"Circuit creation rate limit exceeded".into(),
));
}
if self.circuits.len() >= self.max_circuits {
self.cleanup_inactive_circuits();
if self.circuits.len() >= self.max_circuits {
return Err(OnionError::RouteError("Maximum circuits reached".into()));
}
}
let nodes = directory.select_random_nodes(hops).await?;
let circuit_id = thread_rng().next_u64();
let circuit = Circuit {
id: circuit_id,
hops: nodes,
state: CircuitState::Building,
created_at: Instant::now(),
last_activity: Instant::now(),
bandwidth_used: 0,
quality_score: 1.0,
};
self.circuits.insert(circuit_id, circuit);
self.last_creation = Instant::now();
Ok(circuit_id)
}
pub fn activate_circuit(&mut self, circuit_id: u64) -> Result<(), OnionError> {
let circuit = self
.circuits
.get_mut(&circuit_id)
.ok_or_else(|| OnionError::RouteError("Circuit not found".into()))?;
if circuit.state != CircuitState::Building {
return Err(OnionError::RouteError(
"Circuit not in building state".into(),
));
}
circuit.state = CircuitState::Active;
circuit.last_activity = Instant::now();
Ok(())
}
pub async fn teardown_circuit(&mut self, circuit_id: u64) -> Result<(), OnionError> {
let circuit = self
.circuits
.get_mut(&circuit_id)
.ok_or_else(|| OnionError::RouteError("Circuit not found".into()))?;
circuit.state = CircuitState::TearingDown;
circuit.state = CircuitState::Closed;
Ok(())
}
pub fn get_active_circuit(&mut self) -> Option<&mut Circuit> {
self.circuits
.values_mut()
.filter(|c| c.state == CircuitState::Active)
.filter(|c| c.created_at.elapsed() < self.circuit_lifetime)
.max_by(|a, b| a.quality_score.partial_cmp(&b.quality_score).unwrap())
}
pub fn update_circuit_metrics(&mut self, circuit_id: u64, bytes_sent: u64, success: bool) {
if let Some(circuit) = self.circuits.get_mut(&circuit_id) {
circuit.last_activity = Instant::now();
circuit.bandwidth_used += bytes_sent;
if success {
circuit.quality_score = (circuit.quality_score * 0.95 + 1.0 * 0.05).min(1.0);
} else {
circuit.quality_score = (circuit.quality_score * 0.95).max(0.0);
}
}
}
pub fn needs_rotation(&self) -> bool {
self.circuits
.values()
.filter(|c| c.state == CircuitState::Active)
.any(|c| c.created_at.elapsed() >= self.rotation_interval)
}
pub fn cleanup_inactive_circuits(&mut self) {
let _now = Instant::now();
self.circuits.retain(|_, circuit| match circuit.state {
CircuitState::Closed | CircuitState::Failed(_) => false,
_ => circuit.created_at.elapsed() < self.circuit_lifetime * 2,
});
}
pub fn get_stats(&self) -> CircuitStats {
let active_count = self
.circuits
.values()
.filter(|c| c.state == CircuitState::Active)
.count();
let total_bandwidth = self.circuits.values().map(|c| c.bandwidth_used).sum();
let avg_quality = if active_count > 0 {
self.circuits
.values()
.filter(|c| c.state == CircuitState::Active)
.map(|c| c.quality_score)
.sum::<f64>()
/ active_count as f64
} else {
0.0
};
CircuitStats {
total_circuits: self.circuits.len(),
active_circuits: active_count,
total_bandwidth,
average_quality: avg_quality,
}
}
}
#[derive(Debug, Clone)]
pub struct CircuitStats {
pub total_circuits: usize,
pub active_circuits: usize,
pub total_bandwidth: u64,
pub average_quality: f64,
}
#[derive(Debug)]
pub struct DirectoryClient {
nodes: Arc<TokioMutex<HashMap<Vec<u8>, NodeInfo>>>,
#[allow(dead_code)]
directory_servers: Vec<String>,
last_update: Arc<TokioMutex<Instant>>,
update_interval: Duration,
}
#[derive(Debug, Clone)]
pub struct NodeInfo {
pub id: Vec<u8>,
pub public_key: KEMPublicKey,
pub address: String,
pub bandwidth: u64,
pub uptime: Duration,
pub flags: NodeFlags,
pub last_seen: Instant,
}
#[derive(Debug, Clone, Default)]
pub struct NodeFlags {
pub guard: bool,
pub exit: bool,
pub directory: bool,
pub fast: bool,
pub stable: bool,
}
impl DirectoryClient {
pub fn new() -> Self {
Self {
nodes: Arc::new(TokioMutex::new(HashMap::new())),
directory_servers: vec![
"dir1.qudag.net:9030".to_string(),
"dir2.qudag.net:9030".to_string(),
"dir3.qudag.net:9030".to_string(),
],
last_update: Arc::new(TokioMutex::new(Instant::now())),
update_interval: Duration::from_secs(3600), }
}
pub async fn get_public_key(&self, node_id: &[u8]) -> Result<KEMPublicKey, String> {
let nodes = self.nodes.lock().await;
nodes
.get(node_id)
.map(|info| info.public_key.clone())
.ok_or_else(|| "Node not found in directory".to_string())
}
pub async fn select_random_nodes(&self, count: usize) -> Result<Vec<Vec<u8>>, OnionError> {
self.update_directory_if_needed().await?;
let nodes = self.nodes.lock().await;
let active_nodes: Vec<_> = nodes
.values()
.filter(|n| n.last_seen.elapsed() < Duration::from_secs(3600))
.filter(|n| n.flags.stable)
.collect();
if active_nodes.len() < count {
return Err(OnionError::RouteError("Not enough active nodes".into()));
}
let mut selected = Vec::new();
let mut available = active_nodes.clone();
for i in 0..count {
if i == 0 {
let guards: Vec<_> = available
.iter()
.filter(|n| n.flags.guard)
.copied()
.collect();
if !guards.is_empty() {
available = guards;
}
}
if i == count - 1 {
available.retain(|n| n.flags.exit);
if available.is_empty() {
return Err(OnionError::RouteError("No exit nodes available".into()));
}
}
let total_bandwidth: u64 = available.iter().map(|n| n.bandwidth).sum();
let mut target = thread_rng().gen_range(0..total_bandwidth);
for (idx, node) in available.iter().enumerate() {
if target < node.bandwidth {
selected.push(node.public_key.as_bytes().to_vec());
available.remove(idx);
break;
}
target -= node.bandwidth;
}
}
Ok(selected)
}
async fn update_directory_if_needed(&self) -> Result<(), OnionError> {
let last_update = *self.last_update.lock().await;
if last_update.elapsed() < self.update_interval {
return Ok(());
}
self.populate_dummy_nodes().await;
*self.last_update.lock().await = Instant::now();
Ok(())
}
async fn populate_dummy_nodes(&self) {
let mut nodes = self.nodes.lock().await;
for i in 0..20 {
let (public_key, _) = MlKem768::keygen().unwrap();
let node_id = vec![i as u8; 32];
let info = NodeInfo {
id: node_id.clone(),
public_key,
address: format!("node{}.qudag.net:9001", i),
bandwidth: 1_000_000 * (i as u64 + 1), uptime: Duration::from_secs(3600 * (i as u64 + 1)),
flags: NodeFlags {
guard: i < 5,
exit: i >= 15,
directory: i % 5 == 0,
fast: i % 2 == 0,
stable: true,
},
last_seen: Instant::now(),
};
nodes.insert(node_id, info);
}
}
pub async fn measure_bandwidth(&self, node_id: &[u8]) -> Result<u64, OnionError> {
let nodes = self.nodes.lock().await;
nodes
.get(node_id)
.map(|info| info.bandwidth)
.ok_or_else(|| OnionError::RouteError("Node not found".into()))
}
pub async fn get_load_balancing_weights(&self) -> HashMap<Vec<u8>, f64> {
let nodes = self.nodes.lock().await;
let total_bandwidth: u64 = nodes.values().map(|n| n.bandwidth).sum();
nodes
.iter()
.map(|(id, info)| {
let weight = info.bandwidth as f64 / total_bandwidth as f64;
(id.clone(), weight)
})
.collect()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HopMetadata {
pub circuit_id: u64,
pub hop_number: u8,
pub next_hop: Option<Vec<u8>>,
pub flags: LayerFlags,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LayerFlags {
pub is_exit: bool,
pub padding_enabled: bool,
pub timing_obfuscation: bool,
pub ack_required: bool,
}
pub struct TrafficAnalysisResistance {
config: TrafficAnalysisConfig,
pattern_db: TrafficPatternDatabase,
}
#[derive(Debug, Clone)]
pub struct TrafficAnalysisConfig {
pub enable_pattern_mimicking: bool,
pub enable_burst_obfuscation: bool,
pub enable_flow_correlation_resistance: bool,
pub min_inter_packet_delay: u64,
pub max_inter_packet_delay: u64,
}
impl Default for TrafficAnalysisConfig {
fn default() -> Self {
Self {
enable_pattern_mimicking: true,
enable_burst_obfuscation: true,
enable_flow_correlation_resistance: true,
min_inter_packet_delay: 10,
max_inter_packet_delay: 100,
}
}
}
#[derive(Debug)]
struct TrafficPatternDatabase {
patterns: Vec<TrafficPattern>,
}
#[derive(Debug, Clone)]
struct TrafficPattern {
packet_sizes: Vec<usize>,
inter_packet_delays: Vec<u64>,
weight: f64,
}
impl Default for TrafficAnalysisResistance {
fn default() -> Self {
Self::new()
}
}
impl TrafficAnalysisResistance {
pub fn new() -> Self {
Self::with_config(TrafficAnalysisConfig::default())
}
pub fn with_config(config: TrafficAnalysisConfig) -> Self {
Self {
config,
pattern_db: TrafficPatternDatabase::new(),
}
}
pub async fn apply_resistance(&self, messages: &mut [MixMessage]) -> Result<(), OnionError> {
if self.config.enable_pattern_mimicking {
self.apply_pattern_mimicking(messages).await?;
}
if self.config.enable_burst_obfuscation {
self.apply_burst_obfuscation(messages).await?;
}
if self.config.enable_flow_correlation_resistance {
self.apply_flow_correlation_resistance(messages).await?;
}
Ok(())
}
async fn apply_pattern_mimicking(&self, messages: &mut [MixMessage]) -> Result<(), OnionError> {
let pattern = self.pattern_db.select_random_pattern();
for (i, message) in messages.iter_mut().enumerate() {
if let Some(&target_size) = pattern.packet_sizes.get(i % pattern.packet_sizes.len()) {
message.normalized_size = target_size;
if message.content.len() < target_size {
let padding_size = target_size - message.content.len();
let mut padding = vec![0u8; padding_size];
thread_rng().fill_bytes(&mut padding);
message.content.extend(padding);
} else if message.content.len() > target_size {
message.content.truncate(target_size);
}
}
}
for (i, &delay) in pattern.inter_packet_delays.iter().enumerate() {
if i > 0 && i <= messages.len() {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
}
Ok(())
}
async fn apply_burst_obfuscation(
&self,
_messages: &mut [MixMessage],
) -> Result<(), OnionError> {
let burst_delay = thread_rng().next_u64()
% (self.config.max_inter_packet_delay - self.config.min_inter_packet_delay)
+ self.config.min_inter_packet_delay;
tokio::time::sleep(Duration::from_millis(burst_delay)).await;
Ok(())
}
async fn apply_flow_correlation_resistance(
&self,
messages: &mut [MixMessage],
) -> Result<(), OnionError> {
use rand::seq::SliceRandom;
messages.shuffle(&mut thread_rng());
for _ in 0..messages.len() {
let delay = thread_rng().next_u64()
% (self.config.max_inter_packet_delay - self.config.min_inter_packet_delay)
+ self.config.min_inter_packet_delay;
tokio::time::sleep(Duration::from_millis(delay)).await;
}
Ok(())
}
}
impl TrafficPatternDatabase {
fn new() -> Self {
let patterns = vec![
TrafficPattern {
packet_sizes: vec![1024, 1024, 512, 2048, 1024],
inter_packet_delays: vec![50, 75, 25, 100, 30],
weight: 1.0,
},
TrafficPattern {
packet_sizes: vec![512, 512, 1024, 512, 4096],
inter_packet_delays: vec![25, 25, 50, 25, 200],
weight: 0.8,
},
TrafficPattern {
packet_sizes: vec![2048, 1024, 1024, 1024, 2048],
inter_packet_delays: vec![100, 50, 50, 50, 150],
weight: 0.6,
},
];
Self { patterns }
}
fn select_random_pattern(&self) -> &TrafficPattern {
let total_weight: f64 = self.patterns.iter().map(|p| p.weight).sum();
let mut target = thread_rng().gen::<f64>() * total_weight;
for pattern in &self.patterns {
target -= pattern.weight;
if target <= 0.0 {
return pattern;
}
}
&self.patterns[0]
}
}