Skip to main content

peat_mesh/transport/
manager.rs

1//! Transport Manager for multi-transport coordination
2//!
3//! This module provides the `TransportManager` which coordinates multiple
4//! transport implementations, selecting the best one for each message
5//! based on requirements and current conditions.
6//!
7//! ## Architecture (ADR-032 + ADR-042)
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────────────────┐
11//! │                           Application Layer                              │
12//! │         ┌────────────────────────────────────┐                           │
13//! │         │        Transport Manager           │ ◄── Transport Selection   │
14//! │         │   (Multi-Transport Coordinator)    │     Message Requirements  │
15//! │         └──────────────┬─────────────────────┘                           │
16//! │                        │                                                 │
17//! │    ┌───────────────────┼───────────────────────┐                         │
18//! │    ▼                   ▼              ▼        ▼                          │
19//! │ ┌──────────┐    ┌────────────┐ ┌──────────┐ ┌────────────┐               │
20//! │ │  UDP     │    │   QUIC     │ │ Bluetooth│ │   LoRa     │               │
21//! │ │ Bypass   │    │  (Iroh)    │ │    LE    │ │            │               │
22//! │ │(ADR-042) │    └────────────┘ └──────────┘ └────────────┘               │
23//! │ └──────────┘                                                              │
24//! └─────────────────────────────────────────────────────────────────────────┘
25//! ```
26//!
27//! ## Example
28//!
29//! ```ignore
30//! use peat_mesh::transport::{
31//!     TransportManager, TransportManagerConfig,
32//!     MessageRequirements, MessagePriority, TransportType,
33//! };
34//!
35//! // Create manager with configuration
36//! let config = TransportManagerConfig::default();
37//! let mut manager = TransportManager::new(config);
38//!
39//! // Register transports
40//! manager.register(Arc::new(quic_transport));
41//! manager.register(Arc::new(ble_transport));
42//!
43//! // Select best transport for message
44//! let requirements = MessageRequirements {
45//!     reliable: true,
46//!     priority: MessagePriority::High,
47//!     ..Default::default()
48//! };
49//!
50//! if let Some(transport_type) = manager.select_transport(&peer_id, &requirements) {
51//!     println!("Selected transport: {}", transport_type);
52//! }
53//!
54//! // Send via bypass for low-latency delivery (ADR-042)
55//! let bypass_req = MessageRequirements::bypass(5);
56//! manager.send_bypass("position_updates", &position_bytes, None).await?;
57//! ```
58
59use std::collections::HashMap;
60use std::net::SocketAddr;
61use std::sync::{Arc, RwLock};
62
63use super::bypass::{
64    BypassMessage, BypassTarget, BypassTransport, MessageEncoding, UdpBypassChannel,
65};
66use super::capabilities::{
67    MessagePriority, MessageRequirements, PaceLevel, PeerDistance, RangeMode, Transport,
68    TransportId, TransportInstance, TransportMode, TransportPolicy, TransportType,
69};
70use super::{NodeId, Result, TransportError};
71use serde::{Deserialize, Serialize};
72use std::collections::HashSet;
73use tokio::sync::broadcast;
74use tokio::sync::RwLock as TokioRwLock;
75
76/// Storage type for registered transport instances
77type TransportInstanceMap = HashMap<TransportId, (TransportInstance, Arc<dyn Transport>)>;
78
79// =============================================================================
80// Transport Manager Configuration
81// =============================================================================
82
/// Configuration for `TransportManager`.
///
/// Holds the legacy preference ordering, the optional PACE policy, the
/// transport mode and the per-collection routing table.
#[derive(Debug, Clone)]
pub struct TransportManagerConfig {
    /// Transport preference order (first = highest preference).
    ///
    /// Used for legacy `TransportType`-based selection.
    pub preference_order: Vec<TransportType>,

    /// Enable automatic transport fallback on failure.
    pub enable_fallback: bool,

    /// Cache the transport selected for each peer.
    pub cache_peer_transport: bool,

    /// Minimum score difference required to switch transports.
    pub switch_threshold: i32,

    /// Default PACE policy for transport selection (ADR-032).
    ///
    /// If set, takes precedence over `preference_order`.
    pub default_policy: Option<TransportPolicy>,

    /// Transport mode (Single, Redundant, Bonded, LoadBalanced).
    pub transport_mode: TransportMode,

    /// Per-collection transport routing table.
    ///
    /// Collections listed here are routed to their configured transport
    /// instead of going through legacy scoring. Collections not in this
    /// table fall through to `route_message()`.
    pub collection_routes: CollectionRouteTable,
}
113
114impl Default for TransportManagerConfig {
115    fn default() -> Self {
116        Self {
117            preference_order: vec![
118                TransportType::Quic,
119                TransportType::WifiDirect,
120                TransportType::BluetoothLE,
121                TransportType::LoRa,
122            ],
123            enable_fallback: true,
124            cache_peer_transport: true,
125            switch_threshold: 10,
126            default_policy: None,
127            transport_mode: TransportMode::Single,
128            collection_routes: CollectionRouteTable::default(),
129        }
130    }
131}
132
133impl TransportManagerConfig {
134    /// Create config with a PACE policy
135    pub fn with_policy(policy: TransportPolicy) -> Self {
136        Self {
137            default_policy: Some(policy),
138            ..Default::default()
139        }
140    }
141
142    /// Set the transport mode
143    pub fn with_mode(mut self, mode: TransportMode) -> Self {
144        self.transport_mode = mode;
145        self
146    }
147}
148
149// =============================================================================
150// Transport Manager
151// =============================================================================
152
153/// Manages multiple transports and handles transport selection
154///
155/// TransportManager coordinates multiple transport implementations,
156/// selecting the best one for each message based on:
157/// - Message requirements (reliability, latency, size)
158/// - Transport capabilities
159/// - Current availability and signal quality
160/// - PACE policy (Primary, Alternate, Contingency, Emergency)
161/// - Historical success with peer
162///
163/// Also manages the UDP bypass channel (ADR-042) for low-latency,
164/// high-frequency data that bypasses CRDT synchronization.
165///
166/// ## PACE Policy (ADR-032)
167///
168/// When a PACE policy is configured, transport selection follows:
169/// 1. Try primary transports first
170/// 2. Fall back to alternate if no primary available
171/// 3. Use contingency for degraded operation
172/// 4. Emergency as last resort
173///
174/// ```ignore
175/// let policy = TransportPolicy::new("tactical")
176///     .primary(vec!["iroh-eth0", "iroh-wlan0"])
177///     .alternate(vec!["iroh-starlink"])
178///     .contingency(vec!["lora-primary"])
179///     .emergency(vec!["ble-mesh"]);
180///
181/// let config = TransportManagerConfig::with_policy(policy);
182/// let manager = TransportManager::new(config);
183/// ```
pub struct TransportManager {
    /// Registered transports keyed by transport type (legacy API).
    ///
    /// A plain map: mutation goes through `&mut self` methods.
    transports: HashMap<TransportType, Arc<dyn Transport>>,

    /// Registered transport instances keyed by ID (ADR-032 PACE API).
    ///
    /// Wrapped in a `std::sync::RwLock` so instances can be registered
    /// through `&self`.
    transport_instances: RwLock<TransportInstanceMap>,

    /// Active transport type per peer (learned from successful deliveries).
    peer_transports: RwLock<HashMap<NodeId, TransportType>>,

    /// Active transport ID per peer (PACE-based).
    peer_transport_ids: RwLock<HashMap<NodeId, TransportId>>,

    /// Peer distance estimates, keyed by peer.
    peer_distances: RwLock<HashMap<NodeId, PeerDistance>>,

    /// Configuration supplied at construction time.
    config: TransportManagerConfig,

