Skip to main content

peat_mesh/transport/
manager.rs

1//! Transport Manager for multi-transport coordination
2//!
3//! This module provides the `TransportManager` which coordinates multiple
4//! transport implementations, selecting the best one for each message
5//! based on requirements and current conditions.
6//!
7//! ## Architecture (ADR-032 + ADR-042)
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────────────────┐
11//! │                           Application Layer                              │
12//! │         ┌────────────────────────────────────┐                           │
13//! │         │        Transport Manager           │ ◄── Transport Selection   │
14//! │         │   (Multi-Transport Coordinator)    │     Message Requirements  │
15//! │         └──────────────┬─────────────────────┘                           │
16//! │                        │                                                 │
17//! │    ┌───────────────────┼───────────────────────┐                         │
18//! │    ▼                   ▼              ▼        ▼                          │
19//! │ ┌──────────┐    ┌────────────┐ ┌──────────┐ ┌────────────┐               │
20//! │ │  UDP     │    │   QUIC     │ │ Bluetooth│ │   LoRa     │               │
21//! │ │ Bypass   │    │  (Iroh)    │ │    LE    │ │            │               │
22//! │ │(ADR-042) │    └────────────┘ └──────────┘ └────────────┘               │
23//! │ └──────────┘                                                              │
24//! └─────────────────────────────────────────────────────────────────────────┘
25//! ```
26//!
27//! ## Lock ordering
28//!
29//! `TransportManager` contains four sync `RwLock`s and one optional async
30//! `RwLock`:
31//!
32//! | Lock | Type | Protects |
33//! |------|------|----------|
34//! | `transport_instances` | `std::sync::RwLock` | PACE transport registry |
35//! | `peer_transports` | `std::sync::RwLock` | Legacy per-peer transport cache |
36//! | `peer_transport_ids` | `std::sync::RwLock` | PACE per-peer transport cache |
37//! | `peer_distances` | `std::sync::RwLock` | Distance estimates per peer |
38//! | `bypass_channel` | `tokio::sync::RwLock` (behind `Option<Arc<..>>`) | UDP bypass channel |
39//!
40//! **No method acquires more than one of these locks simultaneously.** Each
41//! public method reads or writes a single lock, completes its work, and
42//! releases the guard before returning or performing further I/O. This means
43//! there is no required ordering among the four locks and no deadlock risk
44//! within this module.
45//!
46//! **Invariant:** never hold any sync `RwLock` in this struct while awaiting
47//! `bypass_channel`. The sync guards are not `Send` and would block the async
48//! runtime if held across an `.await` point.
49//!
50//! `transports` (the legacy `HashMap<TransportType, Arc<dyn Transport>>`) is
51//! *not* behind a lock -- it is only mutated through `&mut self` methods
52//! (`register`, `unregister`) which require exclusive access at construction
53//! time.
54//!
55//! ## Example
56//!
57//! ```ignore
58//! use peat_mesh::transport::{
59//!     TransportManager, TransportManagerConfig,
60//!     MessageRequirements, MessagePriority, TransportType,
61//! };
62//!
63//! // Create manager with configuration
64//! let config = TransportManagerConfig::default();
65//! let mut manager = TransportManager::new(config);
66//!
67//! // Register transports
68//! manager.register(Arc::new(quic_transport));
69//! manager.register(Arc::new(ble_transport));
70//!
71//! // Select best transport for message
72//! let requirements = MessageRequirements {
73//!     reliable: true,
74//!     priority: MessagePriority::High,
75//!     ..Default::default()
76//! };
77//!
78//! if let Some(transport_type) = manager.select_transport(&peer_id, &requirements) {
79//!     println!("Selected transport: {}", transport_type);
80//! }
81//!
82//! // Send via bypass for low-latency delivery (ADR-042)
83//! let bypass_req = MessageRequirements::bypass(5);
84//! manager.send_bypass("position_updates", &position_bytes, None).await?;
85//! ```
86
87use std::collections::HashMap;
88use std::net::SocketAddr;
89use std::sync::{Arc, RwLock};
90
91use super::bypass::{
92    BypassMessage, BypassTarget, BypassTransport, MessageEncoding, UdpBypassChannel,
93};
94use super::capabilities::{
95    MessagePriority, MessageRequirements, PaceLevel, PeerDistance, RangeMode, Transport,
96    TransportId, TransportInstance, TransportMode, TransportPolicy, TransportType,
97};
98use super::{NodeId, Result, TransportError};
99use serde::{Deserialize, Serialize};
100use std::collections::HashSet;
101use tokio::sync::broadcast;
102use tokio::sync::RwLock as TokioRwLock;
103
104/// Storage type for registered transport instances
105type TransportInstanceMap = HashMap<TransportId, (TransportInstance, Arc<dyn Transport>)>;
106
107// =============================================================================
108// Transport Manager Configuration
109// =============================================================================
110
111/// Configuration for TransportManager
112#[derive(Debug, Clone)]
113pub struct TransportManagerConfig {
114    /// Transport preference order (first = highest preference)
115    /// Used for legacy TransportType-based selection
116    pub preference_order: Vec<TransportType>,
117
118    /// Enable automatic transport fallback on failure
119    pub enable_fallback: bool,
120
121    /// Cache transport selection per peer
122    pub cache_peer_transport: bool,
123
124    /// Minimum score difference to switch transports
125    pub switch_threshold: i32,
126
127    /// Default PACE policy for transport selection (ADR-032)
128    /// If set, takes precedence over preference_order
129    pub default_policy: Option<TransportPolicy>,
130
131    /// Transport mode (Single, Redundant, Bonded, LoadBalanced)
132    pub transport_mode: TransportMode,
133
134    /// Per-collection transport routing table
135    ///
136    /// Collections listed here are routed to their configured transport
137    /// instead of going through legacy scoring. Collections not in this
138    /// table fall through to `route_message()`.
139    pub collection_routes: CollectionRouteTable,
140}
141
142impl Default for TransportManagerConfig {
143    fn default() -> Self {
144        Self {
145            preference_order: vec![
146                TransportType::Quic,
147                TransportType::WifiDirect,
148                TransportType::BluetoothLE,
149                TransportType::LoRa,
150            ],
151            enable_fallback: true,
152            cache_peer_transport: true,
153            switch_threshold: 10,
154            default_policy: None,
155            transport_mode: TransportMode::Single,
156            collection_routes: CollectionRouteTable::default(),
157        }
158    }
159}
160
161impl TransportManagerConfig {
162    /// Create config with a PACE policy
163    pub fn with_policy(policy: TransportPolicy) -> Self {
164        Self {
165            default_policy: Some(policy),
166            ..Default::default()
167        }
168    }
169
170    /// Set the transport mode
171    pub fn with_mode(mut self, mode: TransportMode) -> Self {
172        self.transport_mode = mode;
173        self
174    }
175}
176
177// =============================================================================
178// Transport Manager
179// =============================================================================
180
181/// Manages multiple transports and handles transport selection
182///
183/// TransportManager coordinates multiple transport implementations,
184/// selecting the best one for each message based on:
185/// - Message requirements (reliability, latency, size)
186/// - Transport capabilities
187/// - Current availability and signal quality
188/// - PACE policy (Primary, Alternate, Contingency, Emergency)
189/// - Historical success with peer
190///
191/// Also manages the UDP bypass channel (ADR-042) for low-latency,
192/// high-frequency data that bypasses CRDT synchronization.
193///
194/// ## PACE Policy (ADR-032)
195///
196/// When a PACE policy is configured, transport selection follows:
197/// 1. Try primary transports first
198/// 2. Fall back to alternate if no primary available
199/// 3. Use contingency for degraded operation
200/// 4. Emergency as last resort
201///
202/// ```ignore
203/// let policy = TransportPolicy::new("tactical")
204///     .primary(vec!["iroh-eth0", "iroh-wlan0"])
205///     .alternate(vec!["iroh-starlink"])
206///     .contingency(vec!["lora-primary"])
207///     .emergency(vec!["ble-mesh"]);
208///
209/// let config = TransportManagerConfig::with_policy(policy);
210/// let manager = TransportManager::new(config);
211/// ```
212pub struct TransportManager {
213    /// Registered transports by type (legacy API)
214    transports: HashMap<TransportType, Arc<dyn Transport>>,
215
216    /// Registered transports by ID (ADR-032 PACE API)
217    transport_instances: RwLock<TransportInstanceMap>,
218
219    /// Active transport per peer (learned from successful deliveries)
220    peer_transports: RwLock<HashMap<NodeId, TransportType>>,
221
222    /// Active transport ID per peer (PACE-based)
223    peer_transport_ids: RwLock<HashMap<NodeId, TransportId>>,
224
225    /// Peer distance estimates
226    peer_distances: RwLock<HashMap<NodeId, PeerDistance>>,
227
228    /// Configuration
229    config: TransportManagerConfig,
230
231    /// UDP bypass channel for low-latency ephemeral data (ADR-042)
232    ///
233    /// When set, the manager can route messages with `bypass_sync: true`
234    /// through this channel instead of CRDT transports.
235    bypass_channel: Option<Arc<TokioRwLock<UdpBypassChannel>>>,
236}
237
238impl TransportManager {
239    /// Create a new TransportManager with the given configuration
240    pub fn new(config: TransportManagerConfig) -> Self {
241        Self {
242            transports: HashMap::new(),
243            transport_instances: RwLock::new(HashMap::new()),
244            peer_transports: RwLock::new(HashMap::new()),
245            peer_transport_ids: RwLock::new(HashMap::new()),
246            peer_distances: RwLock::new(HashMap::new()),
247            config,
248            bypass_channel: None,
249        }
250    }
251
252    /// Create a new TransportManager with bypass channel support (ADR-042)
253    pub fn with_bypass(config: TransportManagerConfig, bypass: UdpBypassChannel) -> Self {
254        Self {
255            transports: HashMap::new(),
256            transport_instances: RwLock::new(HashMap::new()),
257            peer_transports: RwLock::new(HashMap::new()),
258            peer_transport_ids: RwLock::new(HashMap::new()),
259            peer_distances: RwLock::new(HashMap::new()),
260            config,
261            bypass_channel: Some(Arc::new(TokioRwLock::new(bypass))),
262        }
263    }
264
265    /// Set the bypass channel after construction
266    pub fn set_bypass_channel(&mut self, bypass: UdpBypassChannel) {
267        self.bypass_channel = Some(Arc::new(TokioRwLock::new(bypass)));
268    }
269
270    /// Check if bypass channel is available
271    pub fn has_bypass_channel(&self) -> bool {
272        self.bypass_channel.is_some()
273    }
274
275    /// Check if a collection is configured for bypass
276    pub async fn is_bypass_collection(&self, collection: &str) -> bool {
277        if let Some(ref bypass) = self.bypass_channel {
278            bypass.read().await.is_bypass_collection(collection)
279        } else {
280            false
281        }
282    }
283
284    /// Register a transport
285    ///
286    /// The transport will be available for selection based on its capabilities.
287    pub fn register(&mut self, transport: Arc<dyn Transport>) {
288        let transport_type = transport.capabilities().transport_type;
289        self.transports.insert(transport_type, transport);
290    }
291
292    /// Unregister a transport
293    ///
294    /// Returns the removed transport, if it was registered.
295    pub fn unregister(&mut self, transport_type: TransportType) -> Option<Arc<dyn Transport>> {
296        self.transports.remove(&transport_type)
297    }
298
299    /// Get a registered transport by type
300    pub fn get_transport(&self, transport_type: TransportType) -> Option<&Arc<dyn Transport>> {
301        self.transports.get(&transport_type)
302    }
303
304    /// Get all registered transport types
305    pub fn registered_transports(&self) -> Vec<TransportType> {
306        self.transports.keys().copied().collect()
307    }
308
309    /// Get transports that are currently available and can reach the peer
310    pub fn available_transports(&self, peer_id: &NodeId) -> Vec<TransportType> {
311        self.transports
312            .iter()
313            .filter(|(_, t)| t.is_available() && t.can_reach(peer_id))
314            .map(|(tt, _)| *tt)
315            .collect()
316    }
317
318    // =========================================================================
319    // PACE Transport Instance API (ADR-032)
320    // =========================================================================
321
322    /// Register a transport instance by ID
323    ///
324    /// This is the preferred API for multi-instance transports (e.g., multiple NICs).
325    ///
326    /// # Arguments
327    ///
328    /// * `instance` - Transport instance metadata
329    /// * `transport` - The transport implementation
330    ///
331    /// # Example
332    ///
333    /// ```ignore
334    /// let instance = TransportInstance::new("iroh-eth0", TransportType::Quic, caps)
335    ///     .with_interface("eth0");
336    /// manager.register_instance(instance, Arc::new(transport));
337    /// ```
338    pub fn register_instance(&self, instance: TransportInstance, transport: Arc<dyn Transport>) {
339        let id = instance.id.clone();
340        self.transport_instances
341            .write()
342            .unwrap()
343            .insert(id, (instance, transport));
344    }
345
346    /// Unregister a transport instance by ID
347    pub fn unregister_instance(
348        &self,
349        id: &TransportId,
350    ) -> Option<(TransportInstance, Arc<dyn Transport>)> {
351        self.transport_instances
352            .write()
353            .unwrap_or_else(|e| e.into_inner())
354            .remove(id)
355    }
356
357    /// Get a transport instance by ID
358    pub fn get_instance(&self, id: &TransportId) -> Option<Arc<dyn Transport>> {
359        self.transport_instances
360            .read()
361            .unwrap()
362            .get(id)
363            .map(|(_, t)| Arc::clone(t))
364    }
365
366    /// Get all registered instance IDs
367    pub fn registered_instance_ids(&self) -> Vec<TransportId> {
368        self.transport_instances
369            .read()
370            .unwrap()
371            .keys()
372            .cloned()
373            .collect()
374    }
375
376    /// Get IDs of available transport instances
377    pub fn available_instance_ids(&self) -> HashSet<TransportId> {
378        self.transport_instances
379            .read()
380            .unwrap()
381            .iter()
382            .filter(|(_, (inst, transport))| inst.available && transport.is_available())
383            .map(|(id, _)| id.clone())
384            .collect()
385    }
386
387    /// Get IDs of available transports that can reach a peer
388    pub fn available_instances_for_peer(&self, peer_id: &NodeId) -> Vec<TransportId> {
389        self.transport_instances
390            .read()
391            .unwrap()
392            .iter()
393            .filter(|(_, (inst, transport))| {
394                inst.available && transport.is_available() && transport.can_reach(peer_id)
395            })
396            .map(|(id, _)| id.clone())
397            .collect()
398    }
399
400    /// Get the current PACE level based on available transports
401    ///
402    /// Returns the best PACE level for which at least one transport is available.
403    pub fn current_pace_level(&self) -> PaceLevel {
404        match &self.config.default_policy {
405            Some(policy) => policy.current_level(&self.available_instance_ids()),
406            None => {
407                // No policy - if any transport available, consider it "Primary"
408                if !self.available_instance_ids().is_empty() {
409                    PaceLevel::Primary
410                } else {
411                    PaceLevel::None
412                }
413            }
414        }
415    }
416
417    /// Select transport(s) using PACE policy
418    ///
419    /// Returns transport IDs in priority order based on PACE policy and availability.
420    /// The number of transports returned depends on the configured TransportMode:
421    /// - Single: Returns at most one transport
422    /// - Redundant: Returns multiple for simultaneous send
423    /// - LoadBalanced: Returns all available for distribution
424    ///
425    /// # Arguments
426    ///
427    /// * `peer_id` - Target peer
428    /// * `requirements` - Message requirements
429    ///
430    /// # Returns
431    ///
432    /// Vector of transport IDs in priority order
433    pub fn select_transports_pace(
434        &self,
435        peer_id: &NodeId,
436        requirements: &MessageRequirements,
437    ) -> Vec<TransportId> {
438        let policy = match &self.config.default_policy {
439            Some(p) => p,
440            None => return Vec::new(), // No PACE policy configured
441        };
442
443        let instances = self
444            .transport_instances
445            .read()
446            .unwrap_or_else(|e| e.into_inner());
447        let available_for_peer: HashSet<_> = instances
448            .iter()
449            .filter(|(_, (inst, transport))| {
450                inst.available
451                    && transport.is_available()
452                    && transport.can_reach(peer_id)
453                    && transport.capabilities().meets_requirements(requirements)
454            })
455            .map(|(id, _)| id.clone())
456            .collect();
457
458        // Get candidates in PACE order
459        let candidates: Vec<TransportId> = policy
460            .ordered()
461            .filter(|id| available_for_peer.contains(*id))
462            .cloned()
463            .collect();
464
465        // Apply transport mode
466        match &self.config.transport_mode {
467            TransportMode::Single => candidates.into_iter().take(1).collect(),
468            TransportMode::Redundant {
469                min_paths,
470                max_paths,
471            } => {
472                let min = *min_paths as usize;
473                let max = max_paths.map(|m| m as usize).unwrap_or(candidates.len());
474                candidates.into_iter().take(max.max(min)).collect()
475            }
476            TransportMode::Bonded => candidates, // All for bandwidth aggregation
477            TransportMode::LoadBalanced { .. } => candidates, // All for distribution
478        }
479    }
480
481    /// Select the best single transport using PACE policy
482    ///
483    /// Convenience wrapper that returns just the first (best) transport.
484    pub fn select_transport_pace(
485        &self,
486        peer_id: &NodeId,
487        requirements: &MessageRequirements,
488    ) -> Option<TransportId> {
489        self.select_transports_pace(peer_id, requirements)
490            .into_iter()
491            .next()
492    }
493
494    /// Record successful transport use for a peer (PACE version)
495    pub fn record_success_pace(&self, peer_id: &NodeId, transport_id: TransportId) {
496        if self.config.cache_peer_transport {
497            self.peer_transport_ids
498                .write()
499                .unwrap()
500                .insert(peer_id.clone(), transport_id);
501        }
502    }
503
504    /// Clear cached transport for a peer (PACE version)
505    pub fn clear_cache_pace(&self, peer_id: &NodeId) {
506        self.peer_transport_ids
507            .write()
508            .unwrap_or_else(|e| e.into_inner())
509            .remove(peer_id);
510    }
511
512    /// Select the best transport for a peer and message requirements
513    ///
514    /// Returns the transport type that best matches the requirements,
515    /// or `None` if no suitable transport is available.
516    ///
517    /// # Selection Algorithm
518    ///
519    /// 1. Filter transports by availability and reachability
520    /// 2. Filter by hard requirements (reliability, bandwidth, message size)
521    /// 3. Score remaining transports based on:
522    ///    - Latency (for high-priority messages)
523    ///    - Power consumption (if power-sensitive)
524    ///    - User preference order
525    ///    - Signal quality (for wireless)
526    /// 4. Return highest-scoring transport
527    pub fn select_transport(
528        &self,
529        peer_id: &NodeId,
530        requirements: &MessageRequirements,
531    ) -> Option<TransportType> {
532        // Check cache first if enabled
533        if self.config.cache_peer_transport {
534            if let Some(&cached) = self
535                .peer_transports
536                .read()
537                .unwrap_or_else(|e| e.into_inner())
538                .get(peer_id)
539            {
540                // Verify cached transport still valid
541                if let Some(transport) = self.transports.get(&cached) {
542                    if transport.is_available()
543                        && transport.can_reach(peer_id)
544                        && transport.capabilities().meets_requirements(requirements)
545                    {
546                        return Some(cached);
547                    }
548                }
549            }
550        }
551
552        // Find available transports that meet requirements
553        let candidates: Vec<_> = self
554            .available_transports(peer_id)
555            .into_iter()
556            .filter_map(|tt| {
557                let transport = self.transports.get(&tt)?;
558                let caps = transport.capabilities();
559
560                // Check hard requirements
561                if !caps.meets_requirements(requirements) {
562                    return None;
563                }
564
565                // Check latency requirement
566                if let Some(max_latency) = requirements.max_latency_ms {
567                    let est_delivery = transport.estimate_delivery_ms(requirements.message_size);
568                    if est_delivery > max_latency {
569                        return None;
570                    }
571                }
572
573                // Calculate preference bonus
574                let preference_bonus = self
575                    .config
576                    .preference_order
577                    .iter()
578                    .position(|&t| t == tt)
579                    .map(|idx| 20 - (idx as i32 * 5))
580                    .unwrap_or(0);
581
582                let score = transport.calculate_score(requirements, preference_bonus);
583                Some((tt, score))
584            })
585            .collect();
586
587        // Return highest-scoring transport
588        candidates
589            .into_iter()
590            .max_by_key(|(_, score)| *score)
591            .map(|(tt, _)| tt)
592    }
593
594    /// Select transport with distance-based range mode adaptation
595    ///
596    /// Returns the best transport type and optionally a recommended range mode
597    /// if the transport supports dynamic range configuration.
598    pub fn select_transport_for_distance(
599        &self,
600        peer_id: &NodeId,
601        requirements: &MessageRequirements,
602    ) -> Option<(TransportType, Option<RangeMode>)> {
603        let transport_type = self.select_transport(peer_id, requirements)?;
604
605        // Get distance estimate if available
606        let distance = self
607            .peer_distances
608            .read()
609            .unwrap()
610            .get(peer_id)
611            .map(|d| d.distance_meters);
612
613        // If we have a configurable transport, get recommended mode
614        let range_mode = if let Some(_dist) = distance {
615            // This would need runtime trait casting - for now return None
616            // In a full implementation, we'd use trait objects with downcast
617            None // Placeholder - see implementation note below
618        } else {
619            None
620        };
621
622        Some((transport_type, range_mode))
623    }
624
625    /// Record successful transport use for a peer
626    ///
627    /// This updates the peer transport cache for future selections.
628    pub fn record_success(&self, peer_id: &NodeId, transport_type: TransportType) {
629        if self.config.cache_peer_transport {
630            self.peer_transports
631                .write()
632                .unwrap()
633                .insert(peer_id.clone(), transport_type);
634        }
635    }
636
637    /// Clear cached transport for a peer
638    ///
639    /// Call this when a transport fails for a peer.
640    pub fn clear_cache(&self, peer_id: &NodeId) {
641        self.peer_transports
642            .write()
643            .unwrap_or_else(|e| e.into_inner())
644            .remove(peer_id);
645    }
646
647    /// Update distance estimate for a peer
648    pub fn update_peer_distance(&self, distance: PeerDistance) {
649        self.peer_distances
650            .write()
651            .unwrap()
652            .insert(distance.peer_id.clone(), distance);
653    }
654
655    /// Get current distance estimate for a peer
656    pub fn get_peer_distance(&self, peer_id: &NodeId) -> Option<PeerDistance> {
657        self.peer_distances
658            .read()
659            .unwrap_or_else(|e| e.into_inner())
660            .get(peer_id)
661            .cloned()
662    }
663
664    /// Connect to a peer using the best available transport
665    ///
666    /// This is a convenience method that selects the transport and connects.
667    pub async fn connect(
668        &self,
669        peer_id: &NodeId,
670        requirements: &MessageRequirements,
671    ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
672        let transport_type = self
673            .select_transport(peer_id, requirements)
674            .ok_or_else(|| {
675                TransportError::PeerNotFound(format!("No suitable transport for {}", peer_id))
676            })?;
677
678        let transport = self
679            .transports
680            .get(&transport_type)
681            .ok_or(TransportError::NotStarted)?;
682
683        let connection = transport.connect(peer_id).await?;
684
685        // Record successful connection
686        self.record_success(peer_id, transport_type);
687
688        Ok((transport_type, connection))
689    }
690
691    /// Connect with fallback to alternative transports
692    ///
693    /// Tries the primary transport first, then falls back to others if enabled.
694    pub async fn connect_with_fallback(
695        &self,
696        peer_id: &NodeId,
697        requirements: &MessageRequirements,
698    ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
699        // Get all candidate transports sorted by score
700        let candidates: Vec<_> = self
701            .available_transports(peer_id)
702            .into_iter()
703            .filter_map(|tt| {
704                let transport = self.transports.get(&tt)?;
705                if !transport.capabilities().meets_requirements(requirements) {
706                    return None;
707                }
708                let preference_bonus = self
709                    .config
710                    .preference_order
711                    .iter()
712                    .position(|&t| t == tt)
713                    .map(|idx| 20 - (idx as i32 * 5))
714                    .unwrap_or(0);
715                let score = transport.calculate_score(requirements, preference_bonus);
716                Some((tt, score))
717            })
718            .collect();
719
720        let mut sorted: Vec<_> = candidates;
721        sorted.sort_by(|a, b| b.1.cmp(&a.1)); // Sort descending by score
722
723        if sorted.is_empty() {
724            return Err(TransportError::PeerNotFound(format!(
725                "No suitable transport for {}",
726                peer_id
727            )));
728        }
729
730        let mut last_error = None;
731
732        for (transport_type, _) in sorted {
733            let transport = match self.transports.get(&transport_type) {
734                Some(t) => t,
735                None => continue,
736            };
737
738            match transport.connect(peer_id).await {
739                Ok(conn) => {
740                    self.record_success(peer_id, transport_type);
741                    return Ok((transport_type, conn));
742                }
743                Err(e) => {
744                    if !self.config.enable_fallback {
745                        return Err(e);
746                    }
747                    last_error = Some(e);
748                    self.clear_cache(peer_id);
749                }
750            }
751        }
752
753        Err(last_error.unwrap_or_else(|| {
754            TransportError::PeerNotFound(format!("All transports failed for {}", peer_id))
755        }))
756    }
757
758    // =========================================================================
759    // Bypass Channel Methods (ADR-042)
760    // =========================================================================
761
762    /// Send data via the UDP bypass channel
763    ///
764    /// Sends data directly via UDP, bypassing CRDT synchronization.
765    /// Use for high-frequency, low-latency, or ephemeral data.
766    ///
767    /// # Arguments
768    ///
769    /// * `collection` - Collection name (must be configured for bypass)
770    /// * `data` - Raw data to send (already serialized)
771    /// * `target` - Optional target for unicast; uses collection config if None
772    ///
773    /// # Returns
774    ///
775    /// * `Ok(())` - Data sent successfully
776    /// * `Err(TransportError)` - Send failed or bypass not available
777    ///
778    /// # Example
779    ///
780    /// ```ignore
781    /// // Send position update via bypass
782    /// manager.send_bypass("position_updates", &position_bytes, None).await?;
783    ///
784    /// // Send to specific peer via unicast
785    /// let target = "192.168.1.100:5150".parse().unwrap();
786    /// manager.send_bypass("commands", &cmd_bytes, Some(target)).await?;
787    /// ```
788    pub async fn send_bypass(
789        &self,
790        collection: &str,
791        data: &[u8],
792        target: Option<SocketAddr>,
793    ) -> Result<()> {
794        let bypass = self
795            .bypass_channel
796            .as_ref()
797            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
798
799        bypass
800            .read()
801            .await
802            .send_to_collection(collection, target, data)
803            .await
804            .map_err(|e| TransportError::Other(e.to_string().into()))
805    }
806
807    /// Send data via bypass channel with explicit target
808    ///
809    /// Lower-level method for sending to a specific target.
810    ///
811    /// # Arguments
812    ///
813    /// * `target` - Target address (unicast, multicast, or broadcast)
814    /// * `collection` - Collection name for header
815    /// * `data` - Raw data to send
816    pub async fn send_bypass_to(
817        &self,
818        target: BypassTarget,
819        collection: &str,
820        data: &[u8],
821    ) -> Result<()> {
822        let bypass = self
823            .bypass_channel
824            .as_ref()
825            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
826
827        bypass
828            .read()
829            .await
830            .send(target, collection, data)
831            .await
832            .map_err(|e| TransportError::Other(e.to_string().into()))
833    }
834
835    /// Subscribe to incoming bypass messages
836    ///
837    /// Returns a broadcast receiver for all incoming bypass channel messages.
838    ///
839    /// # Returns
840    ///
841    /// * `Ok(Receiver)` - Subscription successful
842    /// * `Err(TransportError)` - Bypass not available
843    ///
844    /// # Example
845    ///
846    /// ```ignore
847    /// let mut rx = manager.subscribe_bypass().await?;
848    /// while let Ok(msg) = rx.recv().await {
849    ///     println!("Bypass message from {}: {} bytes",
850    ///         msg.source, msg.data.len());
851    /// }
852    /// ```
853    pub async fn subscribe_bypass(&self) -> Result<broadcast::Receiver<BypassMessage>> {
854        let bypass = self
855            .bypass_channel
856            .as_ref()
857            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
858
859        Ok(bypass.read().await.subscribe())
860    }
861
862    /// Subscribe to bypass messages for a specific collection
863    ///
864    /// Returns the collection hash and a receiver. Filter received messages
865    /// by comparing `msg.collection_hash == hash`.
866    ///
867    /// # Arguments
868    ///
869    /// * `collection` - Collection name to subscribe to
870    ///
871    /// # Returns
872    ///
873    /// * `Ok((hash, Receiver))` - Subscription successful with collection hash
874    /// * `Err(TransportError)` - Bypass not available
875    pub async fn subscribe_bypass_collection(
876        &self,
877        collection: &str,
878    ) -> Result<(u32, broadcast::Receiver<BypassMessage>)> {
879        let bypass = self
880            .bypass_channel
881            .as_ref()
882            .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
883
884        Ok(bypass.read().await.subscribe_collection(collection))
885    }
886
887    // =========================================================================
888    // Per-Collection Transport Routing (M4 / ADR-032)
889    // =========================================================================
890
891    /// Route a message for a specific collection
892    ///
893    /// Looks up the collection in the route table and returns the appropriate
894    /// routing decision. If the collection is not configured, falls through
895    /// to `route_message()` for legacy scoring.
896    ///
897    /// # Arguments
898    ///
899    /// * `collection` - Collection name
900    /// * `peer_id` - Target peer
901    /// * `requirements` - Message requirements
902    ///
903    /// # Returns
904    ///
905    /// Routing decision for this collection's messages
906    pub fn route_collection(
907        &self,
908        collection: &str,
909        peer_id: &NodeId,
910        requirements: &MessageRequirements,
911    ) -> RouteDecision {
912        let route_config = match self.config.collection_routes.get(collection) {
913            Some(config) => config,
914            None => return self.route_message(peer_id, requirements),
915        };
916
917        match &route_config.route {
918            CollectionTransportRoute::Bypass { .. } => {
919                if self.bypass_channel.is_some() {
920                    RouteDecision::Bypass
921                } else {
922                    RouteDecision::NoRoute
923                }
924            }
925            CollectionTransportRoute::Fixed { transport_type } => {
926                // Check if the fixed transport is registered and can reach the peer
927                if let Some(transport) = self.transports.get(transport_type) {
928                    if transport.is_available() && transport.can_reach(peer_id) {
929                        RouteDecision::Transport(*transport_type)
930                    } else {
931                        RouteDecision::NoRoute
932                    }
933                } else {
934                    RouteDecision::NoRoute
935                }
936            }
937            CollectionTransportRoute::Pace { policy_override } => {
938                match self.select_transport_pace_with_policy(
939                    peer_id,
940                    requirements,
941                    policy_override.as_ref(),
942                ) {
943                    Some(id) => RouteDecision::TransportInstance(id),
944                    None => RouteDecision::NoRoute,
945                }
946            }
947        }
948    }
949
950    /// Select a transport instance using a specific or default PACE policy
951    ///
952    /// If `policy_override` is `Some`, uses that policy. Otherwise falls back
953    /// to the default policy from config.
954    fn select_transport_pace_with_policy(
955        &self,
956        peer_id: &NodeId,
957        requirements: &MessageRequirements,
958        policy_override: Option<&TransportPolicy>,
959    ) -> Option<TransportId> {
960        let policy = policy_override.or(self.config.default_policy.as_ref())?;
961
962        let instances = self
963            .transport_instances
964            .read()
965            .unwrap_or_else(|e| e.into_inner());
966        let available_for_peer: HashSet<_> = instances
967            .iter()
968            .filter(|(_, (inst, transport))| {
969                inst.available
970                    && transport.is_available()
971                    && transport.can_reach(peer_id)
972                    && transport.capabilities().meets_requirements(requirements)
973            })
974            .map(|(id, _)| id.clone())
975            .collect();
976
977        policy
978            .ordered()
979            .find(|id| available_for_peer.contains(*id))
980            .cloned()
981    }
982
983    /// Get the route configuration for a collection
984    pub fn get_collection_route(&self, collection: &str) -> Option<&CollectionRouteConfig> {
985        self.config.collection_routes.get(collection)
986    }
987
988    /// Route a message based on requirements
989    ///
990    /// If `requirements.bypass_sync` is `true` and bypass channel is available,
991    /// returns `RouteDecision::Bypass`. Otherwise returns the selected transport.
992    ///
993    /// # Arguments
994    ///
995    /// * `peer_id` - Target peer (ignored for bypass)
996    /// * `requirements` - Message requirements
997    ///
998    /// # Returns
999    ///
1000    /// Decision on how to route the message
1001    pub fn route_message(
1002        &self,
1003        peer_id: &NodeId,
1004        requirements: &MessageRequirements,
1005    ) -> RouteDecision {
1006        // Check if bypass is requested and available
1007        if requirements.bypass_sync && self.bypass_channel.is_some() {
1008            return RouteDecision::Bypass;
1009        }
1010        // Fall back to normal transport if bypass not available or not requested
1011
1012        // Select normal transport
1013        match self.select_transport(peer_id, requirements) {
1014            Some(transport_type) => RouteDecision::Transport(transport_type),
1015            None => RouteDecision::NoRoute,
1016        }
1017    }
1018}
1019
1020/// Routing decision for a message
1021#[derive(Debug, Clone, PartialEq, Eq)]
1022pub enum RouteDecision {
1023    /// Use UDP bypass channel
1024    Bypass,
1025    /// Use specified transport type (legacy scoring)
1026    Transport(TransportType),
1027    /// Use a specific transport instance (PACE selection result)
1028    TransportInstance(TransportId),
1029    /// No suitable route available
1030    NoRoute,
1031}
1032
1033// =============================================================================
1034// Per-Collection Transport Routing (M4 / ADR-032)
1035// =============================================================================
1036
1037/// Routing strategy for a specific collection
1038///
1039/// Determines how messages for a collection are routed to a transport.
1040/// This generalizes the `BypassCollectionConfig` pattern to all transports.
1041///
1042/// # Variants
1043///
1044/// - `Fixed` — Always use a specific transport type (e.g., Quic, BluetoothLE)
1045/// - `Bypass` — Route via UDP bypass channel
1046/// - `Pace` — Use PACE-based dynamic selection with optional policy override
1047///
1048/// # Example
1049///
1050/// ```
1051/// use peat_mesh::transport::{CollectionTransportRoute, TransportType};
1052///
1053/// // Fixed route to BLE
1054/// let ble_route = CollectionTransportRoute::Fixed {
1055///     transport_type: TransportType::BluetoothLE,
1056/// };
1057///
1058/// // PACE route with default policy
1059/// let pace_route = CollectionTransportRoute::Pace {
1060///     policy_override: None,
1061/// };
1062/// ```
1063#[derive(Debug, Clone, Serialize, Deserialize)]
1064#[serde(tag = "transport", rename_all = "snake_case")]
1065pub enum CollectionTransportRoute {
1066    /// Always use a specific transport type (e.g., Quic, BluetoothLE)
1067    Fixed { transport_type: TransportType },
1068    /// Route via UDP bypass channel
1069    Bypass {
1070        encoding: MessageEncoding,
1071        ttl_ms: u64,
1072        bypass_transport: BypassTransport,
1073    },
1074    /// Use PACE-based dynamic selection
1075    Pace {
1076        policy_override: Option<TransportPolicy>,
1077    },
1078}
1079
1080/// Per-collection routing entry
1081///
1082/// Binds a collection name to a routing strategy and message priority.
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084pub struct CollectionRouteConfig {
1085    /// Collection name (e.g., "position_updates", "sensor_data")
1086    pub collection: String,
1087    /// Routing strategy for this collection
1088    pub route: CollectionTransportRoute,
1089    /// Default message priority for this collection
1090    pub priority: MessagePriority,
1091}
1092
1093/// Lookup table for per-collection transport routing
1094///
1095/// Maps collection names to their transport routing configuration.
1096/// Collections not in this table fall through to legacy scoring
1097/// via `route_message()`.
1098///
1099/// # Example
1100///
1101/// ```
1102/// use peat_mesh::transport::{
1103///     CollectionRouteTable, CollectionRouteConfig, CollectionTransportRoute,
1104///     TransportType, MessagePriority,
1105/// };
1106///
1107/// let table = CollectionRouteTable::new()
1108///     .with_collection(CollectionRouteConfig {
1109///         collection: "telemetry".to_string(),
1110///         route: CollectionTransportRoute::Fixed {
1111///             transport_type: TransportType::BluetoothLE,
1112///         },
1113///         priority: MessagePriority::Normal,
1114///     });
1115///
1116/// assert!(table.has_collection("telemetry"));
1117/// assert!(!table.has_collection("unknown"));
1118/// ```
1119#[derive(Debug, Clone, Default, Serialize, Deserialize)]
1120pub struct CollectionRouteTable {
1121    collections: Vec<CollectionRouteConfig>,
1122}
1123
1124impl CollectionRouteTable {
1125    /// Create an empty route table
1126    pub fn new() -> Self {
1127        Self::default()
1128    }
1129
1130    /// Add a collection route configuration (builder pattern)
1131    pub fn with_collection(mut self, config: CollectionRouteConfig) -> Self {
1132        self.collections.push(config);
1133        self
1134    }
1135
1136    /// Get the route config for a collection
1137    pub fn get(&self, collection: &str) -> Option<&CollectionRouteConfig> {
1138        self.collections.iter().find(|c| c.collection == collection)
1139    }
1140
1141    /// Check if a collection has a route configured
1142    pub fn has_collection(&self, collection: &str) -> bool {
1143        self.collections.iter().any(|c| c.collection == collection)
1144    }
1145
1146    /// Check if a collection is configured for bypass routing
1147    pub fn is_bypass(&self, collection: &str) -> bool {
1148        self.get(collection)
1149            .map(|c| matches!(c.route, CollectionTransportRoute::Bypass { .. }))
1150            .unwrap_or(false)
1151    }
1152}
1153
1154impl std::fmt::Debug for TransportManager {
1155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1156        f.debug_struct("TransportManager")
1157            .field("transports", &self.transports.keys().collect::<Vec<_>>())
1158            .field("config", &self.config)
1159            .finish()
1160    }
1161}
1162
1163// =============================================================================
1164// Tests
1165// =============================================================================
1166
1167#[cfg(test)]
1168mod tests {
1169    use super::*;
1170    use crate::transport::bypass::{BypassChannelConfig, UdpBypassChannel};
1171    use crate::transport::capabilities::{MessagePriority, TransportCapabilities};
1172    use crate::transport::{MeshConnection, MeshTransport, PeerEventReceiver};
1173    use async_trait::async_trait;
1174    use std::time::Instant;
1175    use tokio::sync::mpsc;
1176
1177    // Mock transport for testing
1178    struct MockTransport {
1179        caps: TransportCapabilities,
1180        available: bool,
1181        reachable_peers: Vec<NodeId>,
1182        signal: Option<u8>,
1183    }
1184
1185    impl MockTransport {
1186        fn new(caps: TransportCapabilities) -> Self {
1187            Self {
1188                caps,
1189                available: true,
1190                reachable_peers: vec![],
1191                signal: None,
1192            }
1193        }
1194
1195        fn with_peer(mut self, peer: NodeId) -> Self {
1196            self.reachable_peers.push(peer);
1197            self
1198        }
1199
1200        #[allow(dead_code)]
1201        fn with_signal(mut self, signal: u8) -> Self {
1202            self.signal = Some(signal);
1203            self
1204        }
1205
1206        fn unavailable(mut self) -> Self {
1207            self.available = false;
1208            self
1209        }
1210    }
1211
1212    struct MockConnection {
1213        peer_id: NodeId,
1214        connected_at: Instant,
1215    }
1216
1217    impl MeshConnection for MockConnection {
1218        fn peer_id(&self) -> &NodeId {
1219            &self.peer_id
1220        }
1221
1222        fn is_alive(&self) -> bool {
1223            true
1224        }
1225
1226        fn connected_at(&self) -> Instant {
1227            self.connected_at
1228        }
1229    }
1230
1231    #[async_trait]
1232    impl MeshTransport for MockTransport {
1233        async fn start(&self) -> Result<()> {
1234            Ok(())
1235        }
1236
1237        async fn stop(&self) -> Result<()> {
1238            Ok(())
1239        }
1240
1241        async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnection>> {
1242            if self.reachable_peers.contains(peer_id) {
1243                Ok(Box::new(MockConnection {
1244                    peer_id: peer_id.clone(),
1245                    connected_at: Instant::now(),
1246                }))
1247            } else {
1248                Err(TransportError::PeerNotFound(peer_id.to_string()))
1249            }
1250        }
1251
1252        async fn disconnect(&self, _peer_id: &NodeId) -> Result<()> {
1253            Ok(())
1254        }
1255
1256        fn get_connection(&self, _peer_id: &NodeId) -> Option<Box<dyn MeshConnection>> {
1257            None
1258        }
1259
1260        fn peer_count(&self) -> usize {
1261            0
1262        }
1263
1264        fn connected_peers(&self) -> Vec<NodeId> {
1265            vec![]
1266        }
1267
1268        fn subscribe_peer_events(&self) -> PeerEventReceiver {
1269            let (_tx, rx) = mpsc::channel(1);
1270            rx
1271        }
1272    }
1273
1274    impl Transport for MockTransport {
1275        fn capabilities(&self) -> &TransportCapabilities {
1276            &self.caps
1277        }
1278
1279        fn is_available(&self) -> bool {
1280            self.available
1281        }
1282
1283        fn signal_quality(&self) -> Option<u8> {
1284            self.signal
1285        }
1286
1287        fn can_reach(&self, peer_id: &NodeId) -> bool {
1288            self.reachable_peers.contains(peer_id)
1289        }
1290    }
1291
1292    #[test]
1293    fn test_register_transport() {
1294        let config = TransportManagerConfig::default();
1295        let mut manager = TransportManager::new(config);
1296
1297        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1298        manager.register(transport);
1299
1300        assert!(manager.get_transport(TransportType::Quic).is_some());
1301        assert!(manager.get_transport(TransportType::LoRa).is_none());
1302    }
1303
1304    #[test]
1305    fn test_unregister_transport() {
1306        let config = TransportManagerConfig::default();
1307        let mut manager = TransportManager::new(config);
1308
1309        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1310        manager.register(transport);
1311
1312        let removed = manager.unregister(TransportType::Quic);
1313        assert!(removed.is_some());
1314        assert!(manager.get_transport(TransportType::Quic).is_none());
1315    }
1316
1317    #[test]
1318    fn test_available_transports() {
1319        let config = TransportManagerConfig::default();
1320        let mut manager = TransportManager::new(config);
1321
1322        let peer = NodeId::new("peer-1".to_string());
1323
1324        // QUIC available and can reach peer
1325        let quic =
1326            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1327        manager.register(quic);
1328
1329        // BLE available but can't reach peer
1330        let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1331        manager.register(ble);
1332
1333        // LoRa unavailable
1334        let lora = Arc::new(
1335            MockTransport::new(TransportCapabilities::lora(7))
1336                .unavailable()
1337                .with_peer(peer.clone()),
1338        );
1339        manager.register(lora);
1340
1341        let available = manager.available_transports(&peer);
1342        assert_eq!(available.len(), 1);
1343        assert!(available.contains(&TransportType::Quic));
1344    }
1345
1346    #[test]
1347    fn test_select_transport_by_reliability() {
1348        let config = TransportManagerConfig::default();
1349        let mut manager = TransportManager::new(config);
1350
1351        let peer = NodeId::new("peer-1".to_string());
1352
1353        // QUIC is reliable
1354        let quic =
1355            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1356        manager.register(quic);
1357
1358        // LoRa is not reliable by default
1359        let lora =
1360            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1361        manager.register(lora);
1362
1363        // Require reliability
1364        let requirements = MessageRequirements {
1365            reliable: true,
1366            ..Default::default()
1367        };
1368
1369        let selected = manager.select_transport(&peer, &requirements);
1370        assert_eq!(selected, Some(TransportType::Quic));
1371    }
1372
1373    #[test]
1374    fn test_select_transport_by_preference() {
1375        let config = TransportManagerConfig {
1376            preference_order: vec![TransportType::BluetoothLE, TransportType::Quic],
1377            ..Default::default()
1378        };
1379        let mut manager = TransportManager::new(config);
1380
1381        let peer = NodeId::new("peer-1".to_string());
1382
1383        // Both transports available
1384        let quic =
1385            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1386        manager.register(quic);
1387
1388        let ble = Arc::new(
1389            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1390        );
1391        manager.register(ble);
1392
1393        let requirements = MessageRequirements::default();
1394        let selected = manager.select_transport(&peer, &requirements);
1395
1396        // BLE preferred over QUIC in this config
1397        assert_eq!(selected, Some(TransportType::BluetoothLE));
1398    }
1399
1400    #[test]
1401    fn test_select_transport_by_latency() {
1402        let config = TransportManagerConfig::default();
1403        let mut manager = TransportManager::new(config);
1404
1405        let peer = NodeId::new("peer-1".to_string());
1406
1407        // QUIC has 10ms latency
1408        let quic =
1409            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1410        manager.register(quic);
1411
1412        // LoRa has 100ms+ latency
1413        let mut lora_caps = TransportCapabilities::lora(7);
1414        lora_caps.reliable = true; // Make it reliable for this test
1415        let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1416        manager.register(lora);
1417
1418        // High priority message - should prefer low latency
1419        let requirements = MessageRequirements {
1420            priority: MessagePriority::High,
1421            reliable: true,
1422            ..Default::default()
1423        };
1424
1425        let selected = manager.select_transport(&peer, &requirements);
1426        assert_eq!(selected, Some(TransportType::Quic));
1427    }
1428
1429    #[test]
1430    fn test_select_transport_with_latency_requirement() {
1431        let config = TransportManagerConfig::default();
1432        let mut manager = TransportManager::new(config);
1433
1434        let peer = NodeId::new("peer-1".to_string());
1435
1436        // QUIC: 10ms latency
1437        let quic =
1438            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1439        manager.register(quic);
1440
1441        // LoRa SF12: ~1000ms latency
1442        let mut lora_caps = TransportCapabilities::lora(12);
1443        lora_caps.reliable = true;
1444        let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1445        manager.register(lora);
1446
1447        // Strict latency requirement - should exclude LoRa
1448        let requirements = MessageRequirements {
1449            reliable: true,
1450            max_latency_ms: Some(50),
1451            ..Default::default()
1452        };
1453
1454        let selected = manager.select_transport(&peer, &requirements);
1455        assert_eq!(selected, Some(TransportType::Quic));
1456    }
1457
1458    #[test]
1459    fn test_select_transport_no_match() {
1460        let config = TransportManagerConfig::default();
1461        let mut manager = TransportManager::new(config);
1462
1463        let peer = NodeId::new("peer-1".to_string());
1464
1465        // Only unreliable LoRa available
1466        let lora =
1467            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1468        manager.register(lora);
1469
1470        // Require reliability
1471        let requirements = MessageRequirements {
1472            reliable: true,
1473            ..Default::default()
1474        };
1475
1476        let selected = manager.select_transport(&peer, &requirements);
1477        assert_eq!(selected, None);
1478    }
1479
1480    #[test]
1481    fn test_peer_transport_caching() {
1482        let config = TransportManagerConfig {
1483            cache_peer_transport: true,
1484            ..Default::default()
1485        };
1486        let mut manager = TransportManager::new(config);
1487
1488        let peer = NodeId::new("peer-1".to_string());
1489
1490        let quic =
1491            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1492        manager.register(quic);
1493
1494        let ble = Arc::new(
1495            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1496        );
1497        manager.register(ble);
1498
1499        // Record BLE success
1500        manager.record_success(&peer, TransportType::BluetoothLE);
1501
1502        // Should return cached BLE even though QUIC might score higher
1503        let requirements = MessageRequirements::default();
1504        let selected = manager.select_transport(&peer, &requirements);
1505        assert_eq!(selected, Some(TransportType::BluetoothLE));
1506
1507        // Clear cache
1508        manager.clear_cache(&peer);
1509
1510        // Now should select based on score
1511        let selected = manager.select_transport(&peer, &requirements);
1512        // With default preference order, QUIC should be selected
1513        assert_eq!(selected, Some(TransportType::Quic));
1514    }
1515
1516    #[test]
1517    fn test_power_sensitive_selection() {
1518        // Use empty preference order so only power consumption matters
1519        let config = TransportManagerConfig {
1520            preference_order: vec![],
1521            ..Default::default()
1522        };
1523        let mut manager = TransportManager::new(config);
1524
1525        let peer = NodeId::new("peer-1".to_string());
1526
1527        // QUIC: 20 battery impact
1528        let quic =
1529            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1530        manager.register(quic);
1531
1532        // BLE: 15 battery impact (more efficient)
1533        let ble = Arc::new(
1534            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1535        );
1536        manager.register(ble);
1537
1538        // Power-sensitive requirement
1539        let requirements = MessageRequirements {
1540            power_sensitive: true,
1541            ..Default::default()
1542        };
1543
1544        let selected = manager.select_transport(&peer, &requirements);
1545        // BLE should be preferred due to lower power consumption
1546        assert_eq!(selected, Some(TransportType::BluetoothLE));
1547    }
1548
1549    #[tokio::test]
1550    async fn test_connect_selects_transport() {
1551        let config = TransportManagerConfig::default();
1552        let mut manager = TransportManager::new(config);
1553
1554        let peer = NodeId::new("peer-1".to_string());
1555
1556        let quic =
1557            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1558        manager.register(quic);
1559
1560        let requirements = MessageRequirements::default();
1561        let result = manager.connect(&peer, &requirements).await;
1562
1563        assert!(result.is_ok());
1564        let (transport_type, conn) = result.unwrap();
1565        assert_eq!(transport_type, TransportType::Quic);
1566        assert_eq!(conn.peer_id(), &peer);
1567    }
1568
1569    #[tokio::test]
1570    async fn test_connect_with_fallback() {
1571        let config = TransportManagerConfig {
1572            enable_fallback: true,
1573            ..Default::default()
1574        };
1575        let mut manager = TransportManager::new(config);
1576
1577        let peer = NodeId::new("peer-1".to_string());
1578
1579        // QUIC can't reach peer
1580        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1581        manager.register(quic);
1582
1583        // BLE can reach peer
1584        let ble = Arc::new(
1585            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1586        );
1587        manager.register(ble);
1588
1589        let requirements = MessageRequirements::default();
1590        let result = manager.connect_with_fallback(&peer, &requirements).await;
1591
1592        assert!(result.is_ok());
1593        let (transport_type, _) = result.unwrap();
1594        assert_eq!(transport_type, TransportType::BluetoothLE);
1595    }
1596
1597    #[test]
1598    fn test_distance_tracking() {
1599        let config = TransportManagerConfig::default();
1600        let manager = TransportManager::new(config);
1601
1602        let peer = NodeId::new("peer-1".to_string());
1603
1604        let distance = PeerDistance {
1605            peer_id: peer.clone(),
1606            distance_meters: 500,
1607            source: super::super::capabilities::DistanceSource::Gps {
1608                confidence_meters: 10,
1609            },
1610            last_updated: Instant::now(),
1611        };
1612
1613        manager.update_peer_distance(distance);
1614
1615        let retrieved = manager.get_peer_distance(&peer);
1616        assert!(retrieved.is_some());
1617        assert_eq!(retrieved.unwrap().distance_meters, 500);
1618    }
1619
1620    // =========================================================================
1621    // Bypass Integration Tests (ADR-042)
1622    // =========================================================================
1623
1624    #[tokio::test]
1625    async fn test_no_bypass_channel_by_default() {
1626        let config = TransportManagerConfig::default();
1627        let manager = TransportManager::new(config);
1628
1629        assert!(!manager.has_bypass_channel());
1630        assert!(!manager.is_bypass_collection("test").await);
1631    }
1632
1633    #[test]
1634    fn test_route_message_without_bypass() {
1635        let config = TransportManagerConfig::default();
1636        let mut manager = TransportManager::new(config);
1637
1638        let peer = NodeId::new("peer-1".to_string());
1639
1640        let quic =
1641            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1642        manager.register(quic);
1643
1644        // Normal requirements - should select transport
1645        let requirements = MessageRequirements::default();
1646        let decision = manager.route_message(&peer, &requirements);
1647        assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1648
1649        // Bypass requested but not available - should fall back to transport
1650        // Note: We use a generous latency (100ms) so QUIC (10ms) can be selected
1651        let bypass_req = MessageRequirements {
1652            bypass_sync: true,
1653            max_latency_ms: Some(100), // QUIC has 10ms typical latency
1654            ..Default::default()
1655        };
1656        let decision = manager.route_message(&peer, &bypass_req);
1657        // Falls back to QUIC since bypass not available
1658        assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1659    }
1660
1661    #[tokio::test]
1662    async fn test_subscribe_bypass_not_configured() {
1663        let config = TransportManagerConfig::default();
1664        let manager = TransportManager::new(config);
1665
1666        let result = manager.subscribe_bypass().await;
1667        assert!(result.is_err());
1668    }
1669
1670    #[test]
1671    fn test_route_decision_equality() {
1672        assert_eq!(RouteDecision::Bypass, RouteDecision::Bypass);
1673        assert_eq!(
1674            RouteDecision::Transport(TransportType::Quic),
1675            RouteDecision::Transport(TransportType::Quic)
1676        );
1677        assert_ne!(RouteDecision::Bypass, RouteDecision::NoRoute);
1678        assert_ne!(
1679            RouteDecision::Transport(TransportType::Quic),
1680            RouteDecision::Transport(TransportType::LoRa)
1681        );
1682    }
1683
1684    // =========================================================================
1685    // PACE Instance API Tests
1686    // =========================================================================
1687
1688    #[test]
1689    fn test_register_instance() {
1690        let config = TransportManagerConfig::default();
1691        let manager = TransportManager::new(config);
1692
1693        let peer = NodeId::new("peer-1".to_string());
1694        let instance = TransportInstance::new(
1695            "iroh-eth0",
1696            TransportType::Quic,
1697            TransportCapabilities::quic(),
1698        );
1699        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer));
1700
1701        manager.register_instance(instance, transport);
1702
1703        assert!(manager.get_instance(&"iroh-eth0".to_string()).is_some());
1704        assert!(manager.get_instance(&"nonexistent".to_string()).is_none());
1705    }
1706
1707    #[test]
1708    fn test_unregister_instance() {
1709        let config = TransportManagerConfig::default();
1710        let manager = TransportManager::new(config);
1711
1712        let instance = TransportInstance::new(
1713            "iroh-eth0",
1714            TransportType::Quic,
1715            TransportCapabilities::quic(),
1716        );
1717        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1718
1719        manager.register_instance(instance, transport);
1720
1721        let removed = manager.unregister_instance(&"iroh-eth0".to_string());
1722        assert!(removed.is_some());
1723        let (inst, _) = removed.unwrap();
1724        assert_eq!(inst.id, "iroh-eth0");
1725
1726        // Should be gone now
1727        assert!(manager.get_instance(&"iroh-eth0".to_string()).is_none());
1728
1729        // Unregistering again returns None
1730        let removed_again = manager.unregister_instance(&"iroh-eth0".to_string());
1731        assert!(removed_again.is_none());
1732    }
1733
1734    #[test]
1735    fn test_registered_instance_ids() {
1736        let config = TransportManagerConfig::default();
1737        let manager = TransportManager::new(config);
1738
1739        // Empty initially
1740        assert!(manager.registered_instance_ids().is_empty());
1741
1742        let inst1 = TransportInstance::new(
1743            "iroh-eth0",
1744            TransportType::Quic,
1745            TransportCapabilities::quic(),
1746        );
1747        let inst2 = TransportInstance::new(
1748            "lora-915",
1749            TransportType::LoRa,
1750            TransportCapabilities::lora(7),
1751        );
1752
1753        manager.register_instance(
1754            inst1,
1755            Arc::new(MockTransport::new(TransportCapabilities::quic())),
1756        );
1757        manager.register_instance(
1758            inst2,
1759            Arc::new(MockTransport::new(TransportCapabilities::lora(7))),
1760        );
1761
1762        let ids = manager.registered_instance_ids();
1763        assert_eq!(ids.len(), 2);
1764        assert!(ids.contains(&"iroh-eth0".to_string()));
1765        assert!(ids.contains(&"lora-915".to_string()));
1766    }
1767
1768    #[test]
1769    fn test_available_instance_ids() {
1770        let config = TransportManagerConfig::default();
1771        let manager = TransportManager::new(config);
1772
1773        // Available instance
1774        let inst1 = TransportInstance::new(
1775            "iroh-eth0",
1776            TransportType::Quic,
1777            TransportCapabilities::quic(),
1778        );
1779        let transport1 = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1780        manager.register_instance(inst1, transport1);
1781
1782        // Unavailable instance (transport unavailable)
1783        let inst2 = TransportInstance::new(
1784            "lora-off",
1785            TransportType::LoRa,
1786            TransportCapabilities::lora(7),
1787        );
1788        let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)).unavailable());
1789        manager.register_instance(inst2, transport2);
1790
1791        // Unavailable instance (instance.available = false)
1792        let mut inst3 = TransportInstance::new(
1793            "ble-disabled",
1794            TransportType::BluetoothLE,
1795            TransportCapabilities::bluetooth_le(),
1796        );
1797        inst3.available = false;
1798        let transport3 = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1799        manager.register_instance(inst3, transport3);
1800
1801        let available = manager.available_instance_ids();
1802        assert_eq!(available.len(), 1);
1803        assert!(available.contains("iroh-eth0"));
1804    }
1805
1806    #[test]
1807    fn test_available_instances_for_peer() {
1808        let config = TransportManagerConfig::default();
1809        let manager = TransportManager::new(config);
1810
1811        let peer = NodeId::new("peer-1".to_string());
1812
1813        // Instance that can reach peer
1814        let inst1 = TransportInstance::new(
1815            "iroh-eth0",
1816            TransportType::Quic,
1817            TransportCapabilities::quic(),
1818        );
1819        let transport1 =
1820            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1821        manager.register_instance(inst1, transport1);
1822
1823        // Instance that cannot reach peer
1824        let inst2 = TransportInstance::new(
1825            "lora-915",
1826            TransportType::LoRa,
1827            TransportCapabilities::lora(7),
1828        );
1829        let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1830        manager.register_instance(inst2, transport2);
1831
1832        // Unavailable instance that could reach peer
1833        let inst3 = TransportInstance::new(
1834            "ble-off",
1835            TransportType::BluetoothLE,
1836            TransportCapabilities::bluetooth_le(),
1837        );
1838        let transport3 = Arc::new(
1839            MockTransport::new(TransportCapabilities::bluetooth_le())
1840                .with_peer(peer.clone())
1841                .unavailable(),
1842        );
1843        manager.register_instance(inst3, transport3);
1844
1845        let for_peer = manager.available_instances_for_peer(&peer);
1846        assert_eq!(for_peer.len(), 1);
1847        assert_eq!(for_peer[0], "iroh-eth0");
1848    }
1849
1850    // =========================================================================
1851    // current_pace_level() Tests
1852    // =========================================================================
1853
1854    #[test]
1855    fn test_current_pace_level_no_policy_with_available() {
1856        let config = TransportManagerConfig::default();
1857        let manager = TransportManager::new(config);
1858
1859        // Register an available instance
1860        let inst = TransportInstance::new(
1861            "iroh-eth0",
1862            TransportType::Quic,
1863            TransportCapabilities::quic(),
1864        );
1865        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1866        manager.register_instance(inst, transport);
1867
1868        // No policy: if any transport available, returns Primary
1869        assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1870    }
1871
1872    #[test]
1873    fn test_current_pace_level_no_policy_none_available() {
1874        let config = TransportManagerConfig::default();
1875        let manager = TransportManager::new(config);
1876
1877        // No instances at all
1878        assert_eq!(manager.current_pace_level(), PaceLevel::None);
1879    }
1880
1881    #[test]
1882    fn test_current_pace_level_no_policy_all_unavailable() {
1883        let config = TransportManagerConfig::default();
1884        let manager = TransportManager::new(config);
1885
1886        // Register an unavailable instance
1887        let inst = TransportInstance::new(
1888            "iroh-eth0",
1889            TransportType::Quic,
1890            TransportCapabilities::quic(),
1891        );
1892        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).unavailable());
1893        manager.register_instance(inst, transport);
1894
1895        assert_eq!(manager.current_pace_level(), PaceLevel::None);
1896    }
1897
1898    #[test]
1899    fn test_current_pace_level_with_policy_primary() {
1900        let policy = TransportPolicy::new("test")
1901            .primary(vec!["iroh-eth0"])
1902            .alternate(vec!["lora-915"])
1903            .emergency(vec!["ble-mesh"]);
1904
1905        let config = TransportManagerConfig::with_policy(policy);
1906        let manager = TransportManager::new(config);
1907
1908        // Register iroh-eth0 as available
1909        let inst = TransportInstance::new(
1910            "iroh-eth0",
1911            TransportType::Quic,
1912            TransportCapabilities::quic(),
1913        );
1914        let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1915        manager.register_instance(inst, transport);
1916
1917        assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1918    }
1919
1920    #[test]
1921    fn test_current_pace_level_with_policy_alternate() {
1922        let policy = TransportPolicy::new("test")
1923            .primary(vec!["iroh-eth0"])
1924            .alternate(vec!["lora-915"])
1925            .emergency(vec!["ble-mesh"]);
1926
1927        let config = TransportManagerConfig::with_policy(policy);
1928        let manager = TransportManager::new(config);
1929
1930        // Only alternate is available
1931        let inst = TransportInstance::new(
1932            "lora-915",
1933            TransportType::LoRa,
1934            TransportCapabilities::lora(7),
1935        );
1936        let transport = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1937        manager.register_instance(inst, transport);
1938
1939        assert_eq!(manager.current_pace_level(), PaceLevel::Alternate);
1940    }
1941
1942    #[test]
1943    fn test_current_pace_level_with_policy_emergency() {
1944        let policy = TransportPolicy::new("test")
1945            .primary(vec!["iroh-eth0"])
1946            .alternate(vec!["lora-915"])
1947            .emergency(vec!["ble-mesh"]);
1948
1949        let config = TransportManagerConfig::with_policy(policy);
1950        let manager = TransportManager::new(config);
1951
1952        // Only emergency is available
1953        let inst = TransportInstance::new(
1954            "ble-mesh",
1955            TransportType::BluetoothLE,
1956            TransportCapabilities::bluetooth_le(),
1957        );
1958        let transport = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1959        manager.register_instance(inst, transport);
1960
1961        assert_eq!(manager.current_pace_level(), PaceLevel::Emergency);
1962    }
1963
1964    #[test]
1965    fn test_current_pace_level_with_policy_none_available() {
1966        let policy = TransportPolicy::new("test")
1967            .primary(vec!["iroh-eth0"])
1968            .alternate(vec!["lora-915"]);
1969
1970        let config = TransportManagerConfig::with_policy(policy);
1971        let manager = TransportManager::new(config);
1972
1973        // No instances registered
1974        assert_eq!(manager.current_pace_level(), PaceLevel::None);
1975    }
1976
1977    // =========================================================================
1978    // select_transports_pace() Tests
1979    // =========================================================================
1980
1981    #[test]
1982    fn test_select_transports_pace_no_policy() {
1983        let config = TransportManagerConfig::default();
1984        let manager = TransportManager::new(config);
1985
1986        let peer = NodeId::new("peer-1".to_string());
1987        let requirements = MessageRequirements::default();
1988
1989        // No policy => empty vec
1990        let selected = manager.select_transports_pace(&peer, &requirements);
1991        assert!(selected.is_empty());
1992    }
1993
1994    #[test]
1995    fn test_select_transports_pace_single_mode() {
1996        let policy = TransportPolicy::new("test")
1997            .primary(vec!["iroh-eth0", "iroh-wlan0"])
1998            .alternate(vec!["lora-915"]);
1999
2000        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single);
2001        let manager = TransportManager::new(config);
2002
2003        let peer = NodeId::new("peer-1".to_string());
2004
2005        // Register two available primary instances that can reach peer
2006        let inst1 = TransportInstance::new(
2007            "iroh-eth0",
2008            TransportType::Quic,
2009            TransportCapabilities::quic(),
2010        );
2011        let t1 =
2012            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2013        manager.register_instance(inst1, t1);
2014
2015        let inst2 = TransportInstance::new(
2016            "iroh-wlan0",
2017            TransportType::Quic,
2018            TransportCapabilities::quic(),
2019        );
2020        let t2 =
2021            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2022        manager.register_instance(inst2, t2);
2023
2024        let requirements = MessageRequirements::default();
2025        let selected = manager.select_transports_pace(&peer, &requirements);
2026
2027        // Single mode: at most 1
2028        assert_eq!(selected.len(), 1);
2029        assert_eq!(selected[0], "iroh-eth0");
2030    }
2031
2032    #[test]
2033    fn test_select_transports_pace_redundant_mode() {
2034        let policy = TransportPolicy::new("test")
2035            .primary(vec!["iroh-eth0", "iroh-wlan0"])
2036            .alternate(vec!["lora-915"]);
2037
2038        let config =
2039            TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(2));
2040        let manager = TransportManager::new(config);
2041
2042        let peer = NodeId::new("peer-1".to_string());
2043
2044        let inst1 = TransportInstance::new(
2045            "iroh-eth0",
2046            TransportType::Quic,
2047            TransportCapabilities::quic(),
2048        );
2049        let t1 =
2050            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2051        manager.register_instance(inst1, t1);
2052
2053        let inst2 = TransportInstance::new(
2054            "iroh-wlan0",
2055            TransportType::Quic,
2056            TransportCapabilities::quic(),
2057        );
2058        let t2 =
2059            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2060        manager.register_instance(inst2, t2);
2061
2062        let inst3 = TransportInstance::new(
2063            "lora-915",
2064            TransportType::LoRa,
2065            TransportCapabilities::lora(7),
2066        );
2067        let t3 =
2068            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2069        manager.register_instance(inst3, t3);
2070
2071        let requirements = MessageRequirements::default();
2072        let selected = manager.select_transports_pace(&peer, &requirements);
2073
2074        // Redundant { min_paths: 2, max_paths: None } => takes max(len, min) = all 3
2075        assert!(selected.len() >= 2);
2076    }
2077
2078    #[test]
2079    fn test_select_transports_pace_redundant_bounded() {
2080        let policy = TransportPolicy::new("test").primary(vec!["t1", "t2", "t3", "t4"]);
2081
2082        let config = TransportManagerConfig::with_policy(policy)
2083            .with_mode(TransportMode::redundant_bounded(1, 2));
2084        let manager = TransportManager::new(config);
2085
2086        let peer = NodeId::new("peer-1".to_string());
2087
2088        // Register 4 instances
2089        for name in &["t1", "t2", "t3", "t4"] {
2090            let inst =
2091                TransportInstance::new(*name, TransportType::Quic, TransportCapabilities::quic());
2092            let t =
2093                Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2094            manager.register_instance(inst, t);
2095        }
2096
2097        let requirements = MessageRequirements::default();
2098        let selected = manager.select_transports_pace(&peer, &requirements);
2099
2100        // Redundant { min_paths: 1, max_paths: Some(2) } => takes max(2, 1) = 2
2101        assert_eq!(selected.len(), 2);
2102    }
2103
2104    #[test]
2105    fn test_select_transports_pace_bonded_mode() {
2106        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2107
2108        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2109        let manager = TransportManager::new(config);
2110
2111        let peer = NodeId::new("peer-1".to_string());
2112
2113        let inst1 = TransportInstance::new(
2114            "iroh-eth0",
2115            TransportType::Quic,
2116            TransportCapabilities::quic(),
2117        );
2118        let t1 =
2119            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2120        manager.register_instance(inst1, t1);
2121
2122        let inst2 = TransportInstance::new(
2123            "iroh-wlan0",
2124            TransportType::Quic,
2125            TransportCapabilities::quic(),
2126        );
2127        let t2 =
2128            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2129        manager.register_instance(inst2, t2);
2130
2131        let requirements = MessageRequirements::default();
2132        let selected = manager.select_transports_pace(&peer, &requirements);
2133
2134        // Bonded: returns all candidates
2135        assert_eq!(selected.len(), 2);
2136    }
2137
2138    #[test]
2139    fn test_select_transports_pace_load_balanced_mode() {
2140        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2141
2142        let config = TransportManagerConfig::with_policy(policy)
2143            .with_mode(TransportMode::LoadBalanced { weights: None });
2144        let manager = TransportManager::new(config);
2145
2146        let peer = NodeId::new("peer-1".to_string());
2147
2148        let inst1 = TransportInstance::new(
2149            "iroh-eth0",
2150            TransportType::Quic,
2151            TransportCapabilities::quic(),
2152        );
2153        let t1 =
2154            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2155        manager.register_instance(inst1, t1);
2156
2157        let inst2 = TransportInstance::new(
2158            "iroh-wlan0",
2159            TransportType::Quic,
2160            TransportCapabilities::quic(),
2161        );
2162        let t2 =
2163            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2164        manager.register_instance(inst2, t2);
2165
2166        let requirements = MessageRequirements::default();
2167        let selected = manager.select_transports_pace(&peer, &requirements);
2168
2169        // LoadBalanced: returns all candidates
2170        assert_eq!(selected.len(), 2);
2171    }
2172
2173    #[test]
2174    fn test_select_transports_pace_filters_by_requirements() {
2175        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2176
2177        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2178        let manager = TransportManager::new(config);
2179
2180        let peer = NodeId::new("peer-1".to_string());
2181
2182        // QUIC is reliable
2183        let inst1 = TransportInstance::new(
2184            "iroh-eth0",
2185            TransportType::Quic,
2186            TransportCapabilities::quic(),
2187        );
2188        let t1 =
2189            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2190        manager.register_instance(inst1, t1);
2191
2192        // LoRa is NOT reliable
2193        let inst2 = TransportInstance::new(
2194            "lora-915",
2195            TransportType::LoRa,
2196            TransportCapabilities::lora(7),
2197        );
2198        let t2 =
2199            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2200        manager.register_instance(inst2, t2);
2201
2202        // Require reliability => should filter out LoRa
2203        let requirements = MessageRequirements {
2204            reliable: true,
2205            ..Default::default()
2206        };
2207        let selected = manager.select_transports_pace(&peer, &requirements);
2208
2209        assert_eq!(selected.len(), 1);
2210        assert_eq!(selected[0], "iroh-eth0");
2211    }
2212
2213    #[test]
2214    fn test_select_transports_pace_filters_unreachable_peer() {
2215        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2216
2217        let config = TransportManagerConfig::with_policy(policy);
2218        let manager = TransportManager::new(config);
2219
2220        let peer = NodeId::new("peer-1".to_string());
2221
2222        // Can reach peer
2223        let inst1 = TransportInstance::new(
2224            "iroh-eth0",
2225            TransportType::Quic,
2226            TransportCapabilities::quic(),
2227        );
2228        let t1 =
2229            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2230        manager.register_instance(inst1, t1);
2231
2232        // Cannot reach peer
2233        let inst2 = TransportInstance::new(
2234            "lora-915",
2235            TransportType::LoRa,
2236            TransportCapabilities::lora(7),
2237        );
2238        let t2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
2239        manager.register_instance(inst2, t2);
2240
2241        let requirements = MessageRequirements::default();
2242        let selected = manager.select_transports_pace(&peer, &requirements);
2243
2244        assert_eq!(selected.len(), 1);
2245        assert_eq!(selected[0], "iroh-eth0");
2246    }
2247
2248    // =========================================================================
2249    // select_transport_pace() Tests
2250    // =========================================================================
2251
2252    #[test]
2253    fn test_select_transport_pace_returns_first() {
2254        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2255
2256        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2257        let manager = TransportManager::new(config);
2258
2259        let peer = NodeId::new("peer-1".to_string());
2260
2261        let inst1 = TransportInstance::new(
2262            "iroh-eth0",
2263            TransportType::Quic,
2264            TransportCapabilities::quic(),
2265        );
2266        let t1 =
2267            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2268        manager.register_instance(inst1, t1);
2269
2270        let inst2 = TransportInstance::new(
2271            "iroh-wlan0",
2272            TransportType::Quic,
2273            TransportCapabilities::quic(),
2274        );
2275        let t2 =
2276            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2277        manager.register_instance(inst2, t2);
2278
2279        let requirements = MessageRequirements::default();
2280        let selected = manager.select_transport_pace(&peer, &requirements);
2281
2282        assert_eq!(selected, Some("iroh-eth0".to_string()));
2283    }
2284
2285    #[test]
2286    fn test_select_transport_pace_returns_none_no_policy() {
2287        let config = TransportManagerConfig::default();
2288        let manager = TransportManager::new(config);
2289
2290        let peer = NodeId::new("peer-1".to_string());
2291        let requirements = MessageRequirements::default();
2292
2293        assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2294    }
2295
2296    #[test]
2297    fn test_select_transport_pace_returns_none_no_candidates() {
2298        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
2299
2300        let config = TransportManagerConfig::with_policy(policy);
2301        let manager = TransportManager::new(config);
2302
2303        let peer = NodeId::new("peer-1".to_string());
2304        let requirements = MessageRequirements::default();
2305
2306        // No instances registered
2307        assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2308    }
2309
2310    // =========================================================================
2311    // record_success_pace() and clear_cache_pace() Tests
2312    // =========================================================================
2313
2314    #[test]
2315    fn test_record_success_pace_caching_enabled() {
2316        let config = TransportManagerConfig {
2317            cache_peer_transport: true,
2318            ..Default::default()
2319        };
2320        let manager = TransportManager::new(config);
2321
2322        let peer = NodeId::new("peer-1".to_string());
2323        manager.record_success_pace(&peer, "iroh-eth0".to_string());
2324
2325        let cached = manager
2326            .peer_transport_ids
2327            .read()
2328            .unwrap_or_else(|e| e.into_inner());
2329        assert_eq!(cached.get(&peer), Some(&"iroh-eth0".to_string()));
2330    }
2331
2332    #[test]
2333    fn test_record_success_pace_caching_disabled() {
2334        let config = TransportManagerConfig {
2335            cache_peer_transport: false,
2336            ..Default::default()
2337        };
2338        let manager = TransportManager::new(config);
2339
2340        let peer = NodeId::new("peer-1".to_string());
2341        manager.record_success_pace(&peer, "iroh-eth0".to_string());
2342
2343        let cached = manager
2344            .peer_transport_ids
2345            .read()
2346            .unwrap_or_else(|e| e.into_inner());
2347        assert!(cached.get(&peer).is_none());
2348    }
2349
2350    #[test]
2351    fn test_clear_cache_pace() {
2352        let config = TransportManagerConfig {
2353            cache_peer_transport: true,
2354            ..Default::default()
2355        };
2356        let manager = TransportManager::new(config);
2357
2358        let peer = NodeId::new("peer-1".to_string());
2359        manager.record_success_pace(&peer, "iroh-eth0".to_string());
2360
2361        // Verify it's cached
2362        assert!(manager
2363            .peer_transport_ids
2364            .read()
2365            .unwrap()
2366            .get(&peer)
2367            .is_some());
2368
2369        manager.clear_cache_pace(&peer);
2370
2371        // Verify it's cleared
2372        assert!(manager
2373            .peer_transport_ids
2374            .read()
2375            .unwrap()
2376            .get(&peer)
2377            .is_none());
2378    }
2379
2380    #[test]
2381    fn test_clear_cache_pace_nonexistent_peer() {
2382        let config = TransportManagerConfig::default();
2383        let manager = TransportManager::new(config);
2384
2385        let peer = NodeId::new("nonexistent".to_string());
2386
2387        // Should not panic
2388        manager.clear_cache_pace(&peer);
2389    }
2390
2391    // =========================================================================
2392    // select_transport_for_distance() Tests
2393    // =========================================================================
2394
2395    #[test]
2396    fn test_select_transport_for_distance_no_distance() {
2397        let config = TransportManagerConfig::default();
2398        let mut manager = TransportManager::new(config);
2399
2400        let peer = NodeId::new("peer-1".to_string());
2401        let quic =
2402            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2403        manager.register(quic);
2404
2405        let requirements = MessageRequirements::default();
2406        let result = manager.select_transport_for_distance(&peer, &requirements);
2407
2408        assert!(result.is_some());
2409        let (transport_type, range_mode) = result.unwrap();
2410        assert_eq!(transport_type, TransportType::Quic);
2411        assert!(range_mode.is_none());
2412    }
2413
2414    #[test]
2415    fn test_select_transport_for_distance_with_distance() {
2416        let config = TransportManagerConfig::default();
2417        let mut manager = TransportManager::new(config);
2418
2419        let peer = NodeId::new("peer-1".to_string());
2420        let quic =
2421            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2422        manager.register(quic);
2423
2424        // Set distance for peer
2425        let distance = PeerDistance {
2426            peer_id: peer.clone(),
2427            distance_meters: 1000,
2428            source: super::super::capabilities::DistanceSource::Configured,
2429            last_updated: Instant::now(),
2430        };
2431        manager.update_peer_distance(distance);
2432
2433        let requirements = MessageRequirements::default();
2434        let result = manager.select_transport_for_distance(&peer, &requirements);
2435
2436        assert!(result.is_some());
2437        let (transport_type, range_mode) = result.unwrap();
2438        assert_eq!(transport_type, TransportType::Quic);
2439        // Range mode is None because placeholder logic doesn't do runtime downcasting
2440        assert!(range_mode.is_none());
2441    }
2442
2443    #[test]
2444    fn test_select_transport_for_distance_no_suitable_transport() {
2445        let config = TransportManagerConfig::default();
2446        let manager = TransportManager::new(config);
2447
2448        let peer = NodeId::new("peer-1".to_string());
2449        let requirements = MessageRequirements::default();
2450
2451        let result = manager.select_transport_for_distance(&peer, &requirements);
2452        assert!(result.is_none());
2453    }
2454
2455    // =========================================================================
2456    // TransportManagerConfig builder Tests
2457    // =========================================================================
2458
2459    #[test]
2460    fn test_config_with_policy() {
2461        let policy = TransportPolicy::new("tactical")
2462            .primary(vec!["iroh-eth0"])
2463            .alternate(vec!["lora-915"]);
2464
2465        let config = TransportManagerConfig::with_policy(policy);
2466
2467        assert!(config.default_policy.is_some());
2468        let p = config.default_policy.unwrap();
2469        assert_eq!(p.name, "tactical");
2470        assert_eq!(p.primary.len(), 1);
2471        assert_eq!(p.alternate.len(), 1);
2472        // Verify defaults are preserved
2473        assert!(config.enable_fallback);
2474        assert!(config.cache_peer_transport);
2475        assert_eq!(config.switch_threshold, 10);
2476        assert!(matches!(config.transport_mode, TransportMode::Single));
2477    }
2478
2479    #[test]
2480    fn test_config_with_mode() {
2481        let config = TransportManagerConfig::default().with_mode(TransportMode::Bonded);
2482
2483        assert!(matches!(config.transport_mode, TransportMode::Bonded));
2484    }
2485
2486    #[test]
2487    fn test_config_with_policy_and_mode_chained() {
2488        let policy = TransportPolicy::new("test").primary(vec!["t1"]);
2489        let config =
2490            TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(3));
2491
2492        assert!(config.default_policy.is_some());
2493        assert!(matches!(
2494            config.transport_mode,
2495            TransportMode::Redundant {
2496                min_paths: 3,
2497                max_paths: None
2498            }
2499        ));
2500    }
2501
2502    // =========================================================================
2503    // connect() error paths Tests
2504    // =========================================================================
2505
2506    #[tokio::test]
2507    async fn test_connect_no_suitable_transport() {
2508        let config = TransportManagerConfig::default();
2509        let manager = TransportManager::new(config);
2510
2511        let peer = NodeId::new("peer-1".to_string());
2512        let requirements = MessageRequirements::default();
2513
2514        let result = manager.connect(&peer, &requirements).await;
2515        assert!(result.is_err());
2516        match result {
2517            Err(TransportError::PeerNotFound(_)) => {} // expected
2518            Err(other) => panic!("Expected PeerNotFound, got: {}", other),
2519            Ok(_) => panic!("Expected error but got Ok"),
2520        }
2521    }
2522
2523    #[tokio::test]
2524    async fn test_connect_unreachable_peer() {
2525        let config = TransportManagerConfig::default();
2526        let mut manager = TransportManager::new(config);
2527
2528        // Register QUIC but the peer is not in reachable_peers
2529        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2530        manager.register(quic);
2531
2532        let peer = NodeId::new("unreachable-peer".to_string());
2533        let requirements = MessageRequirements::default();
2534
2535        let result = manager.connect(&peer, &requirements).await;
2536        assert!(result.is_err());
2537    }
2538
2539    // =========================================================================
2540    // connect_with_fallback() Tests
2541    // =========================================================================
2542
2543    #[tokio::test]
2544    async fn test_connect_with_fallback_disabled() {
2545        let config = TransportManagerConfig {
2546            enable_fallback: false,
2547            ..Default::default()
2548        };
2549        let mut manager = TransportManager::new(config);
2550
2551        let peer = NodeId::new("peer-1".to_string());
2552
2553        // QUIC registered but can't reach peer (will fail connect)
2554        let quic =
2555            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2556        manager.register(quic);
2557
2558        // BLE also available
2559        let ble = Arc::new(
2560            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
2561        );
2562        manager.register(ble);
2563
2564        // Both can reach, both will succeed, so first should succeed.
2565        // Let's test the error path where the first fails:
2566        // We need a transport that can reach but fails to connect.
2567        // The MockTransport connects if peer is in reachable_peers.
2568        // Actually, both will succeed, so let's just test with no reachable transports.
2569
2570        let peer_unreachable = NodeId::new("nobody".to_string());
2571        let requirements = MessageRequirements::default();
2572
2573        let result = manager
2574            .connect_with_fallback(&peer_unreachable, &requirements)
2575            .await;
2576        assert!(result.is_err());
2577    }
2578
2579    #[tokio::test]
2580    async fn test_connect_with_fallback_no_candidates() {
2581        let config = TransportManagerConfig::default();
2582        let manager = TransportManager::new(config);
2583
2584        let peer = NodeId::new("peer-1".to_string());
2585        let requirements = MessageRequirements::default();
2586
2587        let result = manager.connect_with_fallback(&peer, &requirements).await;
2588        assert!(result.is_err());
2589        match result {
2590            Err(ref e) => {
2591                let err_msg = format!("{}", e);
2592                assert!(err_msg.contains("No suitable transport"));
2593            }
2594            Ok(_) => panic!("Expected error but got Ok"),
2595        }
2596    }
2597
2598    // =========================================================================
2599    // route_message() NoRoute Tests
2600    // =========================================================================
2601
2602    #[test]
2603    fn test_route_message_no_route() {
2604        let config = TransportManagerConfig::default();
2605        let manager = TransportManager::new(config);
2606
2607        let peer = NodeId::new("peer-1".to_string());
2608        let requirements = MessageRequirements::default();
2609
2610        // No transports registered => NoRoute
2611        let decision = manager.route_message(&peer, &requirements);
2612        assert_eq!(decision, RouteDecision::NoRoute);
2613    }
2614
2615    #[test]
2616    fn test_route_message_bypass_requested_no_channel() {
2617        let config = TransportManagerConfig::default();
2618        let manager = TransportManager::new(config);
2619
2620        let peer = NodeId::new("peer-1".to_string());
2621        let requirements = MessageRequirements {
2622            bypass_sync: true,
2623            ..Default::default()
2624        };
2625
2626        // bypass requested but no channel and no transports => NoRoute
2627        let decision = manager.route_message(&peer, &requirements);
2628        assert_eq!(decision, RouteDecision::NoRoute);
2629    }
2630
2631    // =========================================================================
2632    // RouteDecision construction Tests
2633    // =========================================================================
2634
2635    #[test]
2636    fn test_route_decision_no_route() {
2637        let decision = RouteDecision::NoRoute;
2638        assert_eq!(decision, RouteDecision::NoRoute);
2639        assert_ne!(decision, RouteDecision::Bypass);
2640        assert_ne!(decision, RouteDecision::Transport(TransportType::Quic));
2641    }
2642
2643    #[test]
2644    fn test_route_decision_debug() {
2645        let bypass = RouteDecision::Bypass;
2646        let transport = RouteDecision::Transport(TransportType::LoRa);
2647        let no_route = RouteDecision::NoRoute;
2648
2649        assert!(format!("{:?}", bypass).contains("Bypass"));
2650        assert!(format!("{:?}", transport).contains("LoRa"));
2651        assert!(format!("{:?}", no_route).contains("NoRoute"));
2652    }
2653
2654    #[test]
2655    fn test_route_decision_clone() {
2656        let original = RouteDecision::Transport(TransportType::BluetoothLE);
2657        let cloned = original.clone();
2658        assert_eq!(original, cloned);
2659    }
2660
2661    // =========================================================================
2662    // TransportManager Debug and misc Tests
2663    // =========================================================================
2664
2665    #[test]
2666    fn test_transport_manager_debug() {
2667        let config = TransportManagerConfig::default();
2668        let mut manager = TransportManager::new(config);
2669
2670        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2671        manager.register(quic);
2672
2673        let debug_str = format!("{:?}", manager);
2674        assert!(debug_str.contains("TransportManager"));
2675        assert!(debug_str.contains("Quic"));
2676    }
2677
2678    #[test]
2679    fn test_registered_transports() {
2680        let config = TransportManagerConfig::default();
2681        let mut manager = TransportManager::new(config);
2682
2683        assert!(manager.registered_transports().is_empty());
2684
2685        let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2686        let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
2687        manager.register(quic);
2688        manager.register(ble);
2689
2690        let registered = manager.registered_transports();
2691        assert_eq!(registered.len(), 2);
2692        assert!(registered.contains(&TransportType::Quic));
2693        assert!(registered.contains(&TransportType::BluetoothLE));
2694    }
2695
2696    #[tokio::test]
2697    async fn test_set_bypass_channel() {
2698        let config = TransportManagerConfig::default();
2699        let mut manager = TransportManager::new(config);
2700
2701        assert!(!manager.has_bypass_channel());
2702
2703        let bypass_config = BypassChannelConfig::new();
2704        let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
2705        manager.set_bypass_channel(bypass);
2706
2707        assert!(manager.has_bypass_channel());
2708    }
2709
2710    #[test]
2711    fn test_record_success_caching_disabled() {
2712        let config = TransportManagerConfig {
2713            cache_peer_transport: false,
2714            ..Default::default()
2715        };
2716        let manager = TransportManager::new(config);
2717
2718        let peer = NodeId::new("peer-1".to_string());
2719        manager.record_success(&peer, TransportType::Quic);
2720
2721        // Cache should be empty since caching is disabled
2722        let cached = manager
2723            .peer_transports
2724            .read()
2725            .unwrap_or_else(|e| e.into_inner());
2726        assert!(cached.get(&peer).is_none());
2727    }
2728
2729    #[test]
2730    fn test_select_transport_cached_transport_invalid() {
2731        let config = TransportManagerConfig {
2732            cache_peer_transport: true,
2733            ..Default::default()
2734        };
2735        let mut manager = TransportManager::new(config);
2736
2737        let peer = NodeId::new("peer-1".to_string());
2738
2739        // Register BLE that is available and can reach peer
2740        let ble = Arc::new(
2741            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
2742        );
2743        manager.register(ble);
2744
2745        // Cache a transport type that is NOT registered (e.g., LoRa)
2746        manager.record_success(&peer, TransportType::LoRa);
2747
2748        let requirements = MessageRequirements::default();
2749        let selected = manager.select_transport(&peer, &requirements);
2750
2751        // Should fall through cached transport (LoRa not registered) and select BLE
2752        assert_eq!(selected, Some(TransportType::BluetoothLE));
2753    }
2754
2755    #[test]
2756    fn test_select_transport_cached_transport_unavailable() {
2757        let config = TransportManagerConfig {
2758            cache_peer_transport: true,
2759            ..Default::default()
2760        };
2761        let mut manager = TransportManager::new(config);
2762
2763        let peer = NodeId::new("peer-1".to_string());
2764
2765        // Register QUIC that is available
2766        let quic =
2767            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2768        manager.register(quic);
2769
2770        // Register BLE that is unavailable
2771        let ble = Arc::new(
2772            MockTransport::new(TransportCapabilities::bluetooth_le())
2773                .with_peer(peer.clone())
2774                .unavailable(),
2775        );
2776        manager.register(ble);
2777
2778        // Cache BLE (which is unavailable)
2779        manager.record_success(&peer, TransportType::BluetoothLE);
2780
2781        let requirements = MessageRequirements::default();
2782        let selected = manager.select_transport(&peer, &requirements);
2783
2784        // Should fall through cached BLE (unavailable) and select QUIC
2785        assert_eq!(selected, Some(TransportType::Quic));
2786    }
2787
2788    #[test]
2789    fn test_pace_fallback_order() {
2790        // Test that PACE selection follows policy order when primary fails
2791        let policy = TransportPolicy::new("test")
2792            .primary(vec!["dead-transport"])
2793            .alternate(vec!["lora-915"]);
2794
2795        let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single);
2796        let manager = TransportManager::new(config);
2797
2798        let peer = NodeId::new("peer-1".to_string());
2799
2800        // Only register the alternate (primary is not registered)
2801        let inst = TransportInstance::new(
2802            "lora-915",
2803            TransportType::LoRa,
2804            TransportCapabilities::lora(7),
2805        );
2806        let t =
2807            Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2808        manager.register_instance(inst, t);
2809
2810        let requirements = MessageRequirements::default();
2811        let selected = manager.select_transports_pace(&peer, &requirements);
2812
2813        // Should fall back to alternate
2814        assert_eq!(selected.len(), 1);
2815        assert_eq!(selected[0], "lora-915");
2816    }
2817
2818    #[test]
2819    fn test_get_peer_distance_none() {
2820        let config = TransportManagerConfig::default();
2821        let manager = TransportManager::new(config);
2822
2823        let peer = NodeId::new("unknown-peer".to_string());
2824        assert!(manager.get_peer_distance(&peer).is_none());
2825    }
2826
2827    #[tokio::test]
2828    async fn test_send_bypass_not_configured() {
2829        let config = TransportManagerConfig::default();
2830        let manager = TransportManager::new(config);
2831
2832        let result = manager.send_bypass("test_collection", b"hello", None).await;
2833        assert!(result.is_err());
2834        let err_msg = format!("{}", result.unwrap_err());
2835        assert!(err_msg.contains("Bypass channel not configured"));
2836    }
2837
2838    #[tokio::test]
2839    async fn test_send_bypass_to_not_configured() {
2840        let config = TransportManagerConfig::default();
2841        let manager = TransportManager::new(config);
2842
2843        let target = BypassTarget::Broadcast { port: 5150 };
2844        let result = manager
2845            .send_bypass_to(target, "test_collection", b"hello")
2846            .await;
2847        assert!(result.is_err());
2848        let err_msg = format!("{}", result.unwrap_err());
2849        assert!(err_msg.contains("Bypass channel not configured"));
2850    }
2851
2852    #[tokio::test]
2853    async fn test_subscribe_bypass_collection_not_configured() {
2854        let config = TransportManagerConfig::default();
2855        let manager = TransportManager::new(config);
2856
2857        let result = manager.subscribe_bypass_collection("test").await;
2858        assert!(result.is_err());
2859    }
2860
2861    // =========================================================================
2862    // CollectionRouteTable Tests
2863    // =========================================================================
2864
2865    #[test]
2866    fn test_route_table_empty_returns_none() {
2867        let table = CollectionRouteTable::new();
2868        assert!(table.get("anything").is_none());
2869        assert!(!table.has_collection("anything"));
2870        assert!(!table.is_bypass("anything"));
2871    }
2872
2873    #[test]
2874    fn test_route_table_builder_and_lookup() {
2875        let table = CollectionRouteTable::new()
2876            .with_collection(CollectionRouteConfig {
2877                collection: "telemetry".to_string(),
2878                route: CollectionTransportRoute::Fixed {
2879                    transport_type: TransportType::BluetoothLE,
2880                },
2881                priority: MessagePriority::Normal,
2882            })
2883            .with_collection(CollectionRouteConfig {
2884                collection: "position".to_string(),
2885                route: CollectionTransportRoute::Bypass {
2886                    encoding: MessageEncoding::Raw,
2887                    ttl_ms: 200,
2888                    bypass_transport: BypassTransport::Broadcast,
2889                },
2890                priority: MessagePriority::High,
2891            });
2892
2893        assert!(table.has_collection("telemetry"));
2894        assert!(table.has_collection("position"));
2895        assert!(!table.has_collection("unknown"));
2896
2897        let telemetry = table.get("telemetry").unwrap();
2898        assert!(matches!(
2899            telemetry.route,
2900            CollectionTransportRoute::Fixed {
2901                transport_type: TransportType::BluetoothLE
2902            }
2903        ));
2904        assert_eq!(telemetry.priority, MessagePriority::Normal);
2905
2906        let position = table.get("position").unwrap();
2907        assert_eq!(position.priority, MessagePriority::High);
2908    }
2909
2910    #[test]
2911    fn test_route_table_is_bypass() {
2912        let table = CollectionRouteTable::new()
2913            .with_collection(CollectionRouteConfig {
2914                collection: "bypass_col".to_string(),
2915                route: CollectionTransportRoute::Bypass {
2916                    encoding: MessageEncoding::Raw,
2917                    ttl_ms: 100,
2918                    bypass_transport: BypassTransport::Unicast,
2919                },
2920                priority: MessagePriority::Normal,
2921            })
2922            .with_collection(CollectionRouteConfig {
2923                collection: "fixed_col".to_string(),
2924                route: CollectionTransportRoute::Fixed {
2925                    transport_type: TransportType::Quic,
2926                },
2927                priority: MessagePriority::Normal,
2928            })
2929            .with_collection(CollectionRouteConfig {
2930                collection: "pace_col".to_string(),
2931                route: CollectionTransportRoute::Pace {
2932                    policy_override: None,
2933                },
2934                priority: MessagePriority::Normal,
2935            });
2936
2937        assert!(table.is_bypass("bypass_col"));
2938        assert!(!table.is_bypass("fixed_col"));
2939        assert!(!table.is_bypass("pace_col"));
2940        assert!(!table.is_bypass("nonexistent"));
2941    }
2942
2943    // =========================================================================
2944    // CollectionTransportRoute Serde Tests
2945    // =========================================================================
2946
2947    #[test]
2948    fn test_serde_fixed_route() {
2949        let route = CollectionTransportRoute::Fixed {
2950            transport_type: TransportType::BluetoothLE,
2951        };
2952        let json = serde_json::to_string(&route).unwrap();
2953        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2954        assert!(matches!(
2955            roundtrip,
2956            CollectionTransportRoute::Fixed {
2957                transport_type: TransportType::BluetoothLE
2958            }
2959        ));
2960    }
2961
2962    #[test]
2963    fn test_serde_bypass_route() {
2964        let route = CollectionTransportRoute::Bypass {
2965            encoding: MessageEncoding::Raw,
2966            ttl_ms: 500,
2967            bypass_transport: BypassTransport::Broadcast,
2968        };
2969        let json = serde_json::to_string(&route).unwrap();
2970        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2971        if let CollectionTransportRoute::Bypass {
2972            encoding,
2973            ttl_ms,
2974            bypass_transport,
2975        } = roundtrip
2976        {
2977            assert_eq!(encoding, MessageEncoding::Raw);
2978            assert_eq!(ttl_ms, 500);
2979            assert_eq!(bypass_transport, BypassTransport::Broadcast);
2980        } else {
2981            panic!("Expected Bypass variant");
2982        }
2983    }
2984
2985    #[test]
2986    fn test_serde_pace_route() {
2987        let route = CollectionTransportRoute::Pace {
2988            policy_override: None,
2989        };
2990        let json = serde_json::to_string(&route).unwrap();
2991        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2992        assert!(matches!(
2993            roundtrip,
2994            CollectionTransportRoute::Pace {
2995                policy_override: None
2996            }
2997        ));
2998    }
2999
3000    #[test]
3001    fn test_serde_pace_route_with_policy() {
3002        let policy = TransportPolicy::new("custom")
3003            .primary(vec!["ble-hci0"])
3004            .alternate(vec!["iroh-wlan0"]);
3005        let route = CollectionTransportRoute::Pace {
3006            policy_override: Some(policy),
3007        };
3008        let json = serde_json::to_string(&route).unwrap();
3009        let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
3010        if let CollectionTransportRoute::Pace {
3011            policy_override: Some(p),
3012        } = roundtrip
3013        {
3014            assert_eq!(p.name, "custom");
3015            assert_eq!(p.primary, vec!["ble-hci0"]);
3016            assert_eq!(p.alternate, vec!["iroh-wlan0"]);
3017        } else {
3018            panic!("Expected Pace with policy_override");
3019        }
3020    }
3021
3022    #[test]
3023    fn test_serde_collection_route_config() {
3024        let config = CollectionRouteConfig {
3025            collection: "sensors".to_string(),
3026            route: CollectionTransportRoute::Fixed {
3027                transport_type: TransportType::LoRa,
3028            },
3029            priority: MessagePriority::High,
3030        };
3031        let json = serde_json::to_string(&config).unwrap();
3032        let roundtrip: CollectionRouteConfig = serde_json::from_str(&json).unwrap();
3033        assert_eq!(roundtrip.collection, "sensors");
3034        assert_eq!(roundtrip.priority, MessagePriority::High);
3035    }
3036
3037    #[test]
3038    fn test_serde_collection_route_table() {
3039        let table = CollectionRouteTable::new()
3040            .with_collection(CollectionRouteConfig {
3041                collection: "a".to_string(),
3042                route: CollectionTransportRoute::Fixed {
3043                    transport_type: TransportType::Quic,
3044                },
3045                priority: MessagePriority::Normal,
3046            })
3047            .with_collection(CollectionRouteConfig {
3048                collection: "b".to_string(),
3049                route: CollectionTransportRoute::Bypass {
3050                    encoding: MessageEncoding::Json,
3051                    ttl_ms: 1000,
3052                    bypass_transport: BypassTransport::Unicast,
3053                },
3054                priority: MessagePriority::Critical,
3055            });
3056
3057        let json = serde_json::to_string(&table).unwrap();
3058        let roundtrip: CollectionRouteTable = serde_json::from_str(&json).unwrap();
3059        assert!(roundtrip.has_collection("a"));
3060        assert!(roundtrip.has_collection("b"));
3061        assert!(roundtrip.is_bypass("b"));
3062        assert!(!roundtrip.is_bypass("a"));
3063    }
3064
3065    #[test]
3066    fn test_serde_transport_type() {
3067        // Verify all variants round-trip through JSON
3068        let types = vec![
3069            TransportType::Quic,
3070            TransportType::BluetoothClassic,
3071            TransportType::BluetoothLE,
3072            TransportType::WifiDirect,
3073            TransportType::LoRa,
3074            TransportType::TacticalRadio,
3075            TransportType::Satellite,
3076            TransportType::Custom(42),
3077        ];
3078        for tt in types {
3079            let json = serde_json::to_string(&tt).unwrap();
3080            let roundtrip: TransportType = serde_json::from_str(&json).unwrap();
3081            assert_eq!(roundtrip, tt, "Failed round-trip for {:?}", tt);
3082        }
3083    }
3084
3085    // =========================================================================
3086    // route_collection() Tests
3087    // =========================================================================
3088
3089    #[test]
3090    fn test_route_collection_unknown_falls_through() {
3091        let config = TransportManagerConfig::default();
3092        let mut manager = TransportManager::new(config);
3093
3094        let peer = NodeId::new("peer-1".to_string());
3095        let quic =
3096            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
3097        manager.register(quic);
3098
3099        let requirements = MessageRequirements::default();
3100        // Unknown collection falls through to route_message()
3101        let decision = manager.route_collection("unknown", &peer, &requirements);
3102        assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
3103    }
3104
3105    #[test]
3106    fn test_route_collection_fixed_routes_correctly() {
3107        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3108            collection: "telemetry".to_string(),
3109            route: CollectionTransportRoute::Fixed {
3110                transport_type: TransportType::BluetoothLE,
3111            },
3112            priority: MessagePriority::Normal,
3113        });
3114
3115        let config = TransportManagerConfig {
3116            collection_routes: table,
3117            ..Default::default()
3118        };
3119        let mut manager = TransportManager::new(config);
3120
3121        let peer = NodeId::new("peer-1".to_string());
3122        let ble = Arc::new(
3123            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
3124        );
3125        manager.register(ble);
3126
3127        let requirements = MessageRequirements::default();
3128        let decision = manager.route_collection("telemetry", &peer, &requirements);
3129        assert_eq!(
3130            decision,
3131            RouteDecision::Transport(TransportType::BluetoothLE)
3132        );
3133    }
3134
3135    #[test]
3136    fn test_route_collection_fixed_unavailable_returns_no_route() {
3137        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3138            collection: "telemetry".to_string(),
3139            route: CollectionTransportRoute::Fixed {
3140                transport_type: TransportType::BluetoothLE,
3141            },
3142            priority: MessagePriority::Normal,
3143        });
3144
3145        let config = TransportManagerConfig {
3146            collection_routes: table,
3147            ..Default::default()
3148        };
3149        let mut manager = TransportManager::new(config);
3150
3151        let peer = NodeId::new("peer-1".to_string());
3152
3153        // Register BLE but make it unavailable
3154        let ble = Arc::new(
3155            MockTransport::new(TransportCapabilities::bluetooth_le())
3156                .with_peer(peer.clone())
3157                .unavailable(),
3158        );
3159        manager.register(ble);
3160
3161        let requirements = MessageRequirements::default();
3162        let decision = manager.route_collection("telemetry", &peer, &requirements);
3163        assert_eq!(decision, RouteDecision::NoRoute);
3164    }
3165
3166    #[test]
3167    fn test_route_collection_fixed_not_registered_returns_no_route() {
3168        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3169            collection: "telemetry".to_string(),
3170            route: CollectionTransportRoute::Fixed {
3171                transport_type: TransportType::BluetoothLE,
3172            },
3173            priority: MessagePriority::Normal,
3174        });
3175
3176        let config = TransportManagerConfig {
3177            collection_routes: table,
3178            ..Default::default()
3179        };
3180        let manager = TransportManager::new(config);
3181
3182        let peer = NodeId::new("peer-1".to_string());
3183        // No transports registered at all
3184        let requirements = MessageRequirements::default();
3185        let decision = manager.route_collection("telemetry", &peer, &requirements);
3186        assert_eq!(decision, RouteDecision::NoRoute);
3187    }
3188
3189    #[tokio::test]
3190    async fn test_route_collection_bypass_with_channel() {
3191        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3192            collection: "position".to_string(),
3193            route: CollectionTransportRoute::Bypass {
3194                encoding: MessageEncoding::Raw,
3195                ttl_ms: 200,
3196                bypass_transport: BypassTransport::Broadcast,
3197            },
3198            priority: MessagePriority::High,
3199        });
3200
3201        let config = TransportManagerConfig {
3202            collection_routes: table,
3203            ..Default::default()
3204        };
3205        let mut manager = TransportManager::new(config);
3206
3207        // Set up bypass channel
3208        let bypass_config = BypassChannelConfig::new();
3209        let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
3210        manager.set_bypass_channel(bypass);
3211
3212        let peer = NodeId::new("peer-1".to_string());
3213        let requirements = MessageRequirements::default();
3214        let decision = manager.route_collection("position", &peer, &requirements);
3215        assert_eq!(decision, RouteDecision::Bypass);
3216    }
3217
3218    #[test]
3219    fn test_route_collection_bypass_without_channel() {
3220        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3221            collection: "position".to_string(),
3222            route: CollectionTransportRoute::Bypass {
3223                encoding: MessageEncoding::Raw,
3224                ttl_ms: 200,
3225                bypass_transport: BypassTransport::Broadcast,
3226            },
3227            priority: MessagePriority::High,
3228        });
3229
3230        let config = TransportManagerConfig {
3231            collection_routes: table,
3232            ..Default::default()
3233        };
3234        let manager = TransportManager::new(config);
3235
3236        let peer = NodeId::new("peer-1".to_string());
3237        let requirements = MessageRequirements::default();
3238        // No bypass channel configured
3239        let decision = manager.route_collection("position", &peer, &requirements);
3240        assert_eq!(decision, RouteDecision::NoRoute);
3241    }
3242
3243    #[test]
3244    fn test_route_collection_pace_routes_correctly() {
3245        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
3246
3247        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3248            collection: "sync_data".to_string(),
3249            route: CollectionTransportRoute::Pace {
3250                policy_override: None,
3251            },
3252            priority: MessagePriority::Normal,
3253        });
3254
3255        let config = TransportManagerConfig {
3256            collection_routes: table,
3257            default_policy: Some(policy),
3258            ..Default::default()
3259        };
3260        let manager = TransportManager::new(config);
3261
3262        let peer = NodeId::new("peer-1".to_string());
3263
3264        // Register the PACE instance
3265        let inst = TransportInstance::new(
3266            "iroh-eth0",
3267            TransportType::Quic,
3268            TransportCapabilities::quic(),
3269        );
3270        let t = Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
3271        manager.register_instance(inst, t);
3272
3273        let requirements = MessageRequirements::default();
3274        let decision = manager.route_collection("sync_data", &peer, &requirements);
3275        assert_eq!(
3276            decision,
3277            RouteDecision::TransportInstance("iroh-eth0".to_string())
3278        );
3279    }
3280
3281    #[test]
3282    fn test_route_collection_pace_no_available_returns_no_route() {
3283        let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
3284
3285        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3286            collection: "sync_data".to_string(),
3287            route: CollectionTransportRoute::Pace {
3288                policy_override: None,
3289            },
3290            priority: MessagePriority::Normal,
3291        });
3292
3293        let config = TransportManagerConfig {
3294            collection_routes: table,
3295            default_policy: Some(policy),
3296            ..Default::default()
3297        };
3298        let manager = TransportManager::new(config);
3299
3300        let peer = NodeId::new("peer-1".to_string());
3301        // No instances registered
3302        let requirements = MessageRequirements::default();
3303        let decision = manager.route_collection("sync_data", &peer, &requirements);
3304        assert_eq!(decision, RouteDecision::NoRoute);
3305    }
3306
3307    #[test]
3308    fn test_route_collection_pace_with_policy_override() {
3309        // Default policy points to non-existent transport
3310        let default_policy = TransportPolicy::new("default").primary(vec!["nonexistent"]);
3311
3312        // Override policy points to available transport
3313        let override_policy = TransportPolicy::new("override").primary(vec!["ble-hci0"]);
3314
3315        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3316            collection: "ble_data".to_string(),
3317            route: CollectionTransportRoute::Pace {
3318                policy_override: Some(override_policy),
3319            },
3320            priority: MessagePriority::Normal,
3321        });
3322
3323        let config = TransportManagerConfig {
3324            collection_routes: table,
3325            default_policy: Some(default_policy),
3326            ..Default::default()
3327        };
3328        let manager = TransportManager::new(config);
3329
3330        let peer = NodeId::new("peer-1".to_string());
3331        let inst = TransportInstance::new(
3332            "ble-hci0",
3333            TransportType::BluetoothLE,
3334            TransportCapabilities::bluetooth_le(),
3335        );
3336        let t = Arc::new(
3337            MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
3338        );
3339        manager.register_instance(inst, t);
3340
3341        let requirements = MessageRequirements::default();
3342        let decision = manager.route_collection("ble_data", &peer, &requirements);
3343        assert_eq!(
3344            decision,
3345            RouteDecision::TransportInstance("ble-hci0".to_string())
3346        );
3347    }
3348
3349    // =========================================================================
3350    // RouteDecision::TransportInstance Tests
3351    // =========================================================================
3352
3353    #[test]
3354    fn test_route_decision_transport_instance_variant() {
3355        let decision = RouteDecision::TransportInstance("iroh-eth0".to_string());
3356        assert_eq!(
3357            decision,
3358            RouteDecision::TransportInstance("iroh-eth0".to_string())
3359        );
3360        assert_ne!(decision, RouteDecision::Bypass);
3361        assert_ne!(decision, RouteDecision::NoRoute);
3362        assert_ne!(decision, RouteDecision::Transport(TransportType::Quic));
3363    }
3364
3365    #[test]
3366    fn test_route_decision_transport_instance_debug() {
3367        let decision = RouteDecision::TransportInstance("ble-hci0".to_string());
3368        let debug = format!("{:?}", decision);
3369        assert!(debug.contains("TransportInstance"));
3370        assert!(debug.contains("ble-hci0"));
3371    }
3372
3373    #[test]
3374    fn test_route_decision_transport_instance_clone() {
3375        let original = RouteDecision::TransportInstance("iroh-wlan0".to_string());
3376        let cloned = original.clone();
3377        assert_eq!(original, cloned);
3378    }
3379
3380    // =========================================================================
3381    // get_collection_route() Tests
3382    // =========================================================================
3383
3384    #[test]
3385    fn test_get_collection_route_found() {
3386        let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3387            collection: "telemetry".to_string(),
3388            route: CollectionTransportRoute::Fixed {
3389                transport_type: TransportType::Quic,
3390            },
3391            priority: MessagePriority::High,
3392        });
3393
3394        let config = TransportManagerConfig {
3395            collection_routes: table,
3396            ..Default::default()
3397        };
3398        let manager = TransportManager::new(config);
3399
3400        let route = manager.get_collection_route("telemetry");
3401        assert!(route.is_some());
3402        assert_eq!(route.unwrap().collection, "telemetry");
3403        assert_eq!(route.unwrap().priority, MessagePriority::High);
3404    }
3405
3406    #[test]
3407    fn test_get_collection_route_not_found() {
3408        let config = TransportManagerConfig::default();
3409        let manager = TransportManager::new(config);
3410
3411        assert!(manager.get_collection_route("nonexistent").is_none());
3412    }
3413}