qudag_network/
routing.rs

1use crate::discovery::{
2    DiscoveredPeer, GeoPreferences, LoadBalancer, LoadBalancingAlgorithm, PeerScoringConfig,
3    PeerSelector,
4};
5use crate::shadow_address::{ShadowAddress, ShadowAddressError, ShadowAddressResolver};
6use libp2p::PeerId as LibP2PPeerId;
7use rand::seq::SliceRandom;
8use rand::thread_rng;
9use std::collections::{HashMap, HashSet};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12use thiserror::Error;
13use tokio::sync::{mpsc, RwLock};
14
15/// Errors that can occur during routing operations
16#[derive(Error, Debug)]
17pub enum RoutingError {
18    #[error("No route available to destination")]
19    NoRoute,
20    #[error("Message too large: {size} bytes exceeds limit of {limit} bytes")]
21    MessageTooLarge { size: usize, limit: usize },
22    #[error("Channel send error")]
23    ChannelError,
24    #[error("Shadow address error: {0}")]
25    ShadowAddressError(#[from] ShadowAddressError),
26    #[error("Load balancer error: {0}")]
27    LoadBalancerError(String),
28    #[error("Peer selection error: {0}")]
29    PeerSelectionError(String),
30    #[error("Route optimization failed: {0}")]
31    RouteOptimizationError(String),
32    #[error("Dark addressing not available")]
33    DarkAddressingUnavailable,
34    #[error("Circuit breaker is open for peer {peer_id}")]
35    CircuitBreakerOpen { peer_id: String },
36    #[error("All peers are overloaded")]
37    AllPeersOverloaded,
38    #[error("Geographic constraints cannot be satisfied")]
39    GeographicConstraintsUnsatisfied,
40    #[error("Network topology insufficient for routing")]
41    TopologyInsufficient,
42}
43
44/// Message destination type
45#[derive(Debug, Clone)]
46pub enum Destination {
47    /// Direct peer routing
48    Peer(LibP2PPeerId),
49    /// Shadow address routing
50    Shadow(ShadowAddress),
51}
52
53impl From<LibP2PPeerId> for Destination {
54    fn from(peer_id: LibP2PPeerId) -> Self {
55        Destination::Peer(peer_id)
56    }
57}
58
59impl From<ShadowAddress> for Destination {
60    fn from(addr: ShadowAddress) -> Self {
61        Destination::Shadow(addr)
62    }
63}
64
65/// Enhanced route path with comprehensive metrics and optimization
66#[derive(Clone, Debug)]
67pub struct RoutePath {
68    /// Sequence of peer hops
69    hops: Vec<LibP2PPeerId>,
70    /// Expected end-to-end latency
71    latency: Duration,
72    /// Route reliability score (0.0 to 1.0)
73    reliability: f64,
74    /// Bandwidth capacity in bytes per second
75    bandwidth_capacity: Option<u64>,
76    /// Load factor for this route (0.0 to 1.0)
77    #[allow(dead_code)]
78    load_factor: f64,
79    /// Geographic diversity score
80    #[allow(dead_code)]
81    geographic_diversity: f64,
82    /// Security level (based on encryption and peer reputation)
83    #[allow(dead_code)]
84    security_level: SecurityLevel,
85    /// Route cost (for optimization algorithms)
86    #[allow(dead_code)]
87    cost: f64,
88    /// Route creation timestamp
89    created_at: Instant,
90    /// Route last used timestamp
91    #[allow(dead_code)]
92    last_used: Option<Instant>,
93    /// Usage count
94    #[allow(dead_code)]
95    usage_count: u64,
96    /// Success rate for this route
97    #[allow(dead_code)]
98    success_rate: f64,
99    /// Dark addressing support
100    supports_dark_addressing: bool,
101    /// Onion routing capability
102    supports_onion_routing: bool,
103}
104
/// Security tiers a route can offer.
///
/// The derived ordering follows declaration order, so
/// `Basic < Enhanced < Maximum`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum SecurityLevel {
    /// Standard encryption only.
    Basic,
    /// Quantum-resistant protection.
    Enhanced,
    /// Onion routing combined with dark addressing.
    Maximum,
}
115
116/// Route selection criteria for different use cases
117#[derive(Debug, Clone)]
118pub struct RouteSelectionCriteria {
119    /// Maximum acceptable latency
120    max_latency: Option<Duration>,
121    /// Minimum reliability requirement
122    min_reliability: f64,
123    /// Required security level
124    #[allow(dead_code)]
125    required_security: SecurityLevel,
126    /// Bandwidth requirements in bps
127    min_bandwidth: Option<u64>,
128    /// Geographic constraints
129    #[allow(dead_code)]
130    geographic_constraints: GeographicConstraints,
131    /// Load balancing preferences
132    #[allow(dead_code)]
133    load_balancing_preference: LoadBalancingPreference,
134    /// Redundancy requirements
135    redundancy_level: RedundancyLevel,
136    /// Dark addressing requirement
137    require_dark_addressing: bool,
138    /// Onion routing requirement
139    require_onion_routing: bool,
140}
141
/// Geographic restrictions that can be attached to a route request.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct GeographicConstraints {
    /// Regions to prefer, as ISO country codes.
    preferred_regions: Vec<String>,
    /// Regions that must be avoided.
    excluded_regions: Vec<String>,
    /// Maximum distance from the source, in kilometers.
    max_distance_km: Option<f64>,
    /// Whether multi-hop routes must be geographically diverse.
    require_diversity: bool,
}
155
/// How route selection should distribute load.
#[derive(Clone, Debug)]
pub enum LoadBalancingPreference {
    /// Favor the routes carrying the least load.
    LowLoad,
    /// Spread load evenly over the available routes.
    EvenDistribution,
    /// Weight the distribution by peer capacity.
    WeightedCapacity,
    /// Adapt to current network conditions.
    Adaptive,
}
168
/// How much path redundancy a route request asks for.
#[derive(Clone, Debug)]
pub enum RedundancyLevel {
    /// A single path is enough.
    None,
    /// One backup route in addition to the primary.
    Basic,
    /// Several disjoint paths.
    High,
    /// A full mesh of paths.
    Maximum,
}

impl RedundancyLevel {
    /// Number of paths to look for at this redundancy level
    /// (1, 2, 3 and 5 for `None`, `Basic`, `High` and `Maximum`).
    pub fn path_count(&self) -> usize {
        match self {
            Self::None => 1,
            Self::Basic => 2,
            Self::High => 3,
            Self::Maximum => 5,
        }
    }
}
193
194/// Production-ready multi-path router with advanced load balancing and dark addressing
195pub struct Router {
196    /// Simple peer connectivity graph
197    peer_connections: Arc<RwLock<HashMap<LibP2PPeerId, HashSet<LibP2PPeerId>>>>,
198    /// Discovered peers with comprehensive metrics
199    peers: Arc<RwLock<HashMap<LibP2PPeerId, DiscoveredPeer>>>,
200    /// Route cache with performance metrics
201    route_cache: Arc<RwLock<HashMap<LibP2PPeerId, Vec<RoutePath>>>>,
202    /// Message channel for routing
203    message_tx: mpsc::Sender<Vec<u8>>,
204    /// Shadow address resolver
205    shadow_resolver: Option<Box<dyn ShadowAddressResolver + Send + Sync>>,
206    /// Load balancer for peer selection
207    load_balancer: Arc<Mutex<LoadBalancer>>,
208    /// Peer selector with geographic awareness
209    peer_selector: Arc<Mutex<PeerSelector>>,
210    /// Scoring configuration
211    scoring_config: PeerScoringConfig,
212    /// Route optimization settings
213    optimization_config: RouteOptimizationConfig,
214    /// Dark addressing configuration
215    dark_addressing_config: DarkAddressingConfig,
216    /// Performance monitoring
217    performance_metrics: Arc<Mutex<RouterPerformanceMetrics>>,
218    /// Route usage statistics
219    route_stats: Arc<Mutex<HashMap<LibP2PPeerId, RouteStatistics>>>,
220    /// Maximum message size
221    max_message_size: usize,
222    /// Circuit breaker states
223    circuit_breakers: Arc<RwLock<HashMap<LibP2PPeerId, CircuitBreakerState>>>,
224}
225
226/// Route optimization configuration
227#[derive(Debug, Clone)]
228pub struct RouteOptimizationConfig {
229    /// Enable automatic route optimization
230    #[allow(dead_code)]
231    enable_optimization: bool,
232    /// Optimization interval
233    #[allow(dead_code)]
234    optimization_interval: Duration,
235    /// Route cache size
236    #[allow(dead_code)]
237    cache_size: usize,
238    /// Route cache TTL
239    cache_ttl: Duration,
240    /// Prefer shorter paths
241    #[allow(dead_code)]
242    prefer_shorter_paths: bool,
243    /// Weight factors for route selection
244    weight_factors: RouteWeightFactors,
245    /// Enable adaptive routing
246    #[allow(dead_code)]
247    enable_adaptive_routing: bool,
248}
249
250impl Default for RouteOptimizationConfig {
251    fn default() -> Self {
252        Self {
253            enable_optimization: true,
254            optimization_interval: Duration::from_secs(60),
255            cache_size: 1000,
256            cache_ttl: Duration::from_secs(300),
257            prefer_shorter_paths: true,
258            weight_factors: RouteWeightFactors::default(),
259            enable_adaptive_routing: true,
260        }
261    }
262}
263
/// Relative weights of the individual terms used when scoring a route.
#[derive(Clone, Debug)]
pub struct RouteWeightFactors {
    /// Weight of the latency term (0.0 to 1.0).
    latency_weight: f64,
    /// Weight of the reliability term (0.0 to 1.0).
    reliability_weight: f64,
    /// Weight of the load term (0.0 to 1.0).
    load_weight: f64,
    /// Weight of the security term (0.0 to 1.0).
    #[allow(dead_code)]
    security_weight: f64,
    /// Weight of the geographic diversity term (0.0 to 1.0).
    #[allow(dead_code)]
    diversity_weight: f64,
}

impl Default for RouteWeightFactors {
    /// Defaults: latency 0.3, reliability 0.3, load 0.2, security 0.1,
    /// diversity 0.1.
    fn default() -> Self {
        RouteWeightFactors {
            latency_weight: 0.3,
            reliability_weight: 0.3,
            load_weight: 0.2,
            security_weight: 0.1,
            diversity_weight: 0.1,
        }
    }
}
292
/// Settings controlling dark (shadow) address routing.
#[derive(Clone, Debug)]
pub struct DarkAddressingConfig {
    /// Whether shadow-address destinations are accepted at all.
    enabled: bool,
    /// Timeout for resolving a dark address.
    #[allow(dead_code)]
    resolution_timeout: Duration,
    /// Maximum number of resolution attempts.
    #[allow(dead_code)]
    max_resolution_attempts: usize,
    /// Whether resolved addresses are cached.
    #[allow(dead_code)]
    enable_caching: bool,
    /// Lifetime of a cached dark address.
    #[allow(dead_code)]
    cache_ttl: Duration,
}

impl Default for DarkAddressingConfig {
    /// Defaults: enabled, 10 s resolution timeout, 3 attempts, caching on
    /// with a 600 s TTL.
    fn default() -> Self {
        let resolution_timeout = Duration::from_secs(10);
        let cache_ttl = Duration::from_secs(600);

        Self {
            enabled: true,
            resolution_timeout,
            max_resolution_attempts: 3,
            enable_caching: true,
            cache_ttl,
        }
    }
}
323
/// Aggregate counters describing how the router is performing.
#[derive(Clone, Debug, Default)]
pub struct RouterPerformanceMetrics {
    /// Number of messages routed in total.
    total_messages: u64,
    /// Number of messages routed successfully.
    successful_routings: u64,
    /// Number of routing attempts that failed.
    #[allow(dead_code)]
    failed_routings: u64,
    /// Average time spent routing a message.
    #[allow(dead_code)]
    avg_routing_latency: Duration,
    /// Route cache hit rate.
    cache_hit_rate: f64,
    /// Score describing how effective load balancing is.
    #[allow(dead_code)]
    load_balancing_score: f64,
    /// Rate at which dark addressing is used.
    #[allow(dead_code)]
    dark_addressing_usage: f64,
}
346
/// Per-route usage statistics collected for optimization.
#[allow(dead_code)]
#[derive(Clone, Debug, Default)]
pub struct RouteStatistics {
    /// How many times the route was used.
    usage_count: u64,
    /// How many uses succeeded.
    success_count: u64,
    /// How many uses failed.
    failure_count: u64,
    /// Average latency observed on the route.
    avg_latency: Duration,
    /// When the route was last used, if ever.
    last_used: Option<Instant>,
    /// Bandwidth utilization of the route.
    bandwidth_utilization: f64,
}
364
/// State of a per-peer circuit breaker.
#[derive(Clone, Debug, PartialEq)]
pub enum CircuitBreakerState {
    /// Normal operation.
    Closed,
    /// Failures were detected and the circuit is open.
    Open {
        opened_at: Instant,
        failure_count: usize,
    },
    /// Recovery is being tested.
    HalfOpen {
        test_count: usize,
        success_count: usize,
    },
}
381
382impl Router {
383    /// Creates a new router instance with default configuration
384    pub fn new(message_tx: mpsc::Sender<Vec<u8>>) -> Self {
385        Self::new_with_config(
386            message_tx,
387            PeerScoringConfig::default(),
388            RouteOptimizationConfig::default(),
389            DarkAddressingConfig::default(),
390            16 * 1024 * 1024, // 16MB default
391        )
392    }
393
394    /// Creates a new production-ready router instance with custom configuration
395    pub fn new_with_config(
396        message_tx: mpsc::Sender<Vec<u8>>,
397        scoring_config: PeerScoringConfig,
398        optimization_config: RouteOptimizationConfig,
399        dark_addressing_config: DarkAddressingConfig,
400        max_message_size: usize,
401    ) -> Self {
402        Self {
403            peer_connections: Arc::new(RwLock::new(HashMap::new())),
404            peers: Arc::new(RwLock::new(HashMap::new())),
405            route_cache: Arc::new(RwLock::new(HashMap::new())),
406            message_tx,
407            shadow_resolver: None,
408            load_balancer: Arc::new(Mutex::new(LoadBalancer::new(
409                LoadBalancingAlgorithm::WeightedRoundRobin,
410            ))),
411            peer_selector: Arc::new(Mutex::new(PeerSelector::new(GeoPreferences::default()))),
412            scoring_config,
413            optimization_config,
414            dark_addressing_config,
415            performance_metrics: Arc::new(Mutex::new(RouterPerformanceMetrics::default())),
416            route_stats: Arc::new(Mutex::new(HashMap::new())),
417            max_message_size,
418            circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
419        }
420    }
421
422    /// Set the shadow address resolver
423    pub fn set_shadow_resolver(&mut self, resolver: Box<dyn ShadowAddressResolver + Send + Sync>) {
424        self.shadow_resolver = Some(resolver);
425    }
426
427    /// Find paths for a shadow address
428    async fn find_shadow_paths(
429        &self,
430        addr: &ShadowAddress,
431    ) -> Result<Vec<RoutePath>, RoutingError> {
432        // Resolve shadow address to onetime address
433        let _resolved = if let Some(resolver) = &self.shadow_resolver {
434            resolver.resolve_address(addr)?
435        } else {
436            return Err(RoutingError::DarkAddressingUnavailable);
437        };
438
439        // Get available peers
440        let peers = self.peers.read().await;
441        let mut available_peers: Vec<_> = peers.keys().cloned().collect();
442
443        if available_peers.is_empty() {
444            return Err(RoutingError::NoRoute);
445        }
446
447        // Find random set of peers to use as intermediaries
448        let mut rng = thread_rng();
449        available_peers.shuffle(&mut rng);
450
451        let peer_count = 3; // Use 3 intermediate hops
452        let selected_peers: Vec<_> = available_peers.into_iter().take(peer_count).collect();
453
454        if selected_peers.len() < peer_count {
455            return Err(RoutingError::NoRoute);
456        }
457
458        // Create path through selected peers with dark addressing support
459        Ok(vec![RoutePath {
460            hops: selected_peers,
461            latency: Duration::from_millis(50),
462            reliability: 0.95,
463            bandwidth_capacity: None,
464            load_factor: 0.5,
465            geographic_diversity: 1.0,
466            security_level: SecurityLevel::Maximum,
467            cost: 1.0,
468            created_at: Instant::now(),
469            last_used: None,
470            usage_count: 0,
471            success_rate: 0.0,
472            supports_dark_addressing: true,
473            supports_onion_routing: true,
474        }])
475    }
476
477    /// Adds a peer connection to the routing table
478    pub fn add_peer_connection(&mut self, from: LibP2PPeerId, to: LibP2PPeerId) {
479        let mut connections = self.peer_connections.blocking_write();
480        connections
481            .entry(from)
482            .or_insert_with(HashSet::new)
483            .insert(to);
484    }
485
486    /// Removes a peer connection from the routing table
487    pub fn remove_peer_connection(&mut self, from: LibP2PPeerId, to: LibP2PPeerId) {
488        let mut connections = self.peer_connections.blocking_write();
489        if let Some(peer_connections) = connections.get_mut(&from) {
490            peer_connections.remove(&to);
491            if peer_connections.is_empty() {
492                connections.remove(&from);
493            }
494        }
495    }
496
497    /// Adds a discovered peer with full metrics
498    pub async fn add_discovered_peer(&self, peer_id: LibP2PPeerId, peer: DiscoveredPeer) {
499        self.peers.write().await.insert(peer_id, peer);
500    }
501
502    /// Removes a discovered peer
503    pub async fn remove_discovered_peer(&self, peer_id: LibP2PPeerId) {
504        self.peers.write().await.remove(&peer_id);
505    }
506
507    /// Updates path metrics for a peer
508    pub async fn update_path_metrics(&self, peer_id: LibP2PPeerId, path: RoutePath) {
509        let mut cache = self.route_cache.write().await;
510        cache.entry(peer_id).or_insert_with(Vec::new).push(path);
511
512        // Keep only the most recent paths
513        if let Some(paths) = cache.get_mut(&peer_id) {
514            if paths.len() > 10 {
515                paths.remove(0);
516            }
517        }
518    }
519
520    /// Finds multiple disjoint paths to a destination
521    pub async fn find_paths(
522        &self,
523        destination: LibP2PPeerId,
524        criteria: &RouteSelectionCriteria,
525    ) -> Result<Vec<RoutePath>, RoutingError> {
526        // Check cache first
527        let cache = self.route_cache.read().await;
528        if let Some(cached_paths) = cache.get(&destination) {
529            let valid_paths: Vec<_> = cached_paths
530                .iter()
531                .filter(|p| p.created_at.elapsed() < self.optimization_config.cache_ttl)
532                .filter(|p| self.meets_criteria(p, criteria))
533                .cloned()
534                .collect();
535
536            if !valid_paths.is_empty() {
537                let mut metrics = self.performance_metrics.lock().unwrap();
538                metrics.cache_hit_rate = (metrics.cache_hit_rate + 1.0) / 2.0;
539                return Ok(valid_paths);
540            }
541        }
542        drop(cache);
543
544        // Find new paths
545        let peers = self.peers.read().await;
546        let available_peers: Vec<_> = peers.values().filter(|p| p.is_healthy()).collect();
547
548        if available_peers.is_empty() {
549            return Err(RoutingError::NoRoute);
550        }
551
552        // Use peer selector to find suitable peers
553        let mut peer_selector = self.peer_selector.lock().unwrap();
554        let candidates: Vec<DiscoveredPeer> = available_peers.into_iter().cloned().collect();
555        let selected_peer_ids = peer_selector.select_peers(
556            &candidates,
557            criteria.redundancy_level.path_count(),
558            &self.scoring_config,
559        );
560
561        // Build paths based on redundancy level
562        let mut paths = Vec::new();
563        for peer_id in selected_peer_ids {
564            let path = self.build_path_to_peer(peer_id, &peers, criteria).await?;
565            paths.push(path);
566        }
567
568        // Cache the paths
569        let mut cache = self.route_cache.write().await;
570        cache.insert(destination, paths.clone());
571
572        // Update metrics
573        let mut metrics = self.performance_metrics.lock().unwrap();
574        metrics.total_messages += 1;
575
576        Ok(paths)
577    }
578
579    /// Build a path to a specific peer
580    async fn build_path_to_peer(
581        &self,
582        destination: LibP2PPeerId,
583        peers: &HashMap<LibP2PPeerId, DiscoveredPeer>,
584        criteria: &RouteSelectionCriteria,
585    ) -> Result<RoutePath, RoutingError> {
586        let hops = if criteria.require_onion_routing {
587            // Build multi-hop path for onion routing
588            self.select_onion_hops(destination, peers, 3)?
589        } else {
590            // Direct path
591            vec![destination]
592        };
593
594        let latency = self.calculate_path_latency(&hops, peers);
595        let reliability = self.calculate_path_reliability(&hops, peers);
596        let bandwidth = self.calculate_path_bandwidth(&hops, peers);
597        let load_factor = self.calculate_path_load(&hops, peers);
598        let security_level = if criteria.require_onion_routing {
599            SecurityLevel::Maximum
600        } else if criteria.require_dark_addressing {
601            SecurityLevel::Enhanced
602        } else {
603            SecurityLevel::Basic
604        };
605
606        Ok(RoutePath {
607            hops,
608            latency,
609            reliability,
610            bandwidth_capacity: bandwidth,
611            load_factor,
612            geographic_diversity: 1.0, // TODO: Calculate actual diversity
613            security_level,
614            cost: self.calculate_path_cost(latency, reliability, load_factor),
615            created_at: Instant::now(),
616            last_used: None,
617            usage_count: 0,
618            success_rate: 0.0,
619            supports_dark_addressing: criteria.require_dark_addressing,
620            supports_onion_routing: criteria.require_onion_routing,
621        })
622    }
623
624    /// Select hops for onion routing
625    fn select_onion_hops(
626        &self,
627        destination: LibP2PPeerId,
628        peers: &HashMap<LibP2PPeerId, DiscoveredPeer>,
629        hop_count: usize,
630    ) -> Result<Vec<LibP2PPeerId>, RoutingError> {
631        let mut available: Vec<_> = peers
632            .iter()
633            .filter(|(id, p)| **id != destination && p.capabilities.can_relay)
634            .map(|(id, _)| *id)
635            .collect();
636
637        if available.len() < hop_count {
638            return Err(RoutingError::TopologyInsufficient);
639        }
640
641        let mut rng = thread_rng();
642        available.shuffle(&mut rng);
643
644        let mut hops = available.into_iter().take(hop_count).collect::<Vec<_>>();
645        hops.push(destination);
646
647        Ok(hops)
648    }
649
650    /// Check if a path meets the selection criteria
651    fn meets_criteria(&self, path: &RoutePath, criteria: &RouteSelectionCriteria) -> bool {
652        if let Some(max_latency) = criteria.max_latency {
653            if path.latency > max_latency {
654                return false;
655            }
656        }
657
658        if path.reliability < criteria.min_reliability {
659            return false;
660        }
661
662        if let Some(min_bandwidth) = criteria.min_bandwidth {
663            if let Some(bandwidth) = path.bandwidth_capacity {
664                if bandwidth < min_bandwidth {
665                    return false;
666                }
667            } else {
668                return false;
669            }
670        }
671
672        if criteria.require_dark_addressing && !path.supports_dark_addressing {
673            return false;
674        }
675
676        if criteria.require_onion_routing && !path.supports_onion_routing {
677            return false;
678        }
679
680        true
681    }
682
683    /// Calculate latency for a path
684    fn calculate_path_latency(
685        &self,
686        hops: &[LibP2PPeerId],
687        peers: &HashMap<LibP2PPeerId, DiscoveredPeer>,
688    ) -> Duration {
689        let mut total_latency = Duration::ZERO;
690
691        for hop in hops {
692            if let Some(peer) = peers.get(hop) {
693                total_latency += peer.performance_metrics.avg_response_time;
694            } else {
695                total_latency += Duration::from_millis(50); // Default estimate
696            }
697        }
698
699        total_latency
700    }
701
702    /// Calculate reliability for a path
703    fn calculate_path_reliability(
704        &self,
705        hops: &[LibP2PPeerId],
706        peers: &HashMap<LibP2PPeerId, DiscoveredPeer>,
707    ) -> f64 {
708        let mut reliability = 1.0;
709
710        for hop in hops {
711            if let Some(peer) = peers.get(hop) {
712                reliability *= peer.connection_quality.reliability_score;
713            } else {
714                reliability *= 0.9; // Default estimate
715            }
716        }
717
718        reliability
719    }
720
721    /// Calculate bandwidth for a path
722    fn calculate_path_bandwidth(
723        &self,
724        hops: &[LibP2PPeerId],
725        peers: &HashMap<LibP2PPeerId, DiscoveredPeer>,
726    ) -> Option<u64> {
727        let mut min_bandwidth = u64::MAX;
728
729        for hop in hops {
730            if let Some(peer) = peers.get(hop) {
731                if let Some(bw) = peer.capabilities.bandwidth_capacity {
732                    min_bandwidth = min_bandwidth.min(bw);
733                } else {
734                    return None;
735                }
736            } else {
737                return None;
738            }
739        }
740
741        if min_bandwidth == u64::MAX {
742            None
743        } else {
744            Some(min_bandwidth)
745        }
746    }
747
748    /// Calculate load factor for a path
749    fn calculate_path_load(
750        &self,
751        hops: &[LibP2PPeerId],
752        peers: &HashMap<LibP2PPeerId, DiscoveredPeer>,
753    ) -> f64 {
754        let mut total_load = 0.0;
755
756        for hop in hops {
757            if let Some(peer) = peers.get(hop) {
758                total_load += peer.load_metrics.load_score / 100.0;
759            } else {
760                total_load += 0.5; // Default estimate
761            }
762        }
763
764        total_load / hops.len() as f64
765    }
766
767    /// Calculate path cost
768    fn calculate_path_cost(&self, latency: Duration, reliability: f64, load_factor: f64) -> f64 {
769        let weights = &self.optimization_config.weight_factors;
770
771        let latency_cost = latency.as_millis() as f64 / 1000.0; // Convert to seconds
772        let reliability_cost = 1.0 - reliability;
773        let load_cost = load_factor;
774
775        latency_cost * weights.latency_weight
776            + reliability_cost * weights.reliability_weight
777            + load_cost * weights.load_weight
778    }
779
780    /// Routes a message through multiple paths using either PeerId or ShadowAddress
781    pub async fn route_message(
782        &self,
783        destination: impl Into<Destination>,
784        message: Vec<u8>,
785    ) -> Result<(), RoutingError> {
786        // Check message size
787        if message.len() > self.max_message_size {
788            return Err(RoutingError::MessageTooLarge {
789                size: message.len(),
790                limit: self.max_message_size,
791            });
792        }
793
794        let dest = destination.into();
795        let criteria = RouteSelectionCriteria {
796            max_latency: None,
797            min_reliability: 0.5,
798            required_security: SecurityLevel::Basic,
799            min_bandwidth: None,
800            geographic_constraints: GeographicConstraints {
801                preferred_regions: vec![],
802                excluded_regions: vec![],
803                max_distance_km: None,
804                require_diversity: false,
805            },
806            load_balancing_preference: LoadBalancingPreference::Adaptive,
807            redundancy_level: RedundancyLevel::Basic,
808            require_dark_addressing: false,
809            require_onion_routing: false,
810        };
811
812        // Get routing paths based on destination type
813        let paths = match dest {
814            Destination::Peer(peer_id) => {
815                // Check circuit breaker
816                let breakers = self.circuit_breakers.read().await;
817                if let Some(state) = breakers.get(&peer_id) {
818                    if matches!(state, CircuitBreakerState::Open { .. }) {
819                        return Err(RoutingError::CircuitBreakerOpen {
820                            peer_id: peer_id.to_string(),
821                        });
822                    }
823                }
824                drop(breakers);
825
826                self.find_paths(peer_id, &criteria).await?
827            }
828            Destination::Shadow(shadow_addr) => {
829                if !self.dark_addressing_config.enabled {
830                    return Err(RoutingError::DarkAddressingUnavailable);
831                }
832                self.find_shadow_paths(&shadow_addr).await?
833            }
834        };
835
836        if paths.is_empty() {
837            return Err(RoutingError::NoRoute);
838        }
839
840        // Update performance metrics
841        let mut metrics = self.performance_metrics.lock().unwrap();
842        metrics.total_messages += 1;
843
844        // Use load balancer to select best path
845        let mut load_balancer = self.load_balancer.lock().unwrap();
846        let selected_path = if paths.len() == 1 {
847            &paths[0]
848        } else {
849            // Get peer IDs from paths
850            let peer_ids: Vec<_> = paths
851                .iter()
852                .filter_map(|p| p.hops.first())
853                .copied()
854                .collect();
855
856            if let Some(selected_peer) = load_balancer.select_peer(&peer_ids) {
857                paths
858                    .iter()
859                    .find(|p| p.hops.first() == Some(&selected_peer))
860                    .unwrap_or(&paths[0])
861            } else {
862                &paths[0]
863            }
864        };
865
866        // Update route statistics
867        if let Some(first_hop) = selected_path.hops.first() {
868            let mut route_stats = self.route_stats.lock().unwrap();
869            let stats = route_stats.entry(*first_hop).or_default();
870            stats.usage_count += 1;
871            stats.last_used = Some(Instant::now());
872        }
873
874        // Build routing header
875        let mut routed_message = Vec::new();
876
877        // Header format:
878        // - Path length (4 bytes)
879        // - Path hops (variable)
880        // - Message data
881
882        routed_message.extend_from_slice(&(selected_path.hops.len() as u32).to_le_bytes());
883
884        for hop in &selected_path.hops {
885            routed_message.extend_from_slice(hop.to_bytes().as_slice());
886        }
887
888        routed_message.extend_from_slice(&message);
889
890        // Send through channel
891        self.message_tx
892            .send(routed_message)
893            .await
894            .map_err(|_| RoutingError::ChannelError)?;
895
896        metrics.successful_routings += 1;
897        Ok(())
898    }
899}
900
901#[cfg(test)]
902mod tests {
903    use super::*;
904    use crate::shadow_address::{NetworkType, ShadowMetadata};
905    use tokio::sync::mpsc;
906
907    // Mock shadow address resolver for testing
908    struct MockResolver;
909
910    impl ShadowAddressResolver for MockResolver {
911        fn resolve_address(&self, _: &ShadowAddress) -> Result<Vec<u8>, ShadowAddressError> {
912            Ok(vec![1, 2, 3, 4])
913        }
914
915        fn check_address(
916            &self,
917            _: &ShadowAddress,
918            onetime: &[u8],
919        ) -> Result<bool, ShadowAddressError> {
920            Ok(onetime == &[1, 2, 3, 4])
921        }
922    }
923
924    fn setup_test_router() -> (Router, mpsc::Receiver<Vec<u8>>) {
925        let (tx, rx) = mpsc::channel(128);
926        let mut router = Router::new_with_config(
927            tx,
928            PeerScoringConfig::default(),
929            RouteOptimizationConfig::default(),
930            DarkAddressingConfig::default(),
931            1024 * 1024, // 1MB max message size
932        );
933        router.set_shadow_resolver(Box::new(MockResolver));
934        (router, rx)
935    }
936
937    fn create_test_shadow_address() -> ShadowAddress {
938        ShadowAddress {
939            view_key: vec![1, 2, 3, 4],
940            spend_key: vec![5, 6, 7, 8],
941            payment_id: None,
942            metadata: ShadowMetadata {
943                version: 1,
944                network: NetworkType::Testnet,
945                expires_at: None,
946                flags: 0,
947            },
948        }
949    }
950
951    #[tokio::test]
952    async fn test_add_remove_peer() {
953        let (router, _) = setup_test_router();
954        let peer1 = LibP2PPeerId::random();
955        let peer2 = DiscoveredPeer::new(
956            LibP2PPeerId::random(),
957            "127.0.0.1:8000".parse().unwrap(),
958            DiscoveryMethod::Static,
959        );
960
961        router.add_peer_connection(peer1, peer2.clone()).await;
962        assert!(router.peers.read().await.contains_key(&peer1));
963
964        router.remove_peer_connection(peer1).await;
965        assert!(!router.peers.read().await.contains_key(&peer1));
966    }
967
968    #[tokio::test]
969    async fn test_route_message() {
970        let (router, mut rx) = setup_test_router();
971        let peer1 = LibP2PPeerId::random();
972        let peer2 = LibP2PPeerId::random();
973        let peer3 = LibP2PPeerId::random();
974
975        // Set up peers
976        let discovered_peer1 = DiscoveredPeer::new(
977            peer1,
978            "127.0.0.1:8001".parse().unwrap(),
979            DiscoveryMethod::Static,
980        );
981        let discovered_peer2 = DiscoveredPeer::new(
982            peer2,
983            "127.0.0.1:8002".parse().unwrap(),
984            DiscoveryMethod::Static,
985        );
986        let discovered_peer3 = DiscoveredPeer::new(
987            peer3,
988            "127.0.0.1:8003".parse().unwrap(),
989            DiscoveryMethod::Static,
990        );
991
992        router.add_peer_connection(peer1, discovered_peer1).await;
993        router.add_peer_connection(peer2, discovered_peer2).await;
994        router.add_peer_connection(peer3, discovered_peer3).await;
995
996        let test_msg = vec![1, 2, 3, 4];
997        router.route_message(peer3, test_msg.clone()).await.unwrap();
998
999        // Verify message was sent
1000        let received = rx.recv().await.unwrap();
1001        assert!(!received.is_empty());
1002    }
1003
1004    #[tokio::test]
1005    async fn test_find_paths() {
1006        let (router, _) = setup_test_router();
1007        let peer1 = LibP2PPeerId::random();
1008        let peer2 = LibP2PPeerId::random();
1009        let peer3 = LibP2PPeerId::random();
1010
1011        // Set up peers
1012        let discovered_peer1 = DiscoveredPeer::new(
1013            peer1,
1014            "127.0.0.1:8001".parse().unwrap(),
1015            DiscoveryMethod::Static,
1016        );
1017        let discovered_peer2 = DiscoveredPeer::new(
1018            peer2,
1019            "127.0.0.1:8002".parse().unwrap(),
1020            DiscoveryMethod::Static,
1021        );
1022        let discovered_peer3 = DiscoveredPeer::new(
1023            peer3,
1024            "127.0.0.1:8003".parse().unwrap(),
1025            DiscoveryMethod::Static,
1026        );
1027
1028        router.add_peer_connection(peer1, discovered_peer1).await;
1029        router.add_peer_connection(peer2, discovered_peer2).await;
1030        router.add_peer_connection(peer3, discovered_peer3).await;
1031
1032        let criteria = RouteSelectionCriteria {
1033            max_latency: None,
1034            min_reliability: 0.5,
1035            required_security: SecurityLevel::Basic,
1036            min_bandwidth: None,
1037            geographic_constraints: GeographicConstraints {
1038                preferred_regions: vec![],
1039                excluded_regions: vec![],
1040                max_distance_km: None,
1041                require_diversity: false,
1042            },
1043            load_balancing_preference: LoadBalancingPreference::Adaptive,
1044            redundancy_level: RedundancyLevel::Basic,
1045            require_dark_addressing: false,
1046            require_onion_routing: false,
1047        };
1048        let paths = router.find_paths(peer3, &criteria).await.unwrap();
1049        assert!(!paths.is_empty());
1050    }
1051
1052    #[tokio::test]
1053    async fn test_route_shadow_message() {
1054        let (router, mut rx) = setup_test_router();
1055        let peer1 = LibP2PPeerId::random();
1056        let peer2 = LibP2PPeerId::random();
1057        let peer3 = LibP2PPeerId::random();
1058
1059        // Set up some peers
1060        let discovered_peer1 = DiscoveredPeer::new(
1061            peer1,
1062            "127.0.0.1:8001".parse().unwrap(),
1063            DiscoveryMethod::Static,
1064        );
1065        let discovered_peer2 = DiscoveredPeer::new(
1066            peer2,
1067            "127.0.0.1:8002".parse().unwrap(),
1068            DiscoveryMethod::Static,
1069        );
1070        let discovered_peer3 = DiscoveredPeer::new(
1071            peer3,
1072            "127.0.0.1:8003".parse().unwrap(),
1073            DiscoveryMethod::Static,
1074        );
1075
1076        router.add_peer_connection(peer1, discovered_peer1).await;
1077        router.add_peer_connection(peer2, discovered_peer2).await;
1078        router.add_peer_connection(peer3, discovered_peer3).await;
1079
1080        // Try routing to a shadow address
1081        let shadow_addr = create_test_shadow_address();
1082        let test_msg = vec![1, 2, 3, 4];
1083        router
1084            .route_message(shadow_addr, test_msg.clone())
1085            .await
1086            .unwrap();
1087
1088        // Verify message was sent
1089        let received = rx.recv().await.unwrap();
1090        assert!(!received.is_empty());
1091    }
1092}