    /// UDP bypass channel for low-latency ephemeral data (ADR-042).
    ///
    /// When set, the manager can route messages with `bypass_sync: true`
    /// through this channel instead of CRDT transports. Guarded by the
    /// Tokio `RwLock` because it is accessed from async methods.
    bypass_channel: Option<Arc<TokioRwLock<UdpBypassChannel>>>,
}
209
210impl TransportManager {
211    /// Create a new TransportManager with the given configuration
212    pub fn new(config: TransportManagerConfig) -> Self {
213        Self {
214            transports: HashMap::new(),
215            transport_instances: RwLock::new(HashMap::new()),
216            peer_transports: RwLock::new(HashMap::new()),
217            peer_transport_ids: RwLock::new(HashMap::new()),
218            peer_distances: RwLock::new(HashMap::new()),
219            config,
220            bypass_channel: None,
221        }
222    }
223
224    /// Create a new TransportManager with bypass channel support (ADR-042)
225    pub fn with_bypass(config: TransportManagerConfig, bypass: UdpBypassChannel) -> Self {
226        Self {
227            transports: HashMap::new(),
228            transport_instances: RwLock::new(HashMap::new()),
229            peer_transports: RwLock::new(HashMap::new()),
230            peer_transport_ids: RwLock::new(HashMap::new()),
231            peer_distances: RwLock::new(HashMap::new()),
232            config,
233            bypass_channel: Some(Arc::new(TokioRwLock::new(bypass))),
234        }
235    }
236
237    /// Set the bypass channel after construction
238    pub fn set_bypass_channel(&mut self, bypass: UdpBypassChannel) {
239        self.bypass_channel = Some(Arc::new(TokioRwLock::new(bypass)));
240    }
241
    /// Report whether a bypass channel has been attached to this manager.
    ///
    /// Returns `true` when `bypass_channel` is `Some`; says nothing about
    /// whether the channel itself is operational.
    pub fn has_bypass_channel(&self) -> bool {
        self.bypass_channel.is_some()
    }
246
247    /// Check if a collection is configured for bypass
248    pub async fn is_bypass_collection(&self, collection: &str) -> bool {
249        if let Some(ref bypass) = self.bypass_channel {
250            bypass.read().await.is_bypass_collection(collection)
251        } else {
252            false
253        }
254    }
255
256    /// Register a transport
257    ///
258    /// The transport will be available for selection based on its capabilities.
259    pub fn register(&mut self, transport: Arc<dyn Transport>) {
260        let transport_type = transport.capabilities().transport_type;
261        self.transports.insert(transport_type, transport);
262    }
263
    /// Remove the transport registered for `transport_type`.
    ///
    /// Returns the removed transport, or `None` if nothing was registered
    /// under that type. Per-peer cache entries are not touched here.
    pub fn unregister(&mut self, transport_type: TransportType) -> Option<Arc<dyn Transport>> {
        self.transports.remove(&transport_type)
    }
270
    /// Look up the transport registered for `transport_type`.
    ///
    /// Returns a borrowed handle, or `None` if no transport of that type is
    /// registered.
    pub fn get_transport(&self, transport_type: TransportType) -> Option<&Arc<dyn Transport>> {
        self.transports.get(&transport_type)
    }
275
276    /// Get all registered transport types
277    pub fn registered_transports(&self) -> Vec<TransportType> {
278        self.transports.keys().copied().collect()
279    }
280
281    /// Get transports that are currently available and can reach the peer
282    pub fn available_transports(&self, peer_id: &NodeId) -> Vec<TransportType> {
283        self.transports
284            .iter()
285            .filter(|(_, t)| t.is_available() && t.can_reach(peer_id))
286            .map(|(tt, _)| *tt)
287            .collect()
288    }
289
290    // =========================================================================
291    // PACE Transport Instance API (ADR-032)
292    // =========================================================================
293
294    /// Register a transport instance by ID
295    ///
296    /// This is the preferred API for multi-instance transports (e.g., multiple NICs).
297    ///
298    /// # Arguments
299    ///
300    /// * `instance` - Transport instance metadata
301    /// * `transport` - The transport implementation
302    ///
303    /// # Example
304    ///
305    /// ```ignore
306    /// let instance = TransportInstance::new("iroh-eth0", TransportType::Quic, caps)
307    ///     .with_interface("eth0");
308    /// manager.register_instance(instance, Arc::new(transport));
309    /// ```
310    pub fn register_instance(&self, instance: TransportInstance, transport: Arc<dyn Transport>) {
311        let id = instance.id.clone();
312        self.transport_instances
313            .write()
314            .unwrap()
315            .insert(id, (instance, transport));
316    }
317
318    /// Unregister a transport instance by ID
319    pub fn unregister_instance(
320        &self,
321        id: &TransportId,
322    ) -> Option<(TransportInstance, Arc<dyn Transport>)> {
323        self.transport_instances.write().unwrap().remove(id)
324    }
325
326    /// Get a transport instance by ID
327    pub fn get_instance(&self, id: &TransportId) -> Option<Arc<dyn Transport>> {
328        self.transport_instances
329            .read()
330            .unwrap()
331            .get(id)
332            .map(|(_, t)| Arc::clone(t))
333    }
334
335    /// Get all registered instance IDs
336    pub fn registered_instance_ids(&self) -> Vec<TransportId> {
337        self.transport_instances
338            .read()
339            .unwrap()
340            .keys()
341            .cloned()
342            .collect()
343    }
344
345    /// Get IDs of available transport instances
346    pub fn available_instance_ids(&self) -> HashSet<TransportId> {
347        self.transport_instances
348            .read()
349            .unwrap()
350            .iter()
351            .filter(|(_, (inst, transport))| inst.available && transport.is_available())
352            .map(|(id, _)| id.clone())
353            .collect()
354    }
355
356    /// Get IDs of available transports that can reach a peer
357    pub fn available_instances_for_peer(&self, peer_id: &NodeId) -> Vec<TransportId> {
358        self.transport_instances
359            .read()
360            .unwrap()
361            .iter()
362            .filter(|(_, (inst, transport))| {
363                inst.available && transport.is_available() && transport.can_reach(peer_id)
364            })
365            .map(|(id, _)| id.clone())
366            .collect()
367    }
368
369    /// Get the current PACE level based on available transports
370    ///
371    /// Returns the best PACE level for which at least one transport is available.
372    pub fn current_pace_level(&self) -> PaceLevel {
373        match &self.config.default_policy {
374            Some(policy) => policy.current_level(&self.available_instance_ids()),
375            None => {
376                // No policy - if any transport available, consider it "Primary"
377                if !self.available_instance_ids().is_empty() {
378                    PaceLevel::Primary
379                } else {
380                    PaceLevel::None
381                }
382            }
383        }
384    }
385
386    /// Select transport(s) using PACE policy
387    ///
388    /// Returns transport IDs in priority order based on PACE policy and availability.
389    /// The number of transports returned depends on the configured TransportMode:
390    /// - Single: Returns at most one transport
391    /// - Redundant: Returns multiple for simultaneous send
392    /// - LoadBalanced: Returns all available for distribution
393    ///
394    /// # Arguments
395    ///
396    /// * `peer_id` - Target peer
397    /// * `requirements` - Message requirements
398    ///
399    /// # Returns
400    ///
401    /// Vector of transport IDs in priority order
402    pub fn select_transports_pace(
403        &self,
404        peer_id: &NodeId,
405        requirements: &MessageRequirements,
406    ) -> Vec<TransportId> {
407        let policy = match &self.config.default_policy {
408            Some(p) => p,
409            None => return Vec::new(), // No PACE policy configured
410        };
411
412        let instances = self.transport_instances.read().unwrap();
413        let available_for_peer: HashSet<_> = instances
414            .iter()
415            .filter(|(_, (inst, transport))| {
416                inst.available
417                    && transport.is_available()
418                    && transport.can_reach(peer_id)
419                    && transport.capabilities().meets_requirements(requirements)
420            })
421            .map(|(id, _)| id.clone())
422            .collect();
423
424        // Get candidates in PACE order
425        let candidates: Vec<TransportId> = policy
426            .ordered()
427            .filter(|id| available_for_peer.contains(*id))
428            .cloned()
429            .collect();
430
431        // Apply transport mode
432        match &self.config.transport_mode {
433            TransportMode::Single => candidates.into_iter().take(1).collect(),
434            TransportMode::Redundant {
435                min_paths,
436                max_paths,
437            } => {
438                let min = *min_paths as usize;
439                let max = max_paths.map(|m| m as usize).unwrap_or(candidates.len());
440                candidates.into_iter().take(max.max(min)).collect()
441            }
442            TransportMode::Bonded => candidates, // All for bandwidth aggregation
443            TransportMode::LoadBalanced { .. } => candidates, // All for distribution
444        }
445    }
446
447    /// Select the best single transport using PACE policy
448    ///
449    /// Convenience wrapper that returns just the first (best) transport.
450    pub fn select_transport_pace(
451        &self,
452        peer_id: &NodeId,
453        requirements: &MessageRequirements,
454    ) -> Option<TransportId> {
455        self.select_transports_pace(peer_id, requirements)
456            .into_iter()
457            .next()
458    }
459
460    /// Record successful transport use for a peer (PACE version)
461    pub fn record_success_pace(&self, peer_id: &NodeId, transport_id: TransportId) {
462        if self.config.cache_peer_transport {
463            self.peer_transport_ids
464                .write()
465                .unwrap()
466                .insert(peer_id.clone(), transport_id);
467        }
468    }
469
470    /// Clear cached transport for a peer (PACE version)
471    pub fn clear_cache_pace(&self, peer_id: &NodeId) {
472        self.peer_transport_ids.write().unwrap().remove(peer_id);
473    }
474
475    /// Select the best transport for a peer and message requirements
476    ///
477    /// Returns the transport type that best matches the requirements,
478    /// or `None` if no suitable transport is available.
479    ///
480    /// # Selection Algorithm
481    ///
482    /// 1. Filter transports by availability and reachability
483    /// 2. Filter by hard requirements (reliability, bandwidth, message size)
484    /// 3. Score remaining transports based on:
485    ///    - Latency (for high-priority messages)
486    ///    - Power consumption (if power-sensitive)
487    ///    - User preference order
488    ///    - Signal quality (for wireless)
489    /// 4. Return highest-scoring transport
490    pub fn select_transport(
491        &self,
492        peer_id: &NodeId,
493        requirements: &MessageRequirements,
494    ) -> Option<TransportType> {
495        // Check cache first if enabled
496        if self.config.cache_peer_transport {
497            if let Some(&cached) = self.peer_transports.read().unwrap().get(peer_id) {
498                // Verify cached transport still valid
499                if let Some(transport) = self.transports.get(&cached) {
500                    if transport.is_available()
501                        && transport.can_reach(peer_id)
502                        && transport.capabilities().meets_requirements(requirements)
503                    {
504                        return Some(cached);
505                    }
506                }
507            }
508        }
509
510        // Find available transports that meet requirements
511        let candidates: Vec<_> = self
512            .available_transports(peer_id)
513            .into_iter()
514            .filter_map(|tt| {
515                let transport = self.transports.get(&tt)?;
516                let caps = transport.capabilities();
517
518                // Check hard requirements
519                if !caps.meets_requirements(requirements) {
520                    return None;
521                }
522
523                // Check latency requirement
524                if let Some(max_latency) = requirements.max_latency_ms {
525                    let est_delivery = transport.estimate_delivery_ms(requirements.message_size);
526                    if est_delivery > max_latency {
527                        return None;
528                    }
529                }
530
531                // Calculate preference bonus
532                let preference_bonus = self
533                    .config
534                    .preference_order
535                    .iter()
536                    .position(|&t| t == tt)
537                    .map(|idx| 20 - (idx as i32 * 5))
538                    .unwrap_or(0);
539
540                let score = transport.calculate_score(requirements, preference_bonus);
541                Some((tt, score))
542            })
543            .collect();
544
545        // Return highest-scoring transport
546        candidates
547            .into_iter()
548            .max_by_key(|(_, score)| *score)
549            .map(|(tt, _)| tt)
550    }
551
552    /// Select transport with distance-based range mode adaptation
553    ///
554    /// Returns the best transport type and optionally a recommended range mode
555    /// if the transport supports dynamic range configuration.
556    pub fn select_transport_for_distance(
557        &self,
558        peer_id: &NodeId,
559        requirements: &MessageRequirements,
560    ) -> Option<(TransportType, Option<RangeMode>)> {
561        let transport_type = self.select_transport(peer_id, requirements)?;
562
563        // Get distance estimate if available
564        let distance = self
565            .peer_distances
566            .read()
567            .unwrap()
568            .get(peer_id)
569            .map(|d| d.distance_meters);
570
571        // If we have a configurable transport, get recommended mode
572        let range_mode = if let Some(_dist) = distance {
573            // This would need runtime trait casting - for now return None
574            // In a full implementation, we'd use trait objects with downcast
575            None // Placeholder - see implementation note below
576        } else {
577            None
578        };
579
580        Some((transport_type, range_mode))
581    }
582
583    /// Record successful transport use for a peer
584    ///
585    /// This updates the peer transport cache for future selections.
586    pub fn record_success(&self, peer_id: &NodeId, transport_type: TransportType) {
587        if self.config.cache_peer_transport {
588            self.peer_transports
589                .write()
590                .unwrap()
591                .insert(peer_id.clone(), transport_type);
592        }
593    }
594
595    /// Clear cached transport for a peer
596    ///
597    /// Call this when a transport fails for a peer.
598    pub fn clear_cache(&self, peer_id: &NodeId) {
599        self.peer_transports.write().unwrap().remove(peer_id);
600    }
601
602    /// Update distance estimate for a peer
603    pub fn update_peer_distance(&self, distance: PeerDistance) {
604        self.peer_distances
605            .write()
606            .unwrap()
607            .insert(distance.peer_id.clone(), distance);
608    }
609
610    /// Get current distance estimate for a peer
611    pub fn get_peer_distance(&self, peer_id: &NodeId) -> Option<PeerDistance> {
612        self.peer_distances.read().unwrap().get(peer_id).cloned()
613    }
614
615    /// Connect to a peer using the best available transport
616    ///
617    /// This is a convenience method that selects the transport and connects.
618    pub async fn connect(
619        &self,
620        peer_id: &NodeId,
621        requirements: &MessageRequirements,
622    ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
623        let transport_type = self
624            .select_transport(peer_id, requirements)
625            .ok_or_else(|| {
626                TransportError::PeerNotFound(format!("No suitable transport for {}", peer_id))
627            })?;
628
629        let transport = self
630            .transports
631            .get(&transport_type)
632            .ok_or(TransportError::NotStarted)?;
633
634        let connection = transport.connect(peer_id).await?;
635
636        // Record successful connection
637        self.record_success(peer_id, transport_type);
638
639        Ok((transport_type, connection))
640    }
641
642    /// Connect with fallback to alternative transports
643    ///
644    /// Tries the primary transport first, then falls back to others if enabled.
645    pub async fn connect_with_fallback(
646        &self,
647        peer_id: &NodeId,
648        requirements: &MessageRequirements,
649    ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
650        // Get all candidate transports sorted by score
651        let candidates: Vec<_> = self
652            .available_transports(peer_id)
653            .into_iter()
654            .filter_map(|tt| {
655                let transport = self.transports.get(&tt)?;
656                if !transport.capabilities().meets_requirements(requirements) {
657                    return None;
658                }
659                let preference_bonus = self
660                    .config
661                    .preference_order
662                    .iter()
663                    .position(|&t| t == tt)
664                    .map(|idx| 20 - (idx as i32 * 5))
665                    .unwrap_or(0);
666                let score = transport.calculate_score(requirements, preference_bonus);
667                Some((tt, score))
668            })
669            .collect();
670
671        let mut sorted: Vec<_> = candidates;
672        sorted.sort_by(|a, b| b.1.cmp(&a.1)); // Sort descending by score
673
674        if sorted.is_empty() {
675            return Err(TransportError::PeerNotFound(format!(
676                "No suitable transport for {}",
677                peer_id
678            )));
679        }
680
681        let mut last_error = None;
682
683        for (transport_type, _) in sorted {
684            let transport = match self.transports.get(&transport_type) {
685                Some(t) => t,
686                None => continue,
687            };
688
689            match transport.connect(peer_id).await {
690                Ok(conn) => {
691                    self.record_success(peer_id, transport_type);
692                    return Ok((transport_type, conn));
693                }
694                Err(e) => {
695                    if !self.config.enable_fallback {
696                        return Err(e);
697                    }
698                    last_error = Some(e);
699                    self.clear_cache(peer_id);
700                }
701            }
702        }
703
704        Err(last_error.unwrap_or_else(|| {
705            TransportError::PeerNotFound(format!("All transports failed for {}", peer_id))
706        }))
707    }
708
709    // =========================================================================
710    // Bypass Channel Methods (ADR-042)
711    // =========================================================================
712
713    /// Send data via the UDP bypass channel
714    ///
715    /// Sends data directly via UDP, bypassing CRDT synchronization.
716    /// Use for high-frequency, low-latency, or ephemeral data.
717    ///
718    /// # Arguments
719    ///
720    /// * `collection` - Collection name (must be configured for bypass)
721    /// * `data` - Raw data to send (already serialized)
722    /// * `target` - Optional target for unicast; uses collection config if None
723    ///
724    /// # Returns
725    ///
726    /// * `Ok(())` - Data sent successfully
727    /// * `Err(TransportError)` - Send failed or bypass not available
728    ///
729    /// # Example
730    ///
731    /// ```ignore
732    /// // Send position update via bypass
733    /// manager.send_bypass("position_updates", &position_bytes, None).await?;
734    ///
735    /// // Send to specific peer via unicast
736    /// let target = "192.168.1.100:5150".parse().unwrap();
737    /// manager.send_bypass("commands", &cmd_bytes, Some(target)).await?;
738    /// ```
739    pub async fn send_bypass(
740        &self,
741        collection: &str,
742        data: &[u8],
743        target: Option<SocketAddr>,
744    ) -> Result<()> {
745        let bypass = self
746            .bypass_channel
747            .as_ref()
748            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
749
750        bypass
751            .read()
752            .await
753            .send_to_collection(collection, target, data)
754            .await
755            .map_err(|e| TransportError::Other(e.to_string().into()))
756    }
757
758    /// Send data via bypass channel with explicit target
759    ///
760    /// Lower-level method for sending to a specific target.
761    ///
762    /// # Arguments
763    ///
764    /// * `target` - Target address (unicast, multicast, or broadcast)
765    /// * `collection` - Collection name for header
766    /// * `data` - Raw data to send
767    pub async fn send_bypass_to(
768        &self,
769        target: BypassTarget,
770        collection: &str,
771        data: &[u8],
772    ) -> Result<()> {
773        let bypass = self
774            .bypass_channel
775            .as_ref()
776            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
777
778        bypass
779            .read()
780            .await
781            .send(target, collection, data)
782            .await
783            .map_err(|e| TransportError::Other(e.to_string().into()))
784    }
785
786    /// Subscribe to incoming bypass messages
787    ///
788    /// Returns a broadcast receiver for all incoming bypass channel messages.
789    ///
790    /// # Returns
791    ///
792    /// * `Ok(Receiver)` - Subscription successful
793    /// * `Err(TransportError)` - Bypass not available
794    ///
795    /// # Example
796    ///
797    /// ```ignore
798    /// let mut rx = manager.subscribe_bypass().await?;
799    /// while let Ok(msg) = rx.recv().await {
800    ///     println!("Bypass message from {}: {} bytes",
801    ///         msg.source, msg.data.len());
802    /// }
803    /// ```
804    pub async fn subscribe_bypass(&self) -> Result<broadcast::Receiver<BypassMessage>> {
805        let bypass = self
806            .bypass_channel
807            .as_ref()
808            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
809
810        Ok(bypass.read().await.subscribe())
811    }
812
813    /// Subscribe to bypass messages for a specific collection
814    ///
815    /// Returns the collection hash and a receiver. Filter received messages
816    /// by comparing `msg.collection_hash == hash`.
817    ///
818    /// # Arguments
819    ///
820    /// * `collection` - Collection name to subscribe to
821    ///
822    /// # Returns
823    ///
824    /// * `Ok((hash, Receiver))` - Subscription successful with collection hash
825    /// * `Err(TransportError)` - Bypass not available
826    pub async fn subscribe_bypass_collection(
827        &self,
828        collection: &str,
829    ) -> Result<(u32, broadcast::Receiver<BypassMessage>)> {
830        let bypass = self
831            .bypass_channel
832            .as_ref()
833            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
834
835        Ok(bypass.read().await.subscribe_collection(collection))
836    }
837
838    // =========================================================================
839    // Per-Collection Transport Routing (M4 / ADR-032)
840    // =========================================================================
841
842    /// Route a message for a specific collection
843    ///
844    /// Looks up the collection in the route table and returns the appropriate
845    /// routing decision. If the collection is not configured, falls through
846    /// to `route_message()` for legacy scoring.
847    ///
848    /// # Arguments
849    ///
850    /// * `collection` - Collection name
851    /// * `peer_id` - Target peer
852    /// * `requirements` - Message requirements
853    ///
854    /// # Returns
855    ///
856    /// Routing decision for this collection's messages
857    pub fn route_collection(
858        &self,
859        collection: &str,
860        peer_id: &NodeId,
861        requirements: &MessageRequirements,
862    ) -> RouteDecision {
863        let route_config = match self.config.collection_routes.get(collection) {
864            Some(config) => config,
865            None => return self.route_message(peer_id, requirements),
866        };
867
868        match &route_config.route {
869            CollectionTransportRoute::Bypass { .. } => {
870                if self.bypass_channel.is_some() {
871                    RouteDecision::Bypass
872                } else {
873                    RouteDecision::NoRoute
874                }
875            }
876            CollectionTransportRoute::Fixed { transport_type } => {
877                // Check if the fixed transport is registered and can reach the peer
878                if let Some(transport) = self.transports.get(transport_type) {
879                    if transport.is_available() && transport.can_reach(peer_id) {
880                        RouteDecision::Transport(*transport_type)
881                    } else {
882                        RouteDecision::NoRoute
883                    }
884                } else {
885                    RouteDecision::NoRoute
886                }
887            }
888            CollectionTransportRoute::Pace { policy_override } => {
889                match self.select_transport_pace_with_policy(
890                    peer_id,
891                    requirements,
892                    policy_override.as_ref(),
893                ) {
894                    Some(id) => RouteDecision::TransportInstance(id),
895                    None => RouteDecision::NoRoute,
896                }
897            }
898        }
899    }
900
901    /// Select a transport instance using a specific or default PACE policy
902    ///
903    /// If `policy_override` is `Some`, uses that policy. Otherwise falls back
904    /// to the default policy from config.
905    fn select_transport_pace_with_policy(
906        &self,
907        peer_id: &NodeId,
908        requirements: &MessageRequirements,
909        policy_override: Option<&TransportPolicy>,
910    ) -> Option<TransportId> {
911        let policy = policy_override.or(self.config.default_policy.as_ref())?;
912
913        let instances = self.transport_instances.read().unwrap();
914        let available_for_peer: HashSet<_> = instances
915            .iter()
916            .filter(|(_, (inst, transport))| {
917                inst.available
918                    && transport.is_available()
919                    && transport.can_reach(peer_id)
920                    && transport.capabilities().meets_requirements(requirements)
921            })
922            .map(|(id, _)| id.clone())
923            .collect();
924
925        policy
926            .ordered()
927            .find(|id| available_for_peer.contains(*id))
928            .cloned()
929    }
930
931    /// Get the route configuration for a collection
932    pub fn get_collection_route(&self, collection: &str) -> Option<&CollectionRouteConfig> {
933        self.config.collection_routes.get(collection)
934    }
935
936    /// Route a message based on requirements
937    ///
938    /// If `requirements.bypass_sync` is `true` and bypass channel is available,
939    /// returns `RouteDecision::Bypass`. Otherwise returns the selected transport.
940    ///
941    /// # Arguments
942    ///
943    /// * `peer_id` - Target peer (ignored for bypass)
944    /// * `requirements` - Message requirements
945    ///
946    /// # Returns
947    ///
948    /// Decision on how to route the message
949    pub fn route_message(
950        &self,
951        peer_id: &NodeId,
952        requirements: &MessageRequirements,
953    ) -> RouteDecision {
954        // Check if bypass is requested and available
955        if requirements.bypass_sync && self.bypass_channel.is_some() {
956            return RouteDecision::Bypass;
957        }
958        // Fall back to normal transport if bypass not available or not requested
959
960        // Select normal transport
961        match self.select_transport(peer_id, requirements) {
962            Some(transport_type) => RouteDecision::Transport(transport_type),
963            None => RouteDecision::NoRoute,
964        }
965    }
966}
967
968/// Routing decision for a message
969#[derive(Debug, Clone, PartialEq, Eq)]
970pub enum RouteDecision {
971    /// Use UDP bypass channel
972    Bypass,
973    /// Use specified transport type (legacy scoring)
974    Transport(TransportType),
975    /// Use a specific transport instance (PACE selection result)
976    TransportInstance(TransportId),
977    /// No suitable route available
978    NoRoute,
979}
980
981// =============================================================================
982// Per-Collection Transport Routing (M4 / ADR-032)
983// =============================================================================
984
985/// Routing strategy for a specific collection
986///
987/// Determines how messages for a collection are routed to a transport.
988/// This generalizes the `BypassCollectionConfig` pattern to all transports.
989///
990/// # Variants
991///
992/// - `Fixed` — Always use a specific transport type (e.g., Quic, BluetoothLE)
993/// - `Bypass` — Route via UDP bypass channel
994/// - `Pace` — Use PACE-based dynamic selection with optional policy override
995///
996/// # Example
997///
998/// ```
999/// use peat_mesh::transport::{CollectionTransportRoute, TransportType};
1000///
1001/// // Fixed route to BLE
1002/// let ble_route = CollectionTransportRoute::Fixed {
1003///     transport_type: TransportType::BluetoothLE,
1004/// };
1005///
1006/// // PACE route with default policy
1007/// let pace_route = CollectionTransportRoute::Pace {
1008///     policy_override: None,
1009/// };
1010/// ```
1011#[derive(Debug, Clone, Serialize, Deserialize)]
1012#[serde(tag = "transport", rename_all = "snake_case")]
1013pub enum CollectionTransportRoute {
1014    /// Always use a specific transport type (e.g., Quic, BluetoothLE)
1015    Fixed { transport_type: TransportType },
1016    /// Route via UDP bypass channel
1017    Bypass {
1018        encoding: MessageEncoding,
1019        ttl_ms: u64,
1020        bypass_transport: BypassTransport,
1021    },
1022    /// Use PACE-based dynamic selection
1023    Pace {
1024        policy_override: Option<TransportPolicy>,
1025    },
1026}
1027
1028/// Per-collection routing entry
1029///
1030/// Binds a collection name to a routing strategy and message priority.
1031#[derive(Debug, Clone, Serialize, Deserialize)]
1032pub struct CollectionRouteConfig {
1033    /// Collection name (e.g., "position_updates", "sensor_data")
1034    pub collection: String,
1035    /// Routing strategy for this collection
1036    pub route: CollectionTransportRoute,
1037    /// Default message priority for this collection
1038    pub priority: MessagePriority,
1039}
1040
1041/// Lookup table for per-collection transport routing
1042///
1043/// Maps collection names to their transport routing configuration.
1044/// Collections not in this table fall through to legacy scoring
1045/// via `route_message()`.
1046///
1047/// # Example
1048///
1049/// ```
1050/// use peat_mesh::transport::{
1051///     CollectionRouteTable, CollectionRouteConfig, CollectionTransportRoute,
1052///     TransportType, MessagePriority,
1053/// };
1054///
1055/// let table = CollectionRouteTable::new()
1056///     .with_collection(CollectionRouteConfig {
1057///         collection: "telemetry".to_string(),
1058///         route: CollectionTransportRoute::Fixed {
1059///             transport_type: TransportType::BluetoothLE,
1060///         },
1061///         priority: MessagePriority::Normal,
1062///     });
1063///
1064/// assert!(table.has_collection("telemetry"));
1065/// assert!(!table.has_collection("unknown"));
1066/// ```
1067#[derive(Debug, Clone, Default, Serialize, Deserialize)]
1068pub struct CollectionRouteTable {
1069    collections: Vec<CollectionRouteConfig>,
1070}
1071
1072impl CollectionRouteTable {
1073    /// Create an empty route table
1074    pub fn new() -> Self {
1075        Self::default()
1076    }
1077
1078    /// Add a collection route configuration (builder pattern)
1079    pub fn with_collection(mut self, config: CollectionRouteConfig) -> Self {
1080        self.collections.push(config);
1081        self
1082    }
1083
1084    /// Get the route config for a collection
1085    pub fn get(&self, collection: &str) -> Option<&CollectionRouteConfig> {
1086        self.collections.iter().find(|c| c.collection == collection)
1087    }
1088
1089    /// Check if a collection has a route configured
1090    pub fn has_collection(&self, collection: &str) -> bool {
1091        self.collections.iter().any(|c| c.collection == collection)
1092    }
1093
1094    /// Check if a collection is configured for bypass routing
1095    pub fn is_bypass(&self, collection: &str) -> bool {
1096        self.get(collection)
1097            .map(|c| matches!(c.route, CollectionTransportRoute::Bypass { .. }))
1098            .unwrap_or(false)
1099    }
1100}
1101
1102impl std::fmt::Debug for TransportManager {
1103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104        f.debug_struct("TransportManager")
1105            .field("transports", &self.transports.keys().collect::<Vec<_>>())
1106            .field("config", &self.config)
1107            .finish()
1108    }
1109}
1110
1111// =============================================================================
1112// Tests
1113// =============================================================================
1114
1115#[cfg(test)]
1116mod tests {
1117    use super::*;
1118    use crate::transport::bypass::{BypassChannelConfig, UdpBypassChannel};
1119    use crate::transport::capabilities::{MessagePriority, TransportCapabilities};
1120    use crate::transport::{MeshConnection, MeshTransport, PeerEventReceiver};
1121    use async_trait::async_trait;
1122    use std::time::Instant;
1123    use tokio::sync::mpsc;
1124
1125    // Mock transport for testing
1126    struct MockTransport {
1127        caps: TransportCapabilities,
1128        available: bool,
1129        reachable_peers: Vec<NodeId>,
1130        signal: Option<u8>,
1131    }
1132
1133    impl MockTransport {
1134        fn new(caps: TransportCapabilities) -> Self {
1135            Self {
1136                caps,
1137                available: true,
1138                reachable_peers: vec![],
1139                signal: None,
1140            }
1141        }
1142
1143        fn with_peer(mut self, peer: NodeId) -> Self {
1144            self.reachable_peers.push(peer);
1145            self
1146        }
1147
1148        #[allow(dead_code)]
1149        fn with_signal(mut self, signal: u8) -> Self {
1150            self.signal = Some(signal);
1151            self
1152        }
1153
1154        fn unavailable(mut self) -> Self {
1155            self.available = false;
1156            self
1157        }
1158    }
1159
1160    struct MockConnection {
1161        peer_id: NodeId,
1162        connected_at: Instant,
1163    }
1164
1165    impl MeshConnection for MockConnection {
1166        fn peer_id(&self) -> &NodeId {
1167            &self.peer_id
1168        }
1169
1170        fn is_alive(&self) -> bool {
1171            true
1172        }
1173
1174        fn connected_at(&self) -> Instant {
1175            self.connected_at
1176        }
1177    }
1178
1179    #[async_trait]
1180    impl MeshTransport for MockTransport {
1181        async fn start(&self) -> Result<()> {
1182            Ok(())
1183        }
1184
1185        async fn stop(&self) -> Result<()> {
1186            Ok(())
1187        }
1188
1189        async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnection>> {
1190            if self.reachable_peers.contains(peer_id) {
1191                Ok(Box::new(MockConnection {
1192                    peer_id: peer_id.clone(),
1193                    connected_at: Instant::now(),
1194                }))
1195            } else {
1196                Err(TransportError::PeerNotFound(peer_id.to_string()))
1197            }
1198        }
1199
1200        async fn disconnect(&self, _peer_id: &NodeId) -> Result<()> {
1201            Ok(())
1202        }
1203
1204        fn get_connection(&self, _peer_id: &NodeId) -> Option<Box<dyn MeshConnection>> {
1205            None
1206        }
1207
1208        fn peer_count(&self) -> usize {
1209            0
1210        }
1211
1212        fn connected_peers(&self) -> Vec<NodeId> {
1213            vec![]
1214        }
1215
1216        fn subscribe_peer_events(&self) -> PeerEventReceiver {
1217            let (_tx, rx) = mpsc::channel(1);
1218            rx
1219        }
1220    }
1221
1222    impl Transport for MockTransport {
1223        fn capabilities(&self) -> &TransportCapabilities {
1224            &self.caps
1225        }
1226
1227        fn is_available(&self) -> bool {
1228            self.available
1229        }
1230
1231        fn signal_quality(&self) -> Option<u8> {
1232            self.signal
1233        }
1234
1235        fn can_reach(&self, peer_id: &NodeId) -> bool {
1236            self.reachable_peers.contains(peer_id)
1237        }
1238    }
1239
1240    #[test]
1241    fn test_register_transport() {
1242        let config = TransportManagerConfig::default();
1243        let mut manager = TransportManager::new(config);
1244
1245        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1246        manager.register(transport);
1247
1248        assert!(manager.get_transport(TransportType::Quic).is_some());
1249        assert!(manager.get_transport(TransportType::LoRa).is_none());
1250    }
1251
1252    #[test]
1253    fn test_unregister_transport() {
1254        let config = TransportManagerConfig::default();
1255        let mut manager = TransportManager::new(config);
1256
1257        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1258        manager.register(transport);
1259
1260        let removed = manager.unregister(TransportType::Quic);
1261        assert!(removed.is_some());
1262        assert!(manager.get_transport(TransportType::Quic).is_none());
1263    }
1264
1265    #[test]
1266    fn test_available_transports() {
1267        let config = TransportManagerConfig::default();
1268        let mut manager = TransportManager::new(config);
1269
1270        let peer = NodeId::new("peer-1".to_string());
1271
1272        // QUIC available and can reach peer
1273        let quic =
1274            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1275        manager.register(quic);
1276
1277        // BLE available but can't reach peer
1278        let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1279        manager.register(ble);
1280
1281        // LoRa unavailable
1282        let lora = Arc::new(
1283            MockTransport::new(TransportCapabilities::lora(7))
1284                .unavailable()
1285                .with_peer(peer.clone()),
1286        );
1287        manager.register(lora);
1288
1289        let available = manager.available_transports(&peer);
1290        assert_eq!(available.len(), 1);
1291        assert!(available.contains(&TransportType::Quic));
1292    }
1293
1294    #[test]
1295    fn test_select_transport_by_reliability() {
1296        let config = TransportManagerConfig::default();
1297        let mut manager = TransportManager::new(config);
1298
1299        let peer = NodeId::new("peer-1".to_string());
1300
1301        // QUIC is reliable
1302        let quic =
1303            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1304        manager.register(quic);
1305
1306        // LoRa is not reliable by default
1307        let lora =
1308            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1309        manager.register(lora);
1310
1311        // Require reliability
1312        let requirements = MessageRequirements {
1313            reliable: true,
1314            ..Default::default()
1315        };
1316
1317        let selected = manager.select_transport(&peer, &requirements);
1318        assert_eq!(selected, Some(TransportType::Quic));
1319    }
1320
1321    #[test]
1322    fn test_select_transport_by_preference() {
1323        let config = TransportManagerConfig {
1324            preference_order: vec![TransportType::BluetoothLE, TransportType::Quic],
1325            ..Default::default()
1326        };
1327        let mut manager = TransportManager::new(config);
1328
1329        let peer = NodeId::new("peer-1".to_string());
1330
1331        // Both transports available
1332        let quic =
1333            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1334        manager.register(quic);
1335
1336        let ble = Arc::new(
1337            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1338        );
1339        manager.register(ble);
1340
1341        let requirements = MessageRequirements::default();
1342        let selected = manager.select_transport(&peer, &requirements);
1343
1344        // BLE preferred over QUIC in this config
1345        assert_eq!(selected, Some(TransportType::BluetoothLE));
1346    }
1347
1348    #[test]
1349    fn test_select_transport_by_latency() {
1350        let config = TransportManagerConfig::default();
1351        let mut manager = TransportManager::new(config);
1352
1353        let peer = NodeId::new("peer-1".to_string());
1354
1355        // QUIC has 10ms latency
1356        let quic =
1357            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1358        manager.register(quic);
1359
1360        // LoRa has 100ms+ latency
1361        let mut lora_caps = TransportCapabilities::lora(7);
1362        lora_caps.reliable = true; // Make it reliable for this test
1363        let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1364        manager.register(lora);
1365
1366        // High priority message - should prefer low latency
1367        let requirements = MessageRequirements {
1368            priority: MessagePriority::High,
1369            reliable: true,
1370            ..Default::default()
1371        };
1372
1373        let selected = manager.select_transport(&peer, &requirements);
1374        assert_eq!(selected, Some(TransportType::Quic));
1375    }
1376
1377    #[test]
1378    fn test_select_transport_with_latency_requirement() {
1379        let config = TransportManagerConfig::default();
1380        let mut manager = TransportManager::new(config);
1381
1382        let peer = NodeId::new("peer-1".to_string());
1383
1384        // QUIC: 10ms latency
1385        let quic =
1386            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1387        manager.register(quic);
1388
1389        // LoRa SF12: ~1000ms latency
1390        let mut lora_caps = TransportCapabilities::lora(12);
1391        lora_caps.reliable = true;
1392        let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1393        manager.register(lora);
1394
1395        // Strict latency requirement - should exclude LoRa
1396        let requirements = MessageRequirements {
1397            reliable: true,
1398            max_latency_ms: Some(50),
1399            ..Default::default()
1400        };
1401
1402        let selected = manager.select_transport(&peer, &requirements);
1403        assert_eq!(selected, Some(TransportType::Quic));
1404    }
1405
1406    #[test]
1407    fn test_select_transport_no_match() {
1408        let config = TransportManagerConfig::default();
1409        let mut manager = TransportManager::new(config);
1410
1411        let peer = NodeId::new("peer-1".to_string());
1412
1413        // Only unreliable LoRa available
1414        let lora =
1415            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1416        manager.register(lora);
1417
1418        // Require reliability
1419        let requirements = MessageRequirements {
1420            reliable: true,
1421            ..Default::default()
1422        };
1423
1424        let selected = manager.select_transport(&peer, &requirements);
1425        assert_eq!(selected, None);
1426    }
1427
1428    #[test]
1429    fn test_peer_transport_caching() {
1430        let config = TransportManagerConfig {
1431            cache_peer_transport: true,
1432            ..Default::default()
1433        };
1434        let mut manager = TransportManager::new(config);
1435
1436        let peer = NodeId::new("peer-1".to_string());
1437
1438        let quic =
1439            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1440        manager.register(quic);
1441
1442        let ble = Arc::new(
1443            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1444        );
1445        manager.register(ble);
1446
1447        // Record BLE success
1448        manager.record_success(&peer, TransportType::BluetoothLE);
1449
1450        // Should return cached BLE even though QUIC might score higher
1451        let requirements = MessageRequirements::default();
1452        let selected = manager.select_transport(&peer, &requirements);
1453        assert_eq!(selected, Some(TransportType::BluetoothLE));
1454
1455        // Clear cache
1456        manager.clear_cache(&peer);
1457
1458        // Now should select based on score
1459        let selected = manager.select_transport(&peer, &requirements);
1460        // With default preference order, QUIC should be selected
1461        assert_eq!(selected, Some(TransportType::Quic));
1462    }
1463
1464    #[test]
1465    fn test_power_sensitive_selection() {
1466        // Use empty preference order so only power consumption matters
1467        let config = TransportManagerConfig {
1468            preference_order: vec![],
1469            ..Default::default()
1470        };
1471        let mut manager = TransportManager::new(config);
1472
1473        let peer = NodeId::new("peer-1".to_string());
1474
1475        // QUIC: 20 battery impact
1476        let quic =
1477            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1478        manager.register(quic);
1479
1480        // BLE: 15 battery impact (more efficient)
1481        let ble = Arc::new(
1482            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1483        );
1484        manager.register(ble);
1485
1486        // Power-sensitive requirement
1487        let requirements = MessageRequirements {
1488            power_sensitive: true,
1489            ..Default::default()
1490        };
1491
1492        let selected = manager.select_transport(&peer, &requirements);
1493        // BLE should be preferred due to lower power consumption
1494        assert_eq!(selected, Some(TransportType::BluetoothLE));
1495    }
1496
1497    #[tokio::test]
1498    async fn test_connect_selects_transport() {
1499        let config = TransportManagerConfig::default();
1500        let mut manager = TransportManager::new(config);
1501
1502        let peer = NodeId::new("peer-1".to_string());
1503
1504        let quic =
1505            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1506        manager.register(quic);
1507
1508        let requirements = MessageRequirements::default();
1509        let result = manager.connect(&peer, &requirements).await;
1510
1511        assert!(result.is_ok());
1512        let (transport_type, conn) = result.unwrap();
1513        assert_eq!(transport_type, TransportType::Quic);
1514        assert_eq!(conn.peer_id(), &peer);
1515    }
1516
1517    #[tokio::test]
1518    async fn test_connect_with_fallback() {
1519        let config = TransportManagerConfig {
1520            enable_fallback: true,
1521            ..Default::default()
1522        };
1523        let mut manager = TransportManager::new(config);
1524
1525        let peer = NodeId::new("peer-1".to_string());
1526
1527        // QUIC can't reach peer
1528        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1529        manager.register(quic);
1530
1531        // BLE can reach peer
1532        let ble = Arc::new(
1533            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1534        );
1535        manager.register(ble);
1536
1537        let requirements = MessageRequirements::default();
1538        let result = manager.connect_with_fallback(&peer, &requirements).await;
1539
1540        assert!(result.is_ok());
1541        let (transport_type, _) = result.unwrap();
1542        assert_eq!(transport_type, TransportType::BluetoothLE);
1543    }
1544
1545    #[test]
1546    fn test_distance_tracking() {
1547        let config = TransportManagerConfig::default();
1548        let manager = TransportManager::new(config);
1549
1550        let peer = NodeId::new("peer-1".to_string());
1551
1552        let distance = PeerDistance {
1553            peer_id: peer.clone(),
1554            distance_meters: 500,
1555            source: super::super::capabilities::DistanceSource::Gps {
1556                confidence_meters: 10,
1557            },
1558            last_updated: Instant::now(),
1559        };
1560
1561        manager.update_peer_distance(distance);
1562
1563        let retrieved = manager.get_peer_distance(&peer);
1564        assert!(retrieved.is_some());
1565        assert_eq!(retrieved.unwrap().distance_meters, 500);
1566    }
1567
1568    // =========================================================================
1569    // Bypass Integration Tests (ADR-042)
1570    // =========================================================================
1571
1572    #[tokio::test]
1573    async fn test_no_bypass_channel_by_default() {
1574        let config = TransportManagerConfig::default();
1575        let manager = TransportManager::new(config);
1576
1577        assert!(!manager.has_bypass_channel());
1578        assert!(!manager.is_bypass_collection("test").await);
1579    }
1580
1581    #[test]
1582    fn test_route_message_without_bypass() {
1583        let config = TransportManagerConfig::default();
1584        let mut manager = TransportManager::new(config);
1585
1586        let peer = NodeId::new("peer-1".to_string());
1587
1588        let quic =
1589            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1590        manager.register(quic);
1591
1592        // Normal requirements - should select transport
1593        let requirements = MessageRequirements::default();
1594        let decision = manager.route_message(&peer, &requirements);
1595        assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1596
1597        // Bypass requested but not available - should fall back to transport
1598        // Note: We use a generous latency (100ms) so QUIC (10ms) can be selected
1599        let bypass_req = MessageRequirements {
1600            bypass_sync: true,
1601            max_latency_ms: Some(100), // QUIC has 10ms typical latency
1602            ..Default::default()
1603        };
1604        let decision = manager.route_message(&peer, &bypass_req);
1605        // Falls back to QUIC since bypass not available
1606        assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1607    }
1608
1609    #[tokio::test]
1610    async fn test_subscribe_bypass_not_configured() {
1611        let config = TransportManagerConfig::default();
1612        let manager = TransportManager::new(config);
1613
1614        let result = manager.subscribe_bypass().await;
1615        assert!(result.is_err());
1616    }
1617
1618    #[test]
1619    fn test_route_decision_equality() {
1620        assert_eq!(RouteDecision::Bypass, RouteDecision::Bypass);
1621        assert_eq!(
1622            RouteDecision::Transport(TransportType::Quic),
1623            RouteDecision::Transport(TransportType::Quic)
1624        );
1625        assert_ne!(RouteDecision::Bypass, RouteDecision::NoRoute);
1626        assert_ne!(
1627            RouteDecision::Transport(TransportType::Quic),
1628            RouteDecision::Transport(TransportType::LoRa)
1629        );
1630    }
1631
1632    // =========================================================================
1633    // PACE Instance API Tests
1634    // =========================================================================
1635
1636    #[test]
1637    fn test_register_instance() {
1638        let config = TransportManagerConfig::default();
1639        let manager = TransportManager::new(config);
1640
1641        let peer = NodeId::new("peer-1".to_string());
1642        let instance = TransportInstance::new(
1643            "iroh-eth0",
1644            TransportType::Quic,
1645            TransportCapabilities::quic(),
1646        );
1647        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer));
1648
1649        manager.register_instance(instance, transport);
1650
1651        assert!(manager.get_instance(&"iroh-eth0".to_string()).is_some());
1652        assert!(manager.get_instance(&"nonexistent".to_string()).is_none());
1653    }
1654
1655    #[test]
1656    fn test_unregister_instance() {
1657        let config = TransportManagerConfig::default();
1658        let manager = TransportManager::new(config);
1659
1660        let instance = TransportInstance::new(
1661            "iroh-eth0",
1662            TransportType::Quic,
1663            TransportCapabilities::quic(),
1664        );
1665        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1666
1667        manager.register_instance(instance, transport);
1668
1669        let removed = manager.unregister_instance(&"iroh-eth0".to_string());
1670        assert!(removed.is_some());
1671        let (inst, _) = removed.unwrap();
1672        assert_eq!(inst.id, "iroh-eth0");
1673
1674        // Should be gone now
1675        assert!(manager.get_instance(&"iroh-eth0".to_string()).is_none());
1676
1677        // Unregistering again returns None
1678        let removed_again = manager.unregister_instance(&"iroh-eth0".to_string());
1679        assert!(removed_again.is_none());
1680    }
1681
1682    #[test]
1683    fn test_registered_instance_ids() {
1684        let config = TransportManagerConfig::default();
1685        let manager = TransportManager::new(config);
1686
1687        // Empty initially
1688        assert!(manager.registered_instance_ids().is_empty());
1689
1690        let inst1 = TransportInstance::new(
1691            "iroh-eth0",
1692            TransportType::Quic,
1693            TransportCapabilities::quic(),
1694        );
1695        let inst2 = TransportInstance::new(
1696            "lora-915",
1697            TransportType::LoRa,
1698            TransportCapabilities::lora(7),
1699        );
1700
1701        manager.register_instance(
1702            inst1,
1703            Arc::new(MockTransport::new(TransportCapabilities::quic())),
1704        );
1705        manager.register_instance(
1706            inst2,
1707            Arc::new(MockTransport::new(TransportCapabilities::lora(7))),
1708        );
1709
1710        let ids = manager.registered_instance_ids();
1711        assert_eq!(ids.len(), 2);
1712        assert!(ids.contains(&"iroh-eth0".to_string()));
1713        assert!(ids.contains(&"lora-915".to_string()));
1714    }
1715
1716    #[test]
1717    fn test_available_instance_ids() {
1718        let config = TransportManagerConfig::default();
1719        let manager = TransportManager::new(config);
1720
1721        // Available instance
1722        let inst1 = TransportInstance::new(
1723            "iroh-eth0",
1724            TransportType::Quic,
1725            TransportCapabilities::quic(),
1726        );
1727        let transport1 = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1728        manager.register_instance(inst1, transport1);
1729
1730        // Unavailable instance (transport unavailable)
1731        let inst2 = TransportInstance::new(
1732            "lora-off",
1733            TransportType::LoRa,
1734            TransportCapabilities::lora(7),
1735        );
1736        let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)).unavailable());
1737        manager.register_instance(inst2, transport2);
1738
1739        // Unavailable instance (instance.available = false)
1740        let mut inst3 = TransportInstance::new(
1741            "ble-disabled",
1742            TransportType::BluetoothLE,
1743            TransportCapabilities::bluetooth_le(),
1744        );
1745        inst3.available = false;
1746        let transport3 = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1747        manager.register_instance(inst3, transport3);
1748
1749        let available = manager.available_instance_ids();
1750        assert_eq!(available.len(), 1);
1751        assert!(available.contains("iroh-eth0"));
1752    }
1753
1754    #[test]
1755    fn test_available_instances_for_peer() {
1756        let config = TransportManagerConfig::default();
1757        let manager = TransportManager::new(config);
1758
1759        let peer = NodeId::new("peer-1".to_string());
1760
1761        // Instance that can reach peer
1762        let inst1 = TransportInstance::new(
1763            "iroh-eth0",
1764            TransportType::Quic,
1765            TransportCapabilities::quic(),
1766        );
1767        let transport1 =
1768            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1769        manager.register_instance(inst1, transport1);
1770
1771        // Instance that cannot reach peer
1772        let inst2 = TransportInstance::new(
1773            "lora-915",
1774            TransportType::LoRa,
1775            TransportCapabilities::lora(7),
1776        );
1777        let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1778        manager.register_instance(inst2, transport2);
1779
1780        // Unavailable instance that could reach peer
1781        let inst3 = TransportInstance::new(
1782            "ble-off",
1783            TransportType::BluetoothLE,
1784            TransportCapabilities::bluetooth_le(),
1785        );
1786        let transport3 = Arc::new(
1787            MockTransport::new(TransportCapabilities::bluetooth_le())
1788                .with_peer(peer.clone())
1789                .unavailable(),
1790        );
1791        manager.register_instance(inst3, transport3);
1792
1793        let for_peer = manager.available_instances_for_peer(&peer);
1794        assert_eq!(for_peer.len(), 1);
1795        assert_eq!(for_peer[0], "iroh-eth0");
1796    }
1797
1798    // =========================================================================
1799    // current_pace_level() Tests
1800    // =========================================================================
1801
1802    #[test]
1803    fn test_current_pace_level_no_policy_with_available() {
1804        let config = TransportManagerConfig::default();
1805        let manager = TransportManager::new(config);
1806
1807        // Register an available instance
1808        let inst = TransportInstance::new(
1809            "iroh-eth0",
1810            TransportType::Quic,
1811            TransportCapabilities::quic(),
1812        );
1813        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1814        manager.register_instance(inst, transport);
1815
1816        // No policy: if any transport available, returns Primary
1817        assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1818    }
1819
1820    #[test]
1821    fn test_current_pace_level_no_policy_none_available() {
1822        let config = TransportManagerConfig::default();
1823        let manager = TransportManager::new(config);
1824
1825        // No instances at all
1826        assert_eq!(manager.current_pace_level(), PaceLevel::None);
1827    }
1828
1829    #[test]
1830    fn test_current_pace_level_no_policy_all_unavailable() {
1831        let config = TransportManagerConfig::default();
1832        let manager = TransportManager::new(config);
1833
1834        // Register an unavailable instance
1835        let inst = TransportInstance::new(
1836            "iroh-eth0",
1837            TransportType::Quic,
1838            TransportCapabilities::quic(),
1839        );
1840        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).unavailable());
1841        manager.register_instance(inst, transport);
1842
1843        assert_eq!(manager.current_pace_level(), PaceLevel::None);
1844    }
1845
1846    #[test]
1847    fn test_current_pace_level_with_policy_primary() {
1848        let policy = TransportPolicy::new("test")
1849            .primary(vec!["iroh-eth0"])
1850            .alternate(vec!["lora-915"])
1851            .emergency(vec!["ble-mesh"]);
1852
1853        let config = TransportManagerConfig::with_policy(policy);
1854        let manager = TransportManager::new(config);
1855
1856        // Register iroh-eth0 as available
1857        let inst = TransportInstance::new(
1858            "iroh-eth0",
1859            TransportType::Quic,
1860            TransportCapabilities::quic(),
1861        );
1862        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1863        manager.register_instance(inst, transport);
1864
1865        assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1866    }
1867
1868    #[test]
1869    fn test_current_pace_level_with_policy_alternate() {
1870        let policy = TransportPolicy::new("test")
1871            .primary(vec!["iroh-eth0"])
1872            .alternate(vec!["lora-915"])
1873            .emergency(vec!["ble-mesh"]);
1874
1875        let config = TransportManagerConfig::with_policy(policy);
1876        let manager = TransportManager::new(config);
1877
1878        // Only alternate is available
1879        let inst = TransportInstance::new(
1880            "lora-915",
1881            TransportType::LoRa,
1882            TransportCapabilities::lora(7),
1883        );
1884        let transport = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1885        manager.register_instance(inst, transport);
1886
1887        assert_eq!(manager.current_pace_level(), PaceLevel::Alternate);
1888    }
1889
1890    #[test]
1891    fn test_current_pace_level_with_policy_emergency() {
1892        let policy = TransportPolicy::new("test")
1893            .primary(vec!["iroh-eth0"])
1894            .alternate(vec!["lora-915"])
1895            .emergency(vec!["ble-mesh"]);
1896
1897        let config = TransportManagerConfig::with_policy(policy);
1898        let manager = TransportManager::new(config);
1899
1900        // Only emergency is available
1901        let inst = TransportInstance::new(
1902            "ble-mesh",
1903            TransportType::BluetoothLE,
1904            TransportCapabilities::bluetooth_le(),
1905        );
1906        let transport = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1907        manager.register_instance(inst, transport);
1908
1909        assert_eq!(manager.current_pace_level(), PaceLevel::Emergency);
1910    }
1911
1912    #[test]
1913    fn test_current_pace_level_with_policy_none_available() {
1914        let policy = TransportPolicy::new("test")
1915            .primary(vec!["iroh-eth0"])
1916            .alternate(vec!["lora-915"]);
1917
1918        let config = TransportManagerConfig::with_policy(policy);
1919        let manager = TransportManager::new(config);
1920
1921        // No instances registered
1922        assert_eq!(manager.current_pace_level(), PaceLevel::None);
1923    }
1924
1925    // =========================================================================
1926    // select_transports_pace() Tests
1927    // =========================================================================
1928
1929    #[test]
1930    fn test_select_transports_pace_no_policy() {
1931        let config = TransportManagerConfig::default();
1932        let manager = TransportManager::new(config);
1933
1934        let peer = NodeId::new("peer-1".to_string());
1935        let requirements = MessageRequirements::default();
1936
1937        // No policy => empty vec
1938        let selected = manager.select_transports_pace(&peer, &requirements);
1939        assert!(selected.is_empty());
1940    }
1941
1942    #[test]
1943    fn test_select_transports_pace_single_mode() {
1944        let policy = TransportPolicy::new("test")
1945            .primary(vec!["iroh-eth0", "iroh-wlan0"])
1946            .alternate(vec!["lora-915"]);
1947
1948        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single);
1949        let manager = TransportManager::new(config);
1950
1951        let peer = NodeId::new("peer-1".to_string());
1952
1953        // Register two available primary instances that can reach peer
1954        let inst1 = TransportInstance::new(
1955            "iroh-eth0",
1956            TransportType::Quic,
1957            TransportCapabilities::quic(),
1958        );
1959        let t1 =
1960            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1961        manager.register_instance(inst1, t1);
1962
1963        let inst2 = TransportInstance::new(
1964            "iroh-wlan0",
1965            TransportType::Quic,
1966            TransportCapabilities::quic(),
1967        );
1968        let t2 =
1969            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1970        manager.register_instance(inst2, t2);
1971
1972        let requirements = MessageRequirements::default();
1973        let selected = manager.select_transports_pace(&peer, &requirements);
1974
1975        // Single mode: at most 1
1976        assert_eq!(selected.len(), 1);
1977        assert_eq!(selected[0], "iroh-eth0");
1978    }
1979
1980    #[test]
1981    fn test_select_transports_pace_redundant_mode() {
1982        let policy = TransportPolicy::new("test")
1983            .primary(vec!["iroh-eth0", "iroh-wlan0"])
1984            .alternate(vec!["lora-915"]);
1985
1986        let config =
1987            TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(2));
1988        let manager = TransportManager::new(config);
1989
1990        let peer = NodeId::new("peer-1".to_string());
1991
1992        let inst1 = TransportInstance::new(
1993            "iroh-eth0",
1994            TransportType::Quic,
1995            TransportCapabilities::quic(),
1996        );
1997        let t1 =
1998            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1999        manager.register_instance(inst1, t1);
2000
2001        let inst2 = TransportInstance::new(
2002            "iroh-wlan0",
2003            TransportType::Quic,
2004            TransportCapabilities::quic(),
2005        );
2006        let t2 =
2007            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2008        manager.register_instance(inst2, t2);
2009
2010        let inst3 = TransportInstance::new(
2011            "lora-915",
2012            TransportType::LoRa,
2013            TransportCapabilities::lora(7),
2014        );
2015        let t3 =
2016            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2017        manager.register_instance(inst3, t3);
2018
2019        let requirements = MessageRequirements::default();
2020        let selected = manager.select_transports_pace(&peer, &requirements);
2021
2022        // Redundant { min_paths: 2, max_paths: None } => takes max(len, min) = all 3
2023        assert!(selected.len() >= 2);
2024    }
2025
2026    #[test]
2027    fn test_select_transports_pace_redundant_bounded() {
2028        let policy = TransportPolicy::new("test").primary(vec!["t1", "t2", "t3", "t4"]);
2029
2030        let config = TransportManagerConfig::with_policy(policy)
2031            .with_mode(TransportMode::redundant_bounded(1, 2));
2032        let manager = TransportManager::new(config);
2033
2034        let peer = NodeId::new("peer-1".to_string());
2035
2036        // Register 4 instances
2037        for name in &["t1", "t2", "t3", "t4"] {
2038            let inst =
2039                TransportInstance::new(*name, TransportType::Quic, TransportCapabilities::quic());
2040            let t =
2041                Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2042            manager.register_instance(inst, t);
2043        }
2044
2045        let requirements = MessageRequirements::default();
2046        let selected = manager.select_transports_pace(&peer, &requirements);
2047
2048        // Redundant { min_paths: 1, max_paths: Some(2) } => takes max(2, 1) = 2
2049        assert_eq!(selected.len(), 2);
2050    }
2051
2052    #[test]
2053    fn test_select_transports_pace_bonded_mode() {
2054        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2055
2056        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2057        let manager = TransportManager::new(config);
2058
2059        let peer = NodeId::new("peer-1".to_string());
2060
2061        let inst1 = TransportInstance::new(
2062            "iroh-eth0",
2063            TransportType::Quic,
2064            TransportCapabilities::quic(),
2065        );
2066        let t1 =
2067            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2068        manager.register_instance(inst1, t1);
2069
2070        let inst2 = TransportInstance::new(
2071            "iroh-wlan0",
2072            TransportType::Quic,
2073            TransportCapabilities::quic(),
2074        );
2075        let t2 =
2076            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2077        manager.register_instance(inst2, t2);
2078
2079        let requirements = MessageRequirements::default();
2080        let selected = manager.select_transports_pace(&peer, &requirements);
2081
2082        // Bonded: returns all candidates
2083        assert_eq!(selected.len(), 2);
2084    }
2085
2086    #[test]
2087    fn test_select_transports_pace_load_balanced_mode() {
2088        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2089
2090        let config = TransportManagerConfig::with_policy(policy)
2091            .with_mode(TransportMode::LoadBalanced { weights: None });
2092        let manager = TransportManager::new(config);
2093
2094        let peer = NodeId::new("peer-1".to_string());
2095
2096        let inst1 = TransportInstance::new(
2097            "iroh-eth0",
2098            TransportType::Quic,
2099            TransportCapabilities::quic(),
2100        );
2101        let t1 =
2102            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2103        manager.register_instance(inst1, t1);
2104
2105        let inst2 = TransportInstance::new(
2106            "iroh-wlan0",
2107            TransportType::Quic,
2108            TransportCapabilities::quic(),
2109        );
2110        let t2 =
2111            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2112        manager.register_instance(inst2, t2);
2113
2114        let requirements = MessageRequirements::default();
2115        let selected = manager.select_transports_pace(&peer, &requirements);
2116
2117        // LoadBalanced: returns all candidates
2118        assert_eq!(selected.len(), 2);
2119    }
2120
2121    #[test]
2122    fn test_select_transports_pace_filters_by_requirements() {
2123        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2124
2125        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2126        let manager = TransportManager::new(config);
2127
2128        let peer = NodeId::new("peer-1".to_string());
2129
2130        // QUIC is reliable
2131        let inst1 = TransportInstance::new(
2132            "iroh-eth0",
2133            TransportType::Quic,
2134            TransportCapabilities::quic(),
2135        );
2136        let t1 =
2137            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2138        manager.register_instance(inst1, t1);
2139
2140        // LoRa is NOT reliable
2141        let inst2 = TransportInstance::new(
2142            "lora-915",
2143            TransportType::LoRa,
2144            TransportCapabilities::lora(7),
2145        );
2146        let t2 =
2147            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2148        manager.register_instance(inst2, t2);
2149
2150        // Require reliability => should filter out LoRa
2151        let requirements = MessageRequirements {
2152            reliable: true,
2153            ..Default::default()
2154        };
2155        let selected = manager.select_transports_pace(&peer, &requirements);
2156
2157        assert_eq!(selected.len(), 1);
2158        assert_eq!(selected[0], "iroh-eth0");
2159    }
2160
2161    #[test]
2162    fn test_select_transports_pace_filters_unreachable_peer() {
2163        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2164
2165        let config = TransportManagerConfig::with_policy(policy);
2166        let manager = TransportManager::new(config);
2167
2168        let peer = NodeId::new("peer-1".to_string());
2169
2170        // Can reach peer
2171        let inst1 = TransportInstance::new(
2172            "iroh-eth0",
2173            TransportType::Quic,
2174            TransportCapabilities::quic(),
2175        );
2176        let t1 =
2177            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2178        manager.register_instance(inst1, t1);
2179
2180        // Cannot reach peer
2181        let inst2 = TransportInstance::new(
2182            "lora-915",
2183            TransportType::LoRa,
2184            TransportCapabilities::lora(7),
2185        );
2186        let t2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
2187        manager.register_instance(inst2, t2);
2188
2189        let requirements = MessageRequirements::default();
2190        let selected = manager.select_transports_pace(&peer, &requirements);
2191
2192        assert_eq!(selected.len(), 1);
2193        assert_eq!(selected[0], "iroh-eth0");
2194    }
2195
2196    // =========================================================================
2197    // select_transport_pace() Tests
2198    // =========================================================================
2199
2200    #[test]
2201    fn test_select_transport_pace_returns_first() {
2202        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2203
2204        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2205        let manager = TransportManager::new(config);
2206
2207        let peer = NodeId::new("peer-1".to_string());
2208
2209        let inst1 = TransportInstance::new(
2210            "iroh-eth0",
2211            TransportType::Quic,
2212            TransportCapabilities::quic(),
2213        );
2214        let t1 =
2215            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2216        manager.register_instance(inst1, t1);
2217
2218        let inst2 = TransportInstance::new(
2219            "iroh-wlan0",
2220            TransportType::Quic,
2221            TransportCapabilities::quic(),
2222        );
2223        let t2 =
2224            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2225        manager.register_instance(inst2, t2);
2226
2227        let requirements = MessageRequirements::default();
2228        let selected = manager.select_transport_pace(&peer, &requirements);
2229
2230        assert_eq!(selected, Some("iroh-eth0".to_string()));
2231    }
2232
2233    #[test]
2234    fn test_select_transport_pace_returns_none_no_policy() {
2235        let config = TransportManagerConfig::default();
2236        let manager = TransportManager::new(config);
2237
2238        let peer = NodeId::new("peer-1".to_string());
2239        let requirements = MessageRequirements::default();
2240
2241        assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2242    }
2243
2244    #[test]
2245    fn test_select_transport_pace_returns_none_no_candidates() {
2246        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
2247
2248        let config = TransportManagerConfig::with_policy(policy);
2249        let manager = TransportManager::new(config);
2250
2251        let peer = NodeId::new("peer-1".to_string());
2252        let requirements = MessageRequirements::default();
2253
2254        // No instances registered
2255        assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2256    }
2257
2258    // =========================================================================
2259    // record_success_pace() and clear_cache_pace() Tests
2260    // =========================================================================
2261
2262    #[test]
2263    fn test_record_success_pace_caching_enabled() {
2264        let config = TransportManagerConfig {
2265            cache_peer_transport: true,
2266            ..Default::default()
2267        };
2268        let manager = TransportManager::new(config);
2269
2270        let peer = NodeId::new("peer-1".to_string());
2271        manager.record_success_pace(&peer, "iroh-eth0".to_string());
2272
2273        let cached = manager.peer_transport_ids.read().unwrap();
2274        assert_eq!(cached.get(&peer), Some(&"iroh-eth0".to_string()));
2275    }
2276
2277    #[test]
2278    fn test_record_success_pace_caching_disabled() {
2279        let config = TransportManagerConfig {
2280            cache_peer_transport: false,
2281            ..Default::default()
2282        };
2283        let manager = TransportManager::new(config);
2284
2285        let peer = NodeId::new("peer-1".to_string());
2286        manager.record_success_pace(&peer, "iroh-eth0".to_string());
2287
2288        let cached = manager.peer_transport_ids.read().unwrap();
2289        assert!(cached.get(&peer).is_none());
2290    }
2291
2292    #[test]
2293    fn test_clear_cache_pace() {
2294        let config = TransportManagerConfig {
2295            cache_peer_transport: true,
2296            ..Default::default()
2297        };
2298        let manager = TransportManager::new(config);
2299
2300        let peer = NodeId::new("peer-1".to_string());
2301        manager.record_success_pace(&peer, "iroh-eth0".to_string());
2302
2303        // Verify it's cached
2304        assert!(manager
2305            .peer_transport_ids
2306            .read()
2307            .unwrap()
2308            .get(&peer)
2309            .is_some());
2310
2311        manager.clear_cache_pace(&peer);
2312
2313        // Verify it's cleared
2314        assert!(manager
2315            .peer_transport_ids
2316            .read()
2317            .unwrap()
2318            .get(&peer)
2319            .is_none());
2320    }
2321
2322    #[test]
2323    fn test_clear_cache_pace_nonexistent_peer() {
2324        let config = TransportManagerConfig::default();
2325        let manager = TransportManager::new(config);
2326
2327        let peer = NodeId::new("nonexistent".to_string());
2328
2329        // Should not panic
2330        manager.clear_cache_pace(&peer);
2331    }
2332
2333    // =========================================================================
2334    // select_transport_for_distance() Tests
2335    // =========================================================================
2336
2337    #[test]
2338    fn test_select_transport_for_distance_no_distance() {
2339        let config = TransportManagerConfig::default();
2340        let mut manager = TransportManager::new(config);
2341
2342        let peer = NodeId::new("peer-1".to_string());
2343        let quic =
2344            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2345        manager.register(quic);
2346
2347        let requirements = MessageRequirements::default();
2348        let result = manager.select_transport_for_distance(&peer, &requirements);
2349
2350        assert!(result.is_some());
2351        let (transport_type, range_mode) = result.unwrap();
2352        assert_eq!(transport_type, TransportType::Quic);
2353        assert!(range_mode.is_none());
2354    }
2355
2356    #[test]
2357    fn test_select_transport_for_distance_with_distance() {
2358        let config = TransportManagerConfig::default();
2359        let mut manager = TransportManager::new(config);
2360
2361        let peer = NodeId::new("peer-1".to_string());
2362        let quic =
2363            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2364        manager.register(quic);
2365
2366        // Set distance for peer
2367        let distance = PeerDistance {
2368            peer_id: peer.clone(),
2369            distance_meters: 1000,
2370            source: super::super::capabilities::DistanceSource::Configured,
2371            last_updated: Instant::now(),
2372        };
2373        manager.update_peer_distance(distance);
2374
2375        let requirements = MessageRequirements::default();
2376        let result = manager.select_transport_for_distance(&peer, &requirements);
2377
2378        assert!(result.is_some());
2379        let (transport_type, range_mode) = result.unwrap();
2380        assert_eq!(transport_type, TransportType::Quic);
2381        // Range mode is None because placeholder logic doesn't do runtime downcasting
2382        assert!(range_mode.is_none());
2383    }
2384
2385    #[test]
2386    fn test_select_transport_for_distance_no_suitable_transport() {
2387        let config = TransportManagerConfig::default();
2388        let manager = TransportManager::new(config);
2389
2390        let peer = NodeId::new("peer-1".to_string());
2391        let requirements = MessageRequirements::default();
2392
2393        let result = manager.select_transport_for_distance(&peer, &requirements);
2394        assert!(result.is_none());
2395    }
2396
2397    // =========================================================================
2398    // TransportManagerConfig builder Tests
2399    // =========================================================================
2400
2401    #[test]
2402    fn test_config_with_policy() {
2403        let policy = TransportPolicy::new("tactical")
2404            .primary(vec!["iroh-eth0"])
2405            .alternate(vec!["lora-915"]);
2406
2407        let config = TransportManagerConfig::with_policy(policy);
2408
2409        assert!(config.default_policy.is_some());
2410        let p = config.default_policy.unwrap();
2411        assert_eq!(p.name, "tactical");
2412        assert_eq!(p.primary.len(), 1);
2413        assert_eq!(p.alternate.len(), 1);
2414        // Verify defaults are preserved
2415        assert!(config.enable_fallback);
2416        assert!(config.cache_peer_transport);
2417        assert_eq!(config.switch_threshold, 10);
2418        assert!(matches!(config.transport_mode, TransportMode::Single));
2419    }
2420
2421    #[test]
2422    fn test_config_with_mode() {
2423        let config = TransportManagerConfig::default().with_mode(TransportMode::Bonded);
2424
2425        assert!(matches!(config.transport_mode, TransportMode::Bonded));
2426    }
2427
2428    #[test]
2429    fn test_config_with_policy_and_mode_chained() {
2430        let policy = TransportPolicy::new("test").primary(vec!["t1"]);
2431        let config =
2432            TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(3));
2433
2434        assert!(config.default_policy.is_some());
2435        assert!(matches!(
2436            config.transport_mode,
2437            TransportMode::Redundant {
2438                min_paths: 3,
2439                max_paths: None
2440            }
2441        ));
2442    }
2443
2444    // =========================================================================
2445    // connect() error paths Tests
2446    // =========================================================================
2447
2448    #[tokio::test]
2449    async fn test_connect_no_suitable_transport() {
2450        let config = TransportManagerConfig::default();
2451        let manager = TransportManager::new(config);
2452
2453        let peer = NodeId::new("peer-1".to_string());
2454        let requirements = MessageRequirements::default();
2455
2456        let result = manager.connect(&peer, &requirements).await;
2457        assert!(result.is_err());
2458        match result {
2459            Err(TransportError::PeerNotFound(_)) => {} // expected
2460            Err(other) => panic!("Expected PeerNotFound, got: {}", other),
2461            Ok(_) => panic!("Expected error but got Ok"),
2462        }
2463    }
2464
2465    #[tokio::test]
2466    async fn test_connect_unreachable_peer() {
2467        let config = TransportManagerConfig::default();
2468        let mut manager = TransportManager::new(config);
2469
2470        // Register QUIC but the peer is not in reachable_peers
2471        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2472        manager.register(quic);
2473
2474        let peer = NodeId::new("unreachable-peer".to_string());
2475        let requirements = MessageRequirements::default();
2476
2477        let result = manager.connect(&peer, &requirements).await;
2478        assert!(result.is_err());
2479    }
2480
2481    // =========================================================================
2482    // connect_with_fallback() Tests
2483    // =========================================================================
2484
2485    #[tokio::test]
2486    async fn test_connect_with_fallback_disabled() {
2487        let config = TransportManagerConfig {
2488            enable_fallback: false,
2489            ..Default::default()
2490        };
2491        let mut manager = TransportManager::new(config);
2492
2493        let peer = NodeId::new("peer-1".to_string());
2494
2495        // QUIC registered but can't reach peer (will fail connect)
2496        let quic =
2497            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2498        manager.register(quic);
2499
2500        // BLE also available
2501        let ble = Arc::new(
2502            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
2503        );
2504        manager.register(ble);
2505
2506        // Both can reach, both will succeed, so first should succeed.
2507        // Let's test the error path where the first fails:
2508        // We need a transport that can reach but fails to connect.
2509        // The MockTransport connects if peer is in reachable_peers.
2510        // Actually, both will succeed, so let's just test with no reachable transports.
2511
2512        let peer_unreachable = NodeId::new("nobody".to_string());
2513        let requirements = MessageRequirements::default();
2514
2515        let result = manager
2516            .connect_with_fallback(&peer_unreachable, &requirements)
2517            .await;
2518        assert!(result.is_err());
2519    }
2520
2521    #[tokio::test]
2522    async fn test_connect_with_fallback_no_candidates() {
2523        let config = TransportManagerConfig::default();
2524        let manager = TransportManager::new(config);
2525
2526        let peer = NodeId::new("peer-1".to_string());
2527        let requirements = MessageRequirements::default();
2528
2529        let result = manager.connect_with_fallback(&peer, &requirements).await;
2530        assert!(result.is_err());
2531        match result {
2532            Err(ref e) => {
2533                let err_msg = format!("{}", e);
2534                assert!(err_msg.contains("No suitable transport"));
2535            }
2536            Ok(_) => panic!("Expected error but got Ok"),
2537        }
2538    }
2539
2540    // =========================================================================
2541    // route_message() NoRoute Tests
2542    // =========================================================================
2543
2544    #[test]
2545    fn test_route_message_no_route() {
2546        let config = TransportManagerConfig::default();
2547        let manager = TransportManager::new(config);
2548
2549        let peer = NodeId::new("peer-1".to_string());
2550        let requirements = MessageRequirements::default();
2551
2552        // No transports registered => NoRoute
2553        let decision = manager.route_message(&peer, &requirements);
2554        assert_eq!(decision, RouteDecision::NoRoute);
2555    }
2556
2557    #[test]
2558    fn test_route_message_bypass_requested_no_channel() {
2559        let config = TransportManagerConfig::default();
2560        let manager = TransportManager::new(config);
2561
2562        let peer = NodeId::new("peer-1".to_string());
2563        let requirements = MessageRequirements {
2564            bypass_sync: true,
2565            ..Default::default()
2566        };
2567
2568        // bypass requested but no channel and no transports => NoRoute
2569        let decision = manager.route_message(&peer, &requirements);
2570        assert_eq!(decision, RouteDecision::NoRoute);
2571    }
2572
2573    // =========================================================================
2574    // RouteDecision construction Tests
2575    // =========================================================================
2576
2577    #[test]
2578    fn test_route_decision_no_route() {
2579        let decision = RouteDecision::NoRoute;
2580        assert_eq!(decision, RouteDecision::NoRoute);
2581        assert_ne!(decision, RouteDecision::Bypass);
2582        assert_ne!(decision, RouteDecision::Transport(TransportType::Quic));
2583    }
2584
2585    #[test]
2586    fn test_route_decision_debug() {
2587        let bypass = RouteDecision::Bypass;
2588        let transport = RouteDecision::Transport(TransportType::LoRa);
2589        let no_route = RouteDecision::NoRoute;
2590
2591        assert!(format!("{:?}", bypass).contains("Bypass"));
2592        assert!(format!("{:?}", transport).contains("LoRa"));
2593        assert!(format!("{:?}", no_route).contains("NoRoute"));
2594    }
2595
2596    #[test]
2597    fn test_route_decision_clone() {
2598        let original = RouteDecision::Transport(TransportType::BluetoothLE);
2599        let cloned = original.clone();
2600        assert_eq!(original, cloned);
2601    }
2602
2603    // =========================================================================
2604    // TransportManager Debug and misc Tests
2605    // =========================================================================
2606
2607    #[test]
2608    fn test_transport_manager_debug() {
2609        let config = TransportManagerConfig::default();
2610        let mut manager = TransportManager::new(config);
2611
2612        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2613        manager.register(quic);
2614
2615        let debug_str = format!("{:?}", manager);
2616        assert!(debug_str.contains("TransportManager"));
2617        assert!(debug_str.contains("Quic"));
2618    }
2619
2620    #[test]
2621    fn test_registered_transports() {
2622        let config = TransportManagerConfig::default();
2623        let mut manager = TransportManager::new(config);
2624
2625        assert!(manager.registered_transports().is_empty());
2626
2627        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2628        let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
2629        manager.register(quic);
2630        manager.register(ble);
2631
2632        let registered = manager.registered_transports();
2633        assert_eq!(registered.len(), 2);
2634        assert!(registered.contains(&TransportType::Quic));
2635        assert!(registered.contains(&TransportType::BluetoothLE));
2636    }
2637
2638    #[tokio::test]
2639    async fn test_set_bypass_channel() {
2640        let config = TransportManagerConfig::default();
2641        let mut manager = TransportManager::new(config);
2642
2643        assert!(!manager.has_bypass_channel());
2644
2645        let bypass_config = BypassChannelConfig::new();
2646        let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
2647        manager.set_bypass_channel(bypass);
2648
2649        assert!(manager.has_bypass_channel());
2650    }
2651
2652    #[test]
2653    fn test_record_success_caching_disabled() {
2654        let config = TransportManagerConfig {
2655            cache_peer_transport: false,
2656            ..Default::default()
2657        };
2658        let manager = TransportManager::new(config);
2659
2660        let peer = NodeId::new("peer-1".to_string());
2661        manager.record_success(&peer, TransportType::Quic);
2662
2663        // Cache should be empty since caching is disabled
2664        let cached = manager.peer_transports.read().unwrap();
2665        assert!(cached.get(&peer).is_none());
2666    }
2667
2668    #[test]
2669    fn test_select_transport_cached_transport_invalid() {
2670        let config = TransportManagerConfig {
2671            cache_peer_transport: true,
2672            ..Default::default()
2673        };
2674        let mut manager = TransportManager::new(config);
2675
2676        let peer = NodeId::new("peer-1".to_string());
2677
2678        // Register BLE that is available and can reach peer
2679        let ble = Arc::new(
2680            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
2681        );
2682        manager.register(ble);
2683
2684        // Cache a transport type that is NOT registered (e.g., LoRa)
2685        manager.record_success(&peer, TransportType::LoRa);
2686
2687        let requirements = MessageRequirements::default();
2688        let selected = manager.select_transport(&peer, &requirements);
2689
2690        // Should fall through cached transport (LoRa not registered) and select BLE
2691        assert_eq!(selected, Some(TransportType::BluetoothLE));
2692    }
2693
2694    #[test]
2695    fn test_select_transport_cached_transport_unavailable() {
2696        let config = TransportManagerConfig {
2697            cache_peer_transport: true,
2698            ..Default::default()
2699        };
2700        let mut manager = TransportManager::new(config);
2701
2702        let peer = NodeId::new("peer-1".to_string());
2703
2704        // Register QUIC that is available
2705        let quic =
2706            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2707        manager.register(quic);
2708
2709        // Register BLE that is unavailable
2710        let ble = Arc::new(
2711            MockTransport::new(TransportCapabilities::bluetooth_le())
2712                .with_peer(peer.clone())
2713                .unavailable(),
2714        );
2715        manager.register(ble);
2716
2717        // Cache BLE (which is unavailable)
2718        manager.record_success(&peer, TransportType::BluetoothLE);
2719
2720        let requirements = MessageRequirements::default();
2721        let selected = manager.select_transport(&peer, &requirements);
2722
2723        // Should fall through cached BLE (unavailable) and select QUIC
2724        assert_eq!(selected, Some(TransportType::Quic));
2725    }
2726
2727    #[test]
2728    fn test_pace_fallback_order() {
2729        // Test that PACE selection follows policy order when primary fails
2730        let policy = TransportPolicy::new("test")
2731            .primary(vec!["dead-transport"])
2732            .alternate(vec!["lora-915"]);
2733
2734        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single);
2735        let manager = TransportManager::new(config);
2736
2737        let peer = NodeId::new("peer-1".to_string());
2738
2739        // Only register the alternate (primary is not registered)
2740        let inst = TransportInstance::new(
2741            "lora-915",
2742            TransportType::LoRa,
2743            TransportCapabilities::lora(7),
2744        );
2745        let t =
2746            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2747        manager.register_instance(inst, t);
2748
2749        let requirements = MessageRequirements::default();
2750        let selected = manager.select_transports_pace(&peer, &requirements);
2751
2752        // Should fall back to alternate
2753        assert_eq!(selected.len(), 1);
2754        assert_eq!(selected[0], "lora-915");
2755    }
2756
2757    #[test]
2758    fn test_get_peer_distance_none() {
2759        let config = TransportManagerConfig::default();
2760        let manager = TransportManager::new(config);
2761
2762        let peer = NodeId::new("unknown-peer".to_string());
2763        assert!(manager.get_peer_distance(&peer).is_none());
2764    }
2765
2766    #[tokio::test]
2767    async fn test_send_bypass_not_configured() {
2768        let config = TransportManagerConfig::default();
2769        let manager = TransportManager::new(config);
2770
2771        let result = manager.send_bypass("test_collection", b"hello", None).await;
2772        assert!(result.is_err());
2773        let err_msg = format!("{}", result.unwrap_err());
2774        assert!(err_msg.contains("Bypass channel not configured"));
2775    }
2776
2777    #[tokio::test]
2778    async fn test_send_bypass_to_not_configured() {
2779        let config = TransportManagerConfig::default();
2780        let manager = TransportManager::new(config);
2781
2782        let target = BypassTarget::Broadcast { port: 5150 };
2783        let result = manager
2784            .send_bypass_to(target, "test_collection", b"hello")
2785            .await;
2786        assert!(result.is_err());
2787        let err_msg = format!("{}", result.unwrap_err());
2788        assert!(err_msg.contains("Bypass channel not configured"));
2789    }
2790
2791    #[tokio::test]
2792    async fn test_subscribe_bypass_collection_not_configured() {
2793        let config = TransportManagerConfig::default();
2794        let manager = TransportManager::new(config);
2795
2796        let result = manager.subscribe_bypass_collection("test").await;
2797        assert!(result.is_err());
2798    }
2799
2800    // =========================================================================
2801    // CollectionRouteTable Tests
2802    // =========================================================================
2803
2804    #[test]
2805    fn test_route_table_empty_returns_none() {
2806        let table = CollectionRouteTable::new();
2807        assert!(table.get("anything").is_none());
2808        assert!(!table.has_collection("anything"));
2809        assert!(!table.is_bypass("anything"));
2810    }
2811
2812    #[test]
2813    fn test_route_table_builder_and_lookup() {
2814        let table = CollectionRouteTable::new()
2815            .with_collection(CollectionRouteConfig {
2816                collection: "telemetry".to_string(),
2817                route: CollectionTransportRoute::Fixed {
2818                    transport_type: TransportType::BluetoothLE,
2819                },
2820                priority: MessagePriority::Normal,
2821            })
2822            .with_collection(CollectionRouteConfig {
2823                collection: "position".to_string(),
2824                route: CollectionTransportRoute::Bypass {
2825                    encoding: MessageEncoding::Raw,
2826                    ttl_ms: 200,
2827                    bypass_transport: BypassTransport::Broadcast,
2828                },
2829                priority: MessagePriority::High,
2830            });
2831
2832        assert!(table.has_collection("telemetry"));
2833        assert!(table.has_collection("position"));
2834        assert!(!table.has_collection("unknown"));
2835
2836        let telemetry = table.get("telemetry").unwrap();
2837        assert!(matches!(
2838            telemetry.route,
2839            CollectionTransportRoute::Fixed {
2840                transport_type: TransportType::BluetoothLE
2841            }
2842        ));
2843        assert_eq!(telemetry.priority, MessagePriority::Normal);
2844
2845        let position = table.get("position").unwrap();
2846        assert_eq!(position.priority, MessagePriority::High);
2847    }
2848
2849    #[test]
2850    fn test_route_table_is_bypass() {
2851        let table = CollectionRouteTable::new()
2852            .with_collection(CollectionRouteConfig {
2853                collection: "bypass_col".to_string(),
2854                route: CollectionTransportRoute::Bypass {
2855                    encoding: MessageEncoding::Raw,
2856                    ttl_ms: 100,
2857                    bypass_transport: BypassTransport::Unicast,
2858                },
2859                priority: MessagePriority::Normal,
2860            })
2861            .with_collection(CollectionRouteConfig {
2862                collection: "fixed_col".to_string(),
2863                route: CollectionTransportRoute::Fixed {
2864                    transport_type: TransportType::Quic,
2865                },
2866                priority: MessagePriority::Normal,
2867            })
2868            .with_collection(CollectionRouteConfig {
2869                collection: "pace_col".to_string(),
2870                route: CollectionTransportRoute::Pace {
2871                    policy_override: None,
2872                },
2873                priority: MessagePriority::Normal,
2874            });
2875
2876        assert!(table.is_bypass("bypass_col"));
2877        assert!(!table.is_bypass("fixed_col"));
2878        assert!(!table.is_bypass("pace_col"));
2879        assert!(!table.is_bypass("nonexistent"));
2880    }
2881
2882    // =========================================================================
2883    // CollectionTransportRoute Serde Tests
2884    // =========================================================================
2885
2886    #[test]
2887    fn test_serde_fixed_route() {
2888        let route = CollectionTransportRoute::Fixed {
2889            transport_type: TransportType::BluetoothLE,
2890        };
2891        let json = serde_json::to_string(&route).unwrap();
2892        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2893        assert!(matches!(
2894            roundtrip,
2895            CollectionTransportRoute::Fixed {
2896                transport_type: TransportType::BluetoothLE
2897            }
2898        ));
2899    }
2900
2901    #[test]
2902    fn test_serde_bypass_route() {
2903        let route = CollectionTransportRoute::Bypass {
2904            encoding: MessageEncoding::Raw,
2905            ttl_ms: 500,
2906            bypass_transport: BypassTransport::Broadcast,
2907        };
2908        let json = serde_json::to_string(&route).unwrap();
2909        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2910        if let CollectionTransportRoute::Bypass {
2911            encoding,
2912            ttl_ms,
2913            bypass_transport,
2914        } = roundtrip
2915        {
2916            assert_eq!(encoding, MessageEncoding::Raw);
2917            assert_eq!(ttl_ms, 500);
2918            assert_eq!(bypass_transport, BypassTransport::Broadcast);
2919        } else {
2920            panic!("Expected Bypass variant");
2921        }
2922    }
2923
2924    #[test]
2925    fn test_serde_pace_route() {
2926        let route = CollectionTransportRoute::Pace {
2927            policy_override: None,
2928        };
2929        let json = serde_json::to_string(&route).unwrap();
2930        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2931        assert!(matches!(
2932            roundtrip,
2933            CollectionTransportRoute::Pace {
2934                policy_override: None
2935            }
2936        ));
2937    }
2938
2939    #[test]
2940    fn test_serde_pace_route_with_policy() {
2941        let policy = TransportPolicy::new("custom")
2942            .primary(vec!["ble-hci0"])
2943            .alternate(vec!["iroh-wlan0"]);
2944        let route = CollectionTransportRoute::Pace {
2945            policy_override: Some(policy),
2946        };
2947        let json = serde_json::to_string(&route).unwrap();
2948        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2949        if let CollectionTransportRoute::Pace {
2950            policy_override: Some(p),
2951        } = roundtrip
2952        {
2953            assert_eq!(p.name, "custom");
2954            assert_eq!(p.primary, vec!["ble-hci0"]);
2955            assert_eq!(p.alternate, vec!["iroh-wlan0"]);
2956        } else {
2957            panic!("Expected Pace with policy_override");
2958        }
2959    }
2960
2961    #[test]
2962    fn test_serde_collection_route_config() {
2963        let config = CollectionRouteConfig {
2964            collection: "sensors".to_string(),
2965            route: CollectionTransportRoute::Fixed {
2966                transport_type: TransportType::LoRa,
2967            },
2968            priority: MessagePriority::High,
2969        };
2970        let json = serde_json::to_string(&config).unwrap();
2971        let roundtrip: CollectionRouteConfig = serde_json::from_str(&json).unwrap();
2972        assert_eq!(roundtrip.collection, "sensors");
2973        assert_eq!(roundtrip.priority, MessagePriority::High);
2974    }
2975
2976    #[test]
2977    fn test_serde_collection_route_table() {
2978        let table = CollectionRouteTable::new()
2979            .with_collection(CollectionRouteConfig {
2980                collection: "a".to_string(),
2981                route: CollectionTransportRoute::Fixed {
2982                    transport_type: TransportType::Quic,
2983                },
2984                priority: MessagePriority::Normal,
2985            })
2986            .with_collection(CollectionRouteConfig {
2987                collection: "b".to_string(),
2988                route: CollectionTransportRoute::Bypass {
2989                    encoding: MessageEncoding::Json,
2990                    ttl_ms: 1000,
2991                    bypass_transport: BypassTransport::Unicast,
2992                },
2993                priority: MessagePriority::Critical,
2994            });
2995
2996        let json = serde_json::to_string(&table).unwrap();
2997        let roundtrip: CollectionRouteTable = serde_json::from_str(&json).unwrap();
2998        assert!(roundtrip.has_collection("a"));
2999        assert!(roundtrip.has_collection("b"));
3000        assert!(roundtrip.is_bypass("b"));
3001        assert!(!roundtrip.is_bypass("a"));
3002    }
3003
3004    #[test]
3005    fn test_serde_transport_type() {
3006        // Verify all variants round-trip through JSON
3007        let types = vec![
3008            TransportType::Quic,
3009            TransportType::BluetoothClassic,
3010            TransportType::BluetoothLE,
3011            TransportType::WifiDirect,
3012            TransportType::LoRa,
3013            TransportType::TacticalRadio,
3014            TransportType::Satellite,
3015            TransportType::Custom(42),
3016        ];
3017        for tt in types {
3018            let json = serde_json::to_string(&tt).unwrap();
3019            let roundtrip: TransportType = serde_json::from_str(&json).unwrap();
3020            assert_eq!(roundtrip, tt, "Failed round-trip for {:?}", tt);
3021        }
3022    }
3023
3024    // =========================================================================
3025    // route_collection() Tests
3026    // =========================================================================
3027
3028    #[test]
3029    fn test_route_collection_unknown_falls_through() {
3030        let config = TransportManagerConfig::default();
3031        let mut manager = TransportManager::new(config);
3032
3033        let peer = NodeId::new("peer-1".to_string());
3034        let quic =
3035            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
3036        manager.register(quic);
3037
3038        let requirements = MessageRequirements::default();
3039        // Unknown collection falls through to route_message()
3040        let decision = manager.route_collection("unknown", &peer, &requirements);
3041        assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
3042    }
3043
3044    #[test]
3045    fn test_route_collection_fixed_routes_correctly() {
3046        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3047            collection: "telemetry".to_string(),
3048            route: CollectionTransportRoute::Fixed {
3049                transport_type: TransportType::BluetoothLE,
3050            },
3051            priority: MessagePriority::Normal,
3052        });
3053
3054        let config = TransportManagerConfig {
3055            collection_routes: table,
3056            ..Default::default()
3057        };
3058        let mut manager = TransportManager::new(config);
3059
3060        let peer = NodeId::new("peer-1".to_string());
3061        let ble = Arc::new(
3062            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
3063        );
3064        manager.register(ble);
3065
3066        let requirements = MessageRequirements::default();
3067        let decision = manager.route_collection("telemetry", &peer, &requirements);
3068        assert_eq!(
3069            decision,
3070            RouteDecision::Transport(TransportType::BluetoothLE)
3071        );
3072    }
3073
3074    #[test]
3075    fn test_route_collection_fixed_unavailable_returns_no_route() {
3076        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3077            collection: "telemetry".to_string(),
3078            route: CollectionTransportRoute::Fixed {
3079                transport_type: TransportType::BluetoothLE,
3080            },
3081            priority: MessagePriority::Normal,
3082        });
3083
3084        let config = TransportManagerConfig {
3085            collection_routes: table,
3086            ..Default::default()
3087        };
3088        let mut manager = TransportManager::new(config);
3089
3090        let peer = NodeId::new("peer-1".to_string());
3091
3092        // Register BLE but make it unavailable
3093        let ble = Arc::new(
3094            MockTransport::new(TransportCapabilities::bluetooth_le())
3095                .with_peer(peer.clone())
3096                .unavailable(),
3097        );
3098        manager.register(ble);
3099
3100        let requirements = MessageRequirements::default();
3101        let decision = manager.route_collection("telemetry", &peer, &requirements);
3102        assert_eq!(decision, RouteDecision::NoRoute);
3103    }
3104
3105    #[test]
3106    fn test_route_collection_fixed_not_registered_returns_no_route() {
3107        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3108            collection: "telemetry".to_string(),
3109            route: CollectionTransportRoute::Fixed {
3110                transport_type: TransportType::BluetoothLE,
3111            },
3112            priority: MessagePriority::Normal,
3113        });
3114
3115        let config = TransportManagerConfig {
3116            collection_routes: table,
3117            ..Default::default()
3118        };
3119        let manager = TransportManager::new(config);
3120
3121        let peer = NodeId::new("peer-1".to_string());
3122        // No transports registered at all
3123        let requirements = MessageRequirements::default();
3124        let decision = manager.route_collection("telemetry", &peer, &requirements);
3125        assert_eq!(decision, RouteDecision::NoRoute);
3126    }
3127
3128    #[tokio::test]
3129    async fn test_route_collection_bypass_with_channel() {
3130        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3131            collection: "position".to_string(),
3132            route: CollectionTransportRoute::Bypass {
3133                encoding: MessageEncoding::Raw,
3134                ttl_ms: 200,
3135                bypass_transport: BypassTransport::Broadcast,
3136            },
3137            priority: MessagePriority::High,
3138        });
3139
3140        let config = TransportManagerConfig {
3141            collection_routes: table,
3142            ..Default::default()
3143        };
3144        let mut manager = TransportManager::new(config);
3145
3146        // Set up bypass channel
3147        let bypass_config = BypassChannelConfig::new();
3148        let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
3149        manager.set_bypass_channel(bypass);
3150
3151        let peer = NodeId::new("peer-1".to_string());
3152        let requirements = MessageRequirements::default();
3153        let decision = manager.route_collection("position", &peer, &requirements);
3154        assert_eq!(decision, RouteDecision::Bypass);
3155    }
3156
3157    #[test]
3158    fn test_route_collection_bypass_without_channel() {
3159        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3160            collection: "position".to_string(),
3161            route: CollectionTransportRoute::Bypass {
3162                encoding: MessageEncoding::Raw,
3163                ttl_ms: 200,
3164                bypass_transport: BypassTransport::Broadcast,
3165            },
3166            priority: MessagePriority::High,
3167        });
3168
3169        let config = TransportManagerConfig {
3170            collection_routes: table,
3171            ..Default::default()
3172        };
3173        let manager = TransportManager::new(config);
3174
3175        let peer = NodeId::new("peer-1".to_string());
3176        let requirements = MessageRequirements::default();
3177        // No bypass channel configured
3178        let decision = manager.route_collection("position", &peer, &requirements);
3179        assert_eq!(decision, RouteDecision::NoRoute);
3180    }
3181
3182    #[test]
3183    fn test_route_collection_pace_routes_correctly() {
3184        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
3185
3186        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3187            collection: "sync_data".to_string(),
3188            route: CollectionTransportRoute::Pace {
3189                policy_override: None,
3190            },
3191            priority: MessagePriority::Normal,
3192        });
3193
3194        let config = TransportManagerConfig {
3195            collection_routes: table,
3196            default_policy: Some(policy),
3197            ..Default::default()
3198        };
3199        let manager = TransportManager::new(config);
3200
3201        let peer = NodeId::new("peer-1".to_string());
3202
3203        // Register the PACE instance
3204        let inst = TransportInstance::new(
3205            "iroh-eth0",
3206            TransportType::Quic,
3207            TransportCapabilities::quic(),
3208        );
3209        let t = Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
3210        manager.register_instance(inst, t);
3211
3212        let requirements = MessageRequirements::default();
3213        let decision = manager.route_collection("sync_data", &peer, &requirements);
3214        assert_eq!(
3215            decision,
3216            RouteDecision::TransportInstance("iroh-eth0".to_string())
3217        );
3218    }
3219
3220    #[test]
3221    fn test_route_collection_pace_no_available_returns_no_route() {
3222        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
3223
3224        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3225            collection: "sync_data".to_string(),
3226            route: CollectionTransportRoute::Pace {
3227                policy_override: None,
3228            },
3229            priority: MessagePriority::Normal,
3230        });
3231
3232        let config = TransportManagerConfig {
3233            collection_routes: table,
3234            default_policy: Some(policy),
3235            ..Default::default()
3236        };
3237        let manager = TransportManager::new(config);
3238
3239        let peer = NodeId::new("peer-1".to_string());
3240        // No instances registered
3241        let requirements = MessageRequirements::default();
3242        let decision = manager.route_collection("sync_data", &peer, &requirements);
3243        assert_eq!(decision, RouteDecision::NoRoute);
3244    }
3245
3246    #[test]
3247    fn test_route_collection_pace_with_policy_override() {
3248        // Default policy points to non-existent transport
3249        let default_policy = TransportPolicy::new("default").primary(vec!["nonexistent"]);
3250
3251        // Override policy points to available transport
3252        let override_policy = TransportPolicy::new("override").primary(vec!["ble-hci0"]);
3253
3254        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3255            collection: "ble_data".to_string(),
3256            route: CollectionTransportRoute::Pace {
3257                policy_override: Some(override_policy),
3258            },
3259            priority: MessagePriority::Normal,
3260        });
3261
3262        let config = TransportManagerConfig {
3263            collection_routes: table,
3264            default_policy: Some(default_policy),
3265            ..Default::default()
3266        };
3267        let manager = TransportManager::new(config);
3268
3269        let peer = NodeId::new("peer-1".to_string());
3270        let inst = TransportInstance::new(
3271            "ble-hci0",
3272            TransportType::BluetoothLE,
3273            TransportCapabilities::bluetooth_le(),
3274        );
3275        let t = Arc::new(
3276            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
3277        );
3278        manager.register_instance(inst, t);
3279
3280        let requirements = MessageRequirements::default();
3281        let decision = manager.route_collection("ble_data", &peer, &requirements);
3282        assert_eq!(
3283            decision,
3284            RouteDecision::TransportInstance("ble-hci0".to_string())
3285        );
3286    }
3287
3288    // =========================================================================
3289    // RouteDecision::TransportInstance Tests
3290    // =========================================================================
3291
3292    #[test]
3293    fn test_route_decision_transport_instance_variant() {
3294        let decision = RouteDecision::TransportInstance("iroh-eth0".to_string());
3295        assert_eq!(
3296            decision,
3297            RouteDecision::TransportInstance("iroh-eth0".to_string())
3298        );
3299        assert_ne!(decision, RouteDecision::Bypass);
3300        assert_ne!(decision, RouteDecision::NoRoute);
3301        assert_ne!(decision, RouteDecision::Transport(TransportType::Quic));
3302    }
3303
3304    #[test]
3305    fn test_route_decision_transport_instance_debug() {
3306        let decision = RouteDecision::TransportInstance("ble-hci0".to_string());
3307        let debug = format!("{:?}", decision);
3308        assert!(debug.contains("TransportInstance"));
3309        assert!(debug.contains("ble-hci0"));
3310    }
3311
3312    #[test]
3313    fn test_route_decision_transport_instance_clone() {
3314        let original = RouteDecision::TransportInstance("iroh-wlan0".to_string());
3315        let cloned = original.clone();
3316        assert_eq!(original, cloned);
3317    }
3318
3319    // =========================================================================
3320    // get_collection_route() Tests
3321    // =========================================================================
3322
3323    #[test]
3324    fn test_get_collection_route_found() {
3325        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3326            collection: "telemetry".to_string(),
3327            route: CollectionTransportRoute::Fixed {
3328                transport_type: TransportType::Quic,
3329            },
3330            priority: MessagePriority::High,
3331        });
3332
3333        let config = TransportManagerConfig {
3334            collection_routes: table,
3335            ..Default::default()
3336        };
3337        let manager = TransportManager::new(config);
3338
3339        let route = manager.get_collection_route("telemetry");
3340        assert!(route.is_some());
3341        assert_eq!(route.unwrap().collection, "telemetry");
3342        assert_eq!(route.unwrap().priority, MessagePriority::High);
3343    }
3344
3345    #[test]
3346    fn test_get_collection_route_not_found() {
3347        let config = TransportManagerConfig::default();
3348        let manager = TransportManager::new(config);
3349
3350        assert!(manager.get_collection_route("nonexistent").is_none());
3351    }
3352}