1use std::collections::HashMap;
88use std::net::SocketAddr;
89use std::sync::{Arc, RwLock};
90
91use super::bypass::{
92 BypassMessage, BypassTarget, BypassTransport, MessageEncoding, UdpBypassChannel,
93};
94use super::capabilities::{
95 MessagePriority, MessageRequirements, PaceLevel, PeerDistance, RangeMode, Transport,
96 TransportId, TransportInstance, TransportMode, TransportPolicy, TransportType,
97};
98use super::{NodeId, Result, TransportError};
99use serde::{Deserialize, Serialize};
100use std::collections::HashSet;
101use tokio::sync::broadcast;
102use tokio::sync::RwLock as TokioRwLock;
103
/// Registry of transport instances keyed by [`TransportId`]; each entry pairs the
/// instance descriptor with the transport implementation registered alongside it.
type TransportInstanceMap = HashMap<TransportId, (TransportInstance, Arc<dyn Transport>)>;
106
/// Configuration for [`TransportManager`].
#[derive(Debug, Clone)]
pub struct TransportManagerConfig {
    /// Transport types in order of preference. During scoring a type at index `i`
    /// receives a bonus of `20 - 5 * i`; types that are not listed receive `0`.
    pub preference_order: Vec<TransportType>,

    /// When `false`, `connect_with_fallback` returns the first connection error
    /// instead of moving on to the next candidate.
    pub enable_fallback: bool,

    /// When `true`, the transport that last succeeded for a peer is recorded and
    /// checked first by `select_transport`.
    pub cache_peer_transport: bool,

    /// NOTE(review): not read anywhere in the visible code; presumably a score
    /// margin required before switching transports — confirm.
    pub switch_threshold: i32,

    /// Policy used for policy-ordered instance selection. With `None`,
    /// `select_transports_pace` returns no candidates.
    pub default_policy: Option<TransportPolicy>,

    /// Controls how many policy-ordered candidates `select_transports_pace` returns.
    pub transport_mode: TransportMode,

    /// Per-collection routing entries consulted by `route_collection`.
    pub collection_routes: CollectionRouteTable,
}
141
142impl Default for TransportManagerConfig {
143 fn default() -> Self {
144 Self {
145 preference_order: vec![
146 TransportType::Quic,
147 TransportType::WifiDirect,
148 TransportType::BluetoothLE,
149 TransportType::LoRa,
150 ],
151 enable_fallback: true,
152 cache_peer_transport: true,
153 switch_threshold: 10,
154 default_policy: None,
155 transport_mode: TransportMode::Single,
156 collection_routes: CollectionRouteTable::default(),
157 }
158 }
159}
160
161impl TransportManagerConfig {
162 pub fn with_policy(policy: TransportPolicy) -> Self {
164 Self {
165 default_policy: Some(policy),
166 ..Default::default()
167 }
168 }
169
170 pub fn with_mode(mut self, mode: TransportMode) -> Self {
172 self.transport_mode = mode;
173 self
174 }
175}
176
/// Owns the registered transports and picks which one to use for a peer or collection.
pub struct TransportManager {
    /// One transport per [`TransportType`]; `register` replaces any previous entry
    /// of the same type.
    transports: HashMap<TransportType, Arc<dyn Transport>>,

    /// Transport instances keyed by id. Behind a lock so instances can be added
    /// and removed through `&self`.
    transport_instances: RwLock<TransportInstanceMap>,

    /// Transport type recorded by `record_success` for each peer.
    peer_transports: RwLock<HashMap<NodeId, TransportType>>,

    /// Transport instance id recorded by `record_success_pace` for each peer.
    peer_transport_ids: RwLock<HashMap<NodeId, TransportId>>,

    /// Most recent distance report stored per peer by `update_peer_distance`.
    peer_distances: RwLock<HashMap<NodeId, PeerDistance>>,

    /// Selection and routing configuration supplied at construction.
    config: TransportManagerConfig,

    /// Optional UDP bypass channel; `None` until one is supplied.
    bypass_channel: Option<Arc<TokioRwLock<UdpBypassChannel>>>,
}
237
238impl TransportManager {
239 pub fn new(config: TransportManagerConfig) -> Self {
241 Self {
242 transports: HashMap::new(),
243 transport_instances: RwLock::new(HashMap::new()),
244 peer_transports: RwLock::new(HashMap::new()),
245 peer_transport_ids: RwLock::new(HashMap::new()),
246 peer_distances: RwLock::new(HashMap::new()),
247 config,
248 bypass_channel: None,
249 }
250 }
251
252 pub fn with_bypass(config: TransportManagerConfig, bypass: UdpBypassChannel) -> Self {
254 Self {
255 transports: HashMap::new(),
256 transport_instances: RwLock::new(HashMap::new()),
257 peer_transports: RwLock::new(HashMap::new()),
258 peer_transport_ids: RwLock::new(HashMap::new()),
259 peer_distances: RwLock::new(HashMap::new()),
260 config,
261 bypass_channel: Some(Arc::new(TokioRwLock::new(bypass))),
262 }
263 }
264
265 pub fn set_bypass_channel(&mut self, bypass: UdpBypassChannel) {
267 self.bypass_channel = Some(Arc::new(TokioRwLock::new(bypass)));
268 }
269
270 pub fn has_bypass_channel(&self) -> bool {
272 self.bypass_channel.is_some()
273 }
274
275 pub async fn is_bypass_collection(&self, collection: &str) -> bool {
277 if let Some(ref bypass) = self.bypass_channel {
278 bypass.read().await.is_bypass_collection(collection)
279 } else {
280 false
281 }
282 }
283
284 pub fn register(&mut self, transport: Arc<dyn Transport>) {
288 let transport_type = transport.capabilities().transport_type;
289 self.transports.insert(transport_type, transport);
290 }
291
292 pub fn unregister(&mut self, transport_type: TransportType) -> Option<Arc<dyn Transport>> {
296 self.transports.remove(&transport_type)
297 }
298
299 pub fn get_transport(&self, transport_type: TransportType) -> Option<&Arc<dyn Transport>> {
301 self.transports.get(&transport_type)
302 }
303
304 pub fn registered_transports(&self) -> Vec<TransportType> {
306 self.transports.keys().copied().collect()
307 }
308
309 pub fn available_transports(&self, peer_id: &NodeId) -> Vec<TransportType> {
311 self.transports
312 .iter()
313 .filter(|(_, t)| t.is_available() && t.can_reach(peer_id))
314 .map(|(tt, _)| *tt)
315 .collect()
316 }
317
318 pub fn register_instance(&self, instance: TransportInstance, transport: Arc<dyn Transport>) {
339 let id = instance.id.clone();
340 self.transport_instances
341 .write()
342 .unwrap()
343 .insert(id, (instance, transport));
344 }
345
346 pub fn unregister_instance(
348 &self,
349 id: &TransportId,
350 ) -> Option<(TransportInstance, Arc<dyn Transport>)> {
351 self.transport_instances
352 .write()
353 .unwrap_or_else(|e| e.into_inner())
354 .remove(id)
355 }
356
357 pub fn get_instance(&self, id: &TransportId) -> Option<Arc<dyn Transport>> {
359 self.transport_instances
360 .read()
361 .unwrap()
362 .get(id)
363 .map(|(_, t)| Arc::clone(t))
364 }
365
366 pub fn registered_instance_ids(&self) -> Vec<TransportId> {
368 self.transport_instances
369 .read()
370 .unwrap()
371 .keys()
372 .cloned()
373 .collect()
374 }
375
376 pub fn available_instance_ids(&self) -> HashSet<TransportId> {
378 self.transport_instances
379 .read()
380 .unwrap()
381 .iter()
382 .filter(|(_, (inst, transport))| inst.available && transport.is_available())
383 .map(|(id, _)| id.clone())
384 .collect()
385 }
386
387 pub fn available_instances_for_peer(&self, peer_id: &NodeId) -> Vec<TransportId> {
389 self.transport_instances
390 .read()
391 .unwrap()
392 .iter()
393 .filter(|(_, (inst, transport))| {
394 inst.available && transport.is_available() && transport.can_reach(peer_id)
395 })
396 .map(|(id, _)| id.clone())
397 .collect()
398 }
399
400 pub fn current_pace_level(&self) -> PaceLevel {
404 match &self.config.default_policy {
405 Some(policy) => policy.current_level(&self.available_instance_ids()),
406 None => {
407 if !self.available_instance_ids().is_empty() {
409 PaceLevel::Primary
410 } else {
411 PaceLevel::None
412 }
413 }
414 }
415 }
416
417 pub fn select_transports_pace(
434 &self,
435 peer_id: &NodeId,
436 requirements: &MessageRequirements,
437 ) -> Vec<TransportId> {
438 let policy = match &self.config.default_policy {
439 Some(p) => p,
440 None => return Vec::new(), };
442
443 let instances = self
444 .transport_instances
445 .read()
446 .unwrap_or_else(|e| e.into_inner());
447 let available_for_peer: HashSet<_> = instances
448 .iter()
449 .filter(|(_, (inst, transport))| {
450 inst.available
451 && transport.is_available()
452 && transport.can_reach(peer_id)
453 && transport.capabilities().meets_requirements(requirements)
454 })
455 .map(|(id, _)| id.clone())
456 .collect();
457
458 let candidates: Vec<TransportId> = policy
460 .ordered()
461 .filter(|id| available_for_peer.contains(*id))
462 .cloned()
463 .collect();
464
465 match &self.config.transport_mode {
467 TransportMode::Single => candidates.into_iter().take(1).collect(),
468 TransportMode::Redundant {
469 min_paths,
470 max_paths,
471 } => {
472 let min = *min_paths as usize;
473 let max = max_paths.map(|m| m as usize).unwrap_or(candidates.len());
474 candidates.into_iter().take(max.max(min)).collect()
475 }
476 TransportMode::Bonded => candidates, TransportMode::LoadBalanced { .. } => candidates, }
479 }
480
481 pub fn select_transport_pace(
485 &self,
486 peer_id: &NodeId,
487 requirements: &MessageRequirements,
488 ) -> Option<TransportId> {
489 self.select_transports_pace(peer_id, requirements)
490 .into_iter()
491 .next()
492 }
493
494 pub fn record_success_pace(&self, peer_id: &NodeId, transport_id: TransportId) {
496 if self.config.cache_peer_transport {
497 self.peer_transport_ids
498 .write()
499 .unwrap()
500 .insert(peer_id.clone(), transport_id);
501 }
502 }
503
504 pub fn clear_cache_pace(&self, peer_id: &NodeId) {
506 self.peer_transport_ids
507 .write()
508 .unwrap_or_else(|e| e.into_inner())
509 .remove(peer_id);
510 }
511
512 pub fn select_transport(
528 &self,
529 peer_id: &NodeId,
530 requirements: &MessageRequirements,
531 ) -> Option<TransportType> {
532 if self.config.cache_peer_transport {
534 if let Some(&cached) = self
535 .peer_transports
536 .read()
537 .unwrap_or_else(|e| e.into_inner())
538 .get(peer_id)
539 {
540 if let Some(transport) = self.transports.get(&cached) {
542 if transport.is_available()
543 && transport.can_reach(peer_id)
544 && transport.capabilities().meets_requirements(requirements)
545 {
546 return Some(cached);
547 }
548 }
549 }
550 }
551
552 let candidates: Vec<_> = self
554 .available_transports(peer_id)
555 .into_iter()
556 .filter_map(|tt| {
557 let transport = self.transports.get(&tt)?;
558 let caps = transport.capabilities();
559
560 if !caps.meets_requirements(requirements) {
562 return None;
563 }
564
565 if let Some(max_latency) = requirements.max_latency_ms {
567 let est_delivery = transport.estimate_delivery_ms(requirements.message_size);
568 if est_delivery > max_latency {
569 return None;
570 }
571 }
572
573 let preference_bonus = self
575 .config
576 .preference_order
577 .iter()
578 .position(|&t| t == tt)
579 .map(|idx| 20 - (idx as i32 * 5))
580 .unwrap_or(0);
581
582 let score = transport.calculate_score(requirements, preference_bonus);
583 Some((tt, score))
584 })
585 .collect();
586
587 candidates
589 .into_iter()
590 .max_by_key(|(_, score)| *score)
591 .map(|(tt, _)| tt)
592 }
593
594 pub fn select_transport_for_distance(
599 &self,
600 peer_id: &NodeId,
601 requirements: &MessageRequirements,
602 ) -> Option<(TransportType, Option<RangeMode>)> {
603 let transport_type = self.select_transport(peer_id, requirements)?;
604
605 let distance = self
607 .peer_distances
608 .read()
609 .unwrap()
610 .get(peer_id)
611 .map(|d| d.distance_meters);
612
613 let range_mode = if let Some(_dist) = distance {
615 None } else {
619 None
620 };
621
622 Some((transport_type, range_mode))
623 }
624
625 pub fn record_success(&self, peer_id: &NodeId, transport_type: TransportType) {
629 if self.config.cache_peer_transport {
630 self.peer_transports
631 .write()
632 .unwrap()
633 .insert(peer_id.clone(), transport_type);
634 }
635 }
636
637 pub fn clear_cache(&self, peer_id: &NodeId) {
641 self.peer_transports
642 .write()
643 .unwrap_or_else(|e| e.into_inner())
644 .remove(peer_id);
645 }
646
647 pub fn update_peer_distance(&self, distance: PeerDistance) {
649 self.peer_distances
650 .write()
651 .unwrap()
652 .insert(distance.peer_id.clone(), distance);
653 }
654
655 pub fn get_peer_distance(&self, peer_id: &NodeId) -> Option<PeerDistance> {
657 self.peer_distances
658 .read()
659 .unwrap_or_else(|e| e.into_inner())
660 .get(peer_id)
661 .cloned()
662 }
663
664 pub async fn connect(
668 &self,
669 peer_id: &NodeId,
670 requirements: &MessageRequirements,
671 ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
672 let transport_type = self
673 .select_transport(peer_id, requirements)
674 .ok_or_else(|| {
675 TransportError::PeerNotFound(format!("No suitable transport for {}", peer_id))
676 })?;
677
678 let transport = self
679 .transports
680 .get(&transport_type)
681 .ok_or(TransportError::NotStarted)?;
682
683 let connection = transport.connect(peer_id).await?;
684
685 self.record_success(peer_id, transport_type);
687
688 Ok((transport_type, connection))
689 }
690
691 pub async fn connect_with_fallback(
695 &self,
696 peer_id: &NodeId,
697 requirements: &MessageRequirements,
698 ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
699 let candidates: Vec<_> = self
701 .available_transports(peer_id)
702 .into_iter()
703 .filter_map(|tt| {
704 let transport = self.transports.get(&tt)?;
705 if !transport.capabilities().meets_requirements(requirements) {
706 return None;
707 }
708 let preference_bonus = self
709 .config
710 .preference_order
711 .iter()
712 .position(|&t| t == tt)
713 .map(|idx| 20 - (idx as i32 * 5))
714 .unwrap_or(0);
715 let score = transport.calculate_score(requirements, preference_bonus);
716 Some((tt, score))
717 })
718 .collect();
719
720 let mut sorted: Vec<_> = candidates;
721 sorted.sort_by(|a, b| b.1.cmp(&a.1)); if sorted.is_empty() {
724 return Err(TransportError::PeerNotFound(format!(
725 "No suitable transport for {}",
726 peer_id
727 )));
728 }
729
730 let mut last_error = None;
731
732 for (transport_type, _) in sorted {
733 let transport = match self.transports.get(&transport_type) {
734 Some(t) => t,
735 None => continue,
736 };
737
738 match transport.connect(peer_id).await {
739 Ok(conn) => {
740 self.record_success(peer_id, transport_type);
741 return Ok((transport_type, conn));
742 }
743 Err(e) => {
744 if !self.config.enable_fallback {
745 return Err(e);
746 }
747 last_error = Some(e);
748 self.clear_cache(peer_id);
749 }
750 }
751 }
752
753 Err(last_error.unwrap_or_else(|| {
754 TransportError::PeerNotFound(format!("All transports failed for {}", peer_id))
755 }))
756 }
757
758 pub async fn send_bypass(
789 &self,
790 collection: &str,
791 data: &[u8],
792 target: Option<SocketAddr>,
793 ) -> Result<()> {
794 let bypass = self
795 .bypass_channel
796 .as_ref()
797 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
798
799 bypass
800 .read()
801 .await
802 .send_to_collection(collection, target, data)
803 .await
804 .map_err(|e| TransportError::Other(e.to_string().into()))
805 }
806
807 pub async fn send_bypass_to(
817 &self,
818 target: BypassTarget,
819 collection: &str,
820 data: &[u8],
821 ) -> Result<()> {
822 let bypass = self
823 .bypass_channel
824 .as_ref()
825 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
826
827 bypass
828 .read()
829 .await
830 .send(target, collection, data)
831 .await
832 .map_err(|e| TransportError::Other(e.to_string().into()))
833 }
834
835 pub async fn subscribe_bypass(&self) -> Result<broadcast::Receiver<BypassMessage>> {
854 let bypass = self
855 .bypass_channel
856 .as_ref()
857 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
858
859 Ok(bypass.read().await.subscribe())
860 }
861
862 pub async fn subscribe_bypass_collection(
876 &self,
877 collection: &str,
878 ) -> Result<(u32, broadcast::Receiver<BypassMessage>)> {
879 let bypass = self
880 .bypass_channel
881 .as_ref()
882 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
883
884 Ok(bypass.read().await.subscribe_collection(collection))
885 }
886
887 pub fn route_collection(
907 &self,
908 collection: &str,
909 peer_id: &NodeId,
910 requirements: &MessageRequirements,
911 ) -> RouteDecision {
912 let route_config = match self.config.collection_routes.get(collection) {
913 Some(config) => config,
914 None => return self.route_message(peer_id, requirements),
915 };
916
917 match &route_config.route {
918 CollectionTransportRoute::Bypass { .. } => {
919 if self.bypass_channel.is_some() {
920 RouteDecision::Bypass
921 } else {
922 RouteDecision::NoRoute
923 }
924 }
925 CollectionTransportRoute::Fixed { transport_type } => {
926 if let Some(transport) = self.transports.get(transport_type) {
928 if transport.is_available() && transport.can_reach(peer_id) {
929 RouteDecision::Transport(*transport_type)
930 } else {
931 RouteDecision::NoRoute
932 }
933 } else {
934 RouteDecision::NoRoute
935 }
936 }
937 CollectionTransportRoute::Pace { policy_override } => {
938 match self.select_transport_pace_with_policy(
939 peer_id,
940 requirements,
941 policy_override.as_ref(),
942 ) {
943 Some(id) => RouteDecision::TransportInstance(id),
944 None => RouteDecision::NoRoute,
945 }
946 }
947 }
948 }
949
950 fn select_transport_pace_with_policy(
955 &self,
956 peer_id: &NodeId,
957 requirements: &MessageRequirements,
958 policy_override: Option<&TransportPolicy>,
959 ) -> Option<TransportId> {
960 let policy = policy_override.or(self.config.default_policy.as_ref())?;
961
962 let instances = self
963 .transport_instances
964 .read()
965 .unwrap_or_else(|e| e.into_inner());
966 let available_for_peer: HashSet<_> = instances
967 .iter()
968 .filter(|(_, (inst, transport))| {
969 inst.available
970 && transport.is_available()
971 && transport.can_reach(peer_id)
972 && transport.capabilities().meets_requirements(requirements)
973 })
974 .map(|(id, _)| id.clone())
975 .collect();
976
977 policy
978 .ordered()
979 .find(|id| available_for_peer.contains(*id))
980 .cloned()
981 }
982
983 pub fn get_collection_route(&self, collection: &str) -> Option<&CollectionRouteConfig> {
985 self.config.collection_routes.get(collection)
986 }
987
988 pub fn route_message(
1002 &self,
1003 peer_id: &NodeId,
1004 requirements: &MessageRequirements,
1005 ) -> RouteDecision {
1006 if requirements.bypass_sync && self.bypass_channel.is_some() {
1008 return RouteDecision::Bypass;
1009 }
1010 match self.select_transport(peer_id, requirements) {
1014 Some(transport_type) => RouteDecision::Transport(transport_type),
1015 None => RouteDecision::NoRoute,
1016 }
1017 }
1018}
1019
/// Outcome of a routing decision made by [`TransportManager`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteDecision {
    /// Send through the bypass channel.
    Bypass,
    /// Send through the registered transport of this type.
    Transport(TransportType),
    /// Send through the transport instance with this id.
    TransportInstance(TransportId),
    /// Nothing suitable is currently available.
    NoRoute,
}
1032
/// How messages of one collection are routed.
///
/// Serialized as an internally tagged enum: the variant name, in snake_case, is
/// stored under the `transport` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "transport", rename_all = "snake_case")]
pub enum CollectionTransportRoute {
    /// Always use the registered transport of the given type.
    Fixed { transport_type: TransportType },
    /// Send through the bypass channel.
    Bypass {
        /// Encoding for bypass messages of this collection.
        encoding: MessageEncoding,
        /// NOTE(review): presumably the message time-to-live in milliseconds — confirm.
        ttl_ms: u64,
        /// Bypass transport to use for this collection.
        bypass_transport: BypassTransport,
    },
    /// Select a transport instance by policy order.
    Pace {
        /// Policy used instead of the manager's default policy when present.
        policy_override: Option<TransportPolicy>,
    },
}
1079
/// Routing entry for a single collection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionRouteConfig {
    /// Name of the collection this entry applies to (matched exactly).
    pub collection: String,
    /// How messages of the collection are routed.
    pub route: CollectionTransportRoute,
    /// Priority associated with the collection's messages.
    pub priority: MessagePriority,
}
1092
/// Ordered list of per-collection routing entries; lookups return the first
/// entry whose collection name matches.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CollectionRouteTable {
    /// Entries in insertion order.
    collections: Vec<CollectionRouteConfig>,
}
1123
1124impl CollectionRouteTable {
1125 pub fn new() -> Self {
1127 Self::default()
1128 }
1129
1130 pub fn with_collection(mut self, config: CollectionRouteConfig) -> Self {
1132 self.collections.push(config);
1133 self
1134 }
1135
1136 pub fn get(&self, collection: &str) -> Option<&CollectionRouteConfig> {
1138 self.collections.iter().find(|c| c.collection == collection)
1139 }
1140
1141 pub fn has_collection(&self, collection: &str) -> bool {
1143 self.collections.iter().any(|c| c.collection == collection)
1144 }
1145
1146 pub fn is_bypass(&self, collection: &str) -> bool {
1148 self.get(collection)
1149 .map(|c| matches!(c.route, CollectionTransportRoute::Bypass { .. }))
1150 .unwrap_or(false)
1151 }
1152}
1153
1154impl std::fmt::Debug for TransportManager {
1155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1156 f.debug_struct("TransportManager")
1157 .field("transports", &self.transports.keys().collect::<Vec<_>>())
1158 .field("config", &self.config)
1159 .finish()
1160 }
1161}
1162
1163#[cfg(test)]
1168mod tests {
1169 use super::*;
1170 use crate::transport::bypass::{BypassChannelConfig, UdpBypassChannel};
1171 use crate::transport::capabilities::{MessagePriority, TransportCapabilities};
1172 use crate::transport::{MeshConnection, MeshTransport, PeerEventReceiver};
1173 use async_trait::async_trait;
1174 use std::time::Instant;
1175 use tokio::sync::mpsc;
1176
    /// Test double for a transport with fixed capabilities, availability,
    /// reachability and signal quality.
    struct MockTransport {
        /// Capabilities reported by `capabilities()`.
        caps: TransportCapabilities,
        /// Value reported by `is_available()`.
        available: bool,
        /// Peers for which `can_reach()` is true and `connect()` succeeds.
        reachable_peers: Vec<NodeId>,
        /// Value reported by `signal_quality()`.
        signal: Option<u8>,
    }
1184
1185 impl MockTransport {
1186 fn new(caps: TransportCapabilities) -> Self {
1187 Self {
1188 caps,
1189 available: true,
1190 reachable_peers: vec![],
1191 signal: None,
1192 }
1193 }
1194
1195 fn with_peer(mut self, peer: NodeId) -> Self {
1196 self.reachable_peers.push(peer);
1197 self
1198 }
1199
1200 #[allow(dead_code)]
1201 fn with_signal(mut self, signal: u8) -> Self {
1202 self.signal = Some(signal);
1203 self
1204 }
1205
1206 fn unavailable(mut self) -> Self {
1207 self.available = false;
1208 self
1209 }
1210 }
1211
    /// Connection handed out by `MockTransport::connect`.
    struct MockConnection {
        /// Peer the connection was opened to.
        peer_id: NodeId,
        /// Instant at which the connection was created.
        connected_at: Instant,
    }
1216
    impl MeshConnection for MockConnection {
        /// Returns the peer this connection was opened to.
        fn peer_id(&self) -> &NodeId {
            &self.peer_id
        }

        /// Mock connections always report themselves alive.
        fn is_alive(&self) -> bool {
            true
        }

        /// Returns the instant captured when the connection was created.
        fn connected_at(&self) -> Instant {
            self.connected_at
        }
    }
1230
1231 #[async_trait]
1232 impl MeshTransport for MockTransport {
1233 async fn start(&self) -> Result<()> {
1234 Ok(())
1235 }
1236
1237 async fn stop(&self) -> Result<()> {
1238 Ok(())
1239 }
1240
1241 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnection>> {
1242 if self.reachable_peers.contains(peer_id) {
1243 Ok(Box::new(MockConnection {
1244 peer_id: peer_id.clone(),
1245 connected_at: Instant::now(),
1246 }))
1247 } else {
1248 Err(TransportError::PeerNotFound(peer_id.to_string()))
1249 }
1250 }
1251
1252 async fn disconnect(&self, _peer_id: &NodeId) -> Result<()> {
1253 Ok(())
1254 }
1255
1256 fn get_connection(&self, _peer_id: &NodeId) -> Option<Box<dyn MeshConnection>> {
1257 None
1258 }
1259
1260 fn peer_count(&self) -> usize {
1261 0
1262 }
1263
1264 fn connected_peers(&self) -> Vec<NodeId> {
1265 vec![]
1266 }
1267
1268 fn subscribe_peer_events(&self) -> PeerEventReceiver {
1269 let (_tx, rx) = mpsc::channel(1);
1270 rx
1271 }
1272 }
1273
1274 impl Transport for MockTransport {
1275 fn capabilities(&self) -> &TransportCapabilities {
1276 &self.caps
1277 }
1278
1279 fn is_available(&self) -> bool {
1280 self.available
1281 }
1282
1283 fn signal_quality(&self) -> Option<u8> {
1284 self.signal
1285 }
1286
1287 fn can_reach(&self, peer_id: &NodeId) -> bool {
1288 self.reachable_peers.contains(peer_id)
1289 }
1290 }
1291
1292 #[test]
1293 fn test_register_transport() {
1294 let config = TransportManagerConfig::default();
1295 let mut manager = TransportManager::new(config);
1296
1297 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1298 manager.register(transport);
1299
1300 assert!(manager.get_transport(TransportType::Quic).is_some());
1301 assert!(manager.get_transport(TransportType::LoRa).is_none());
1302 }
1303
1304 #[test]
1305 fn test_unregister_transport() {
1306 let config = TransportManagerConfig::default();
1307 let mut manager = TransportManager::new(config);
1308
1309 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1310 manager.register(transport);
1311
1312 let removed = manager.unregister(TransportType::Quic);
1313 assert!(removed.is_some());
1314 assert!(manager.get_transport(TransportType::Quic).is_none());
1315 }
1316
1317 #[test]
1318 fn test_available_transports() {
1319 let config = TransportManagerConfig::default();
1320 let mut manager = TransportManager::new(config);
1321
1322 let peer = NodeId::new("peer-1".to_string());
1323
1324 let quic =
1326 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1327 manager.register(quic);
1328
1329 let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1331 manager.register(ble);
1332
1333 let lora = Arc::new(
1335 MockTransport::new(TransportCapabilities::lora(7))
1336 .unavailable()
1337 .with_peer(peer.clone()),
1338 );
1339 manager.register(lora);
1340
1341 let available = manager.available_transports(&peer);
1342 assert_eq!(available.len(), 1);
1343 assert!(available.contains(&TransportType::Quic));
1344 }
1345
1346 #[test]
1347 fn test_select_transport_by_reliability() {
1348 let config = TransportManagerConfig::default();
1349 let mut manager = TransportManager::new(config);
1350
1351 let peer = NodeId::new("peer-1".to_string());
1352
1353 let quic =
1355 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1356 manager.register(quic);
1357
1358 let lora =
1360 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1361 manager.register(lora);
1362
1363 let requirements = MessageRequirements {
1365 reliable: true,
1366 ..Default::default()
1367 };
1368
1369 let selected = manager.select_transport(&peer, &requirements);
1370 assert_eq!(selected, Some(TransportType::Quic));
1371 }
1372
1373 #[test]
1374 fn test_select_transport_by_preference() {
1375 let config = TransportManagerConfig {
1376 preference_order: vec![TransportType::BluetoothLE, TransportType::Quic],
1377 ..Default::default()
1378 };
1379 let mut manager = TransportManager::new(config);
1380
1381 let peer = NodeId::new("peer-1".to_string());
1382
1383 let quic =
1385 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1386 manager.register(quic);
1387
1388 let ble = Arc::new(
1389 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1390 );
1391 manager.register(ble);
1392
1393 let requirements = MessageRequirements::default();
1394 let selected = manager.select_transport(&peer, &requirements);
1395
1396 assert_eq!(selected, Some(TransportType::BluetoothLE));
1398 }
1399
1400 #[test]
1401 fn test_select_transport_by_latency() {
1402 let config = TransportManagerConfig::default();
1403 let mut manager = TransportManager::new(config);
1404
1405 let peer = NodeId::new("peer-1".to_string());
1406
1407 let quic =
1409 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1410 manager.register(quic);
1411
1412 let mut lora_caps = TransportCapabilities::lora(7);
1414 lora_caps.reliable = true; let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1416 manager.register(lora);
1417
1418 let requirements = MessageRequirements {
1420 priority: MessagePriority::High,
1421 reliable: true,
1422 ..Default::default()
1423 };
1424
1425 let selected = manager.select_transport(&peer, &requirements);
1426 assert_eq!(selected, Some(TransportType::Quic));
1427 }
1428
1429 #[test]
1430 fn test_select_transport_with_latency_requirement() {
1431 let config = TransportManagerConfig::default();
1432 let mut manager = TransportManager::new(config);
1433
1434 let peer = NodeId::new("peer-1".to_string());
1435
1436 let quic =
1438 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1439 manager.register(quic);
1440
1441 let mut lora_caps = TransportCapabilities::lora(12);
1443 lora_caps.reliable = true;
1444 let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1445 manager.register(lora);
1446
1447 let requirements = MessageRequirements {
1449 reliable: true,
1450 max_latency_ms: Some(50),
1451 ..Default::default()
1452 };
1453
1454 let selected = manager.select_transport(&peer, &requirements);
1455 assert_eq!(selected, Some(TransportType::Quic));
1456 }
1457
1458 #[test]
1459 fn test_select_transport_no_match() {
1460 let config = TransportManagerConfig::default();
1461 let mut manager = TransportManager::new(config);
1462
1463 let peer = NodeId::new("peer-1".to_string());
1464
1465 let lora =
1467 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1468 manager.register(lora);
1469
1470 let requirements = MessageRequirements {
1472 reliable: true,
1473 ..Default::default()
1474 };
1475
1476 let selected = manager.select_transport(&peer, &requirements);
1477 assert_eq!(selected, None);
1478 }
1479
1480 #[test]
1481 fn test_peer_transport_caching() {
1482 let config = TransportManagerConfig {
1483 cache_peer_transport: true,
1484 ..Default::default()
1485 };
1486 let mut manager = TransportManager::new(config);
1487
1488 let peer = NodeId::new("peer-1".to_string());
1489
1490 let quic =
1491 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1492 manager.register(quic);
1493
1494 let ble = Arc::new(
1495 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1496 );
1497 manager.register(ble);
1498
1499 manager.record_success(&peer, TransportType::BluetoothLE);
1501
1502 let requirements = MessageRequirements::default();
1504 let selected = manager.select_transport(&peer, &requirements);
1505 assert_eq!(selected, Some(TransportType::BluetoothLE));
1506
1507 manager.clear_cache(&peer);
1509
1510 let selected = manager.select_transport(&peer, &requirements);
1512 assert_eq!(selected, Some(TransportType::Quic));
1514 }
1515
1516 #[test]
1517 fn test_power_sensitive_selection() {
1518 let config = TransportManagerConfig {
1520 preference_order: vec![],
1521 ..Default::default()
1522 };
1523 let mut manager = TransportManager::new(config);
1524
1525 let peer = NodeId::new("peer-1".to_string());
1526
1527 let quic =
1529 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1530 manager.register(quic);
1531
1532 let ble = Arc::new(
1534 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1535 );
1536 manager.register(ble);
1537
1538 let requirements = MessageRequirements {
1540 power_sensitive: true,
1541 ..Default::default()
1542 };
1543
1544 let selected = manager.select_transport(&peer, &requirements);
1545 assert_eq!(selected, Some(TransportType::BluetoothLE));
1547 }
1548
1549 #[tokio::test]
1550 async fn test_connect_selects_transport() {
1551 let config = TransportManagerConfig::default();
1552 let mut manager = TransportManager::new(config);
1553
1554 let peer = NodeId::new("peer-1".to_string());
1555
1556 let quic =
1557 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1558 manager.register(quic);
1559
1560 let requirements = MessageRequirements::default();
1561 let result = manager.connect(&peer, &requirements).await;
1562
1563 assert!(result.is_ok());
1564 let (transport_type, conn) = result.unwrap();
1565 assert_eq!(transport_type, TransportType::Quic);
1566 assert_eq!(conn.peer_id(), &peer);
1567 }
1568
1569 #[tokio::test]
1570 async fn test_connect_with_fallback() {
1571 let config = TransportManagerConfig {
1572 enable_fallback: true,
1573 ..Default::default()
1574 };
1575 let mut manager = TransportManager::new(config);
1576
1577 let peer = NodeId::new("peer-1".to_string());
1578
1579 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1581 manager.register(quic);
1582
1583 let ble = Arc::new(
1585 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1586 );
1587 manager.register(ble);
1588
1589 let requirements = MessageRequirements::default();
1590 let result = manager.connect_with_fallback(&peer, &requirements).await;
1591
1592 assert!(result.is_ok());
1593 let (transport_type, _) = result.unwrap();
1594 assert_eq!(transport_type, TransportType::BluetoothLE);
1595 }
1596
1597 #[test]
1598 fn test_distance_tracking() {
1599 let config = TransportManagerConfig::default();
1600 let manager = TransportManager::new(config);
1601
1602 let peer = NodeId::new("peer-1".to_string());
1603
1604 let distance = PeerDistance {
1605 peer_id: peer.clone(),
1606 distance_meters: 500,
1607 source: super::super::capabilities::DistanceSource::Gps {
1608 confidence_meters: 10,
1609 },
1610 last_updated: Instant::now(),
1611 };
1612
1613 manager.update_peer_distance(distance);
1614
1615 let retrieved = manager.get_peer_distance(&peer);
1616 assert!(retrieved.is_some());
1617 assert_eq!(retrieved.unwrap().distance_meters, 500);
1618 }
1619
1620 #[tokio::test]
1625 async fn test_no_bypass_channel_by_default() {
1626 let config = TransportManagerConfig::default();
1627 let manager = TransportManager::new(config);
1628
1629 assert!(!manager.has_bypass_channel());
1630 assert!(!manager.is_bypass_collection("test").await);
1631 }
1632
1633 #[test]
1634 fn test_route_message_without_bypass() {
1635 let config = TransportManagerConfig::default();
1636 let mut manager = TransportManager::new(config);
1637
1638 let peer = NodeId::new("peer-1".to_string());
1639
1640 let quic =
1641 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1642 manager.register(quic);
1643
1644 let requirements = MessageRequirements::default();
1646 let decision = manager.route_message(&peer, &requirements);
1647 assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1648
1649 let bypass_req = MessageRequirements {
1652 bypass_sync: true,
1653 max_latency_ms: Some(100), ..Default::default()
1655 };
1656 let decision = manager.route_message(&peer, &bypass_req);
1657 assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1659 }
1660
1661 #[tokio::test]
1662 async fn test_subscribe_bypass_not_configured() {
1663 let config = TransportManagerConfig::default();
1664 let manager = TransportManager::new(config);
1665
1666 let result = manager.subscribe_bypass().await;
1667 assert!(result.is_err());
1668 }
1669
1670 #[test]
1671 fn test_route_decision_equality() {
1672 assert_eq!(RouteDecision::Bypass, RouteDecision::Bypass);
1673 assert_eq!(
1674 RouteDecision::Transport(TransportType::Quic),
1675 RouteDecision::Transport(TransportType::Quic)
1676 );
1677 assert_ne!(RouteDecision::Bypass, RouteDecision::NoRoute);
1678 assert_ne!(
1679 RouteDecision::Transport(TransportType::Quic),
1680 RouteDecision::Transport(TransportType::LoRa)
1681 );
1682 }
1683
1684 #[test]
1689 fn test_register_instance() {
1690 let config = TransportManagerConfig::default();
1691 let manager = TransportManager::new(config);
1692
1693 let peer = NodeId::new("peer-1".to_string());
1694 let instance = TransportInstance::new(
1695 "iroh-eth0",
1696 TransportType::Quic,
1697 TransportCapabilities::quic(),
1698 );
1699 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer));
1700
1701 manager.register_instance(instance, transport);
1702
1703 assert!(manager.get_instance(&"iroh-eth0".to_string()).is_some());
1704 assert!(manager.get_instance(&"nonexistent".to_string()).is_none());
1705 }
1706
1707 #[test]
1708 fn test_unregister_instance() {
1709 let config = TransportManagerConfig::default();
1710 let manager = TransportManager::new(config);
1711
1712 let instance = TransportInstance::new(
1713 "iroh-eth0",
1714 TransportType::Quic,
1715 TransportCapabilities::quic(),
1716 );
1717 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1718
1719 manager.register_instance(instance, transport);
1720
1721 let removed = manager.unregister_instance(&"iroh-eth0".to_string());
1722 assert!(removed.is_some());
1723 let (inst, _) = removed.unwrap();
1724 assert_eq!(inst.id, "iroh-eth0");
1725
1726 assert!(manager.get_instance(&"iroh-eth0".to_string()).is_none());
1728
1729 let removed_again = manager.unregister_instance(&"iroh-eth0".to_string());
1731 assert!(removed_again.is_none());
1732 }
1733
1734 #[test]
1735 fn test_registered_instance_ids() {
1736 let config = TransportManagerConfig::default();
1737 let manager = TransportManager::new(config);
1738
1739 assert!(manager.registered_instance_ids().is_empty());
1741
1742 let inst1 = TransportInstance::new(
1743 "iroh-eth0",
1744 TransportType::Quic,
1745 TransportCapabilities::quic(),
1746 );
1747 let inst2 = TransportInstance::new(
1748 "lora-915",
1749 TransportType::LoRa,
1750 TransportCapabilities::lora(7),
1751 );
1752
1753 manager.register_instance(
1754 inst1,
1755 Arc::new(MockTransport::new(TransportCapabilities::quic())),
1756 );
1757 manager.register_instance(
1758 inst2,
1759 Arc::new(MockTransport::new(TransportCapabilities::lora(7))),
1760 );
1761
1762 let ids = manager.registered_instance_ids();
1763 assert_eq!(ids.len(), 2);
1764 assert!(ids.contains(&"iroh-eth0".to_string()));
1765 assert!(ids.contains(&"lora-915".to_string()));
1766 }
1767
1768 #[test]
1769 fn test_available_instance_ids() {
1770 let config = TransportManagerConfig::default();
1771 let manager = TransportManager::new(config);
1772
1773 let inst1 = TransportInstance::new(
1775 "iroh-eth0",
1776 TransportType::Quic,
1777 TransportCapabilities::quic(),
1778 );
1779 let transport1 = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1780 manager.register_instance(inst1, transport1);
1781
1782 let inst2 = TransportInstance::new(
1784 "lora-off",
1785 TransportType::LoRa,
1786 TransportCapabilities::lora(7),
1787 );
1788 let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)).unavailable());
1789 manager.register_instance(inst2, transport2);
1790
1791 let mut inst3 = TransportInstance::new(
1793 "ble-disabled",
1794 TransportType::BluetoothLE,
1795 TransportCapabilities::bluetooth_le(),
1796 );
1797 inst3.available = false;
1798 let transport3 = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1799 manager.register_instance(inst3, transport3);
1800
1801 let available = manager.available_instance_ids();
1802 assert_eq!(available.len(), 1);
1803 assert!(available.contains("iroh-eth0"));
1804 }
1805
1806 #[test]
1807 fn test_available_instances_for_peer() {
1808 let config = TransportManagerConfig::default();
1809 let manager = TransportManager::new(config);
1810
1811 let peer = NodeId::new("peer-1".to_string());
1812
1813 let inst1 = TransportInstance::new(
1815 "iroh-eth0",
1816 TransportType::Quic,
1817 TransportCapabilities::quic(),
1818 );
1819 let transport1 =
1820 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1821 manager.register_instance(inst1, transport1);
1822
1823 let inst2 = TransportInstance::new(
1825 "lora-915",
1826 TransportType::LoRa,
1827 TransportCapabilities::lora(7),
1828 );
1829 let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1830 manager.register_instance(inst2, transport2);
1831
1832 let inst3 = TransportInstance::new(
1834 "ble-off",
1835 TransportType::BluetoothLE,
1836 TransportCapabilities::bluetooth_le(),
1837 );
1838 let transport3 = Arc::new(
1839 MockTransport::new(TransportCapabilities::bluetooth_le())
1840 .with_peer(peer.clone())
1841 .unavailable(),
1842 );
1843 manager.register_instance(inst3, transport3);
1844
1845 let for_peer = manager.available_instances_for_peer(&peer);
1846 assert_eq!(for_peer.len(), 1);
1847 assert_eq!(for_peer[0], "iroh-eth0");
1848 }
1849
1850 #[test]
1855 fn test_current_pace_level_no_policy_with_available() {
1856 let config = TransportManagerConfig::default();
1857 let manager = TransportManager::new(config);
1858
1859 let inst = TransportInstance::new(
1861 "iroh-eth0",
1862 TransportType::Quic,
1863 TransportCapabilities::quic(),
1864 );
1865 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1866 manager.register_instance(inst, transport);
1867
1868 assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1870 }
1871
1872 #[test]
1873 fn test_current_pace_level_no_policy_none_available() {
1874 let config = TransportManagerConfig::default();
1875 let manager = TransportManager::new(config);
1876
1877 assert_eq!(manager.current_pace_level(), PaceLevel::None);
1879 }
1880
1881 #[test]
1882 fn test_current_pace_level_no_policy_all_unavailable() {
1883 let config = TransportManagerConfig::default();
1884 let manager = TransportManager::new(config);
1885
1886 let inst = TransportInstance::new(
1888 "iroh-eth0",
1889 TransportType::Quic,
1890 TransportCapabilities::quic(),
1891 );
1892 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).unavailable());
1893 manager.register_instance(inst, transport);
1894
1895 assert_eq!(manager.current_pace_level(), PaceLevel::None);
1896 }
1897
1898 #[test]
1899 fn test_current_pace_level_with_policy_primary() {
1900 let policy = TransportPolicy::new("test")
1901 .primary(vec!["iroh-eth0"])
1902 .alternate(vec!["lora-915"])
1903 .emergency(vec!["ble-mesh"]);
1904
1905 let config = TransportManagerConfig::with_policy(policy);
1906 let manager = TransportManager::new(config);
1907
1908 let inst = TransportInstance::new(
1910 "iroh-eth0",
1911 TransportType::Quic,
1912 TransportCapabilities::quic(),
1913 );
1914 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1915 manager.register_instance(inst, transport);
1916
1917 assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1918 }
1919
1920 #[test]
1921 fn test_current_pace_level_with_policy_alternate() {
1922 let policy = TransportPolicy::new("test")
1923 .primary(vec!["iroh-eth0"])
1924 .alternate(vec!["lora-915"])
1925 .emergency(vec!["ble-mesh"]);
1926
1927 let config = TransportManagerConfig::with_policy(policy);
1928 let manager = TransportManager::new(config);
1929
1930 let inst = TransportInstance::new(
1932 "lora-915",
1933 TransportType::LoRa,
1934 TransportCapabilities::lora(7),
1935 );
1936 let transport = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1937 manager.register_instance(inst, transport);
1938
1939 assert_eq!(manager.current_pace_level(), PaceLevel::Alternate);
1940 }
1941
1942 #[test]
1943 fn test_current_pace_level_with_policy_emergency() {
1944 let policy = TransportPolicy::new("test")
1945 .primary(vec!["iroh-eth0"])
1946 .alternate(vec!["lora-915"])
1947 .emergency(vec!["ble-mesh"]);
1948
1949 let config = TransportManagerConfig::with_policy(policy);
1950 let manager = TransportManager::new(config);
1951
1952 let inst = TransportInstance::new(
1954 "ble-mesh",
1955 TransportType::BluetoothLE,
1956 TransportCapabilities::bluetooth_le(),
1957 );
1958 let transport = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1959 manager.register_instance(inst, transport);
1960
1961 assert_eq!(manager.current_pace_level(), PaceLevel::Emergency);
1962 }
1963
1964 #[test]
1965 fn test_current_pace_level_with_policy_none_available() {
1966 let policy = TransportPolicy::new("test")
1967 .primary(vec!["iroh-eth0"])
1968 .alternate(vec!["lora-915"]);
1969
1970 let config = TransportManagerConfig::with_policy(policy);
1971 let manager = TransportManager::new(config);
1972
1973 assert_eq!(manager.current_pace_level(), PaceLevel::None);
1975 }
1976
1977 #[test]
1982 fn test_select_transports_pace_no_policy() {
1983 let config = TransportManagerConfig::default();
1984 let manager = TransportManager::new(config);
1985
1986 let peer = NodeId::new("peer-1".to_string());
1987 let requirements = MessageRequirements::default();
1988
1989 let selected = manager.select_transports_pace(&peer, &requirements);
1991 assert!(selected.is_empty());
1992 }
1993
1994 #[test]
1995 fn test_select_transports_pace_single_mode() {
1996 let policy = TransportPolicy::new("test")
1997 .primary(vec!["iroh-eth0", "iroh-wlan0"])
1998 .alternate(vec!["lora-915"]);
1999
2000 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single);
2001 let manager = TransportManager::new(config);
2002
2003 let peer = NodeId::new("peer-1".to_string());
2004
2005 let inst1 = TransportInstance::new(
2007 "iroh-eth0",
2008 TransportType::Quic,
2009 TransportCapabilities::quic(),
2010 );
2011 let t1 =
2012 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2013 manager.register_instance(inst1, t1);
2014
2015 let inst2 = TransportInstance::new(
2016 "iroh-wlan0",
2017 TransportType::Quic,
2018 TransportCapabilities::quic(),
2019 );
2020 let t2 =
2021 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2022 manager.register_instance(inst2, t2);
2023
2024 let requirements = MessageRequirements::default();
2025 let selected = manager.select_transports_pace(&peer, &requirements);
2026
2027 assert_eq!(selected.len(), 1);
2029 assert_eq!(selected[0], "iroh-eth0");
2030 }
2031
2032 #[test]
2033 fn test_select_transports_pace_redundant_mode() {
2034 let policy = TransportPolicy::new("test")
2035 .primary(vec!["iroh-eth0", "iroh-wlan0"])
2036 .alternate(vec!["lora-915"]);
2037
2038 let config =
2039 TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(2));
2040 let manager = TransportManager::new(config);
2041
2042 let peer = NodeId::new("peer-1".to_string());
2043
2044 let inst1 = TransportInstance::new(
2045 "iroh-eth0",
2046 TransportType::Quic,
2047 TransportCapabilities::quic(),
2048 );
2049 let t1 =
2050 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2051 manager.register_instance(inst1, t1);
2052
2053 let inst2 = TransportInstance::new(
2054 "iroh-wlan0",
2055 TransportType::Quic,
2056 TransportCapabilities::quic(),
2057 );
2058 let t2 =
2059 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2060 manager.register_instance(inst2, t2);
2061
2062 let inst3 = TransportInstance::new(
2063 "lora-915",
2064 TransportType::LoRa,
2065 TransportCapabilities::lora(7),
2066 );
2067 let t3 =
2068 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2069 manager.register_instance(inst3, t3);
2070
2071 let requirements = MessageRequirements::default();
2072 let selected = manager.select_transports_pace(&peer, &requirements);
2073
2074 assert!(selected.len() >= 2);
2076 }
2077
2078 #[test]
2079 fn test_select_transports_pace_redundant_bounded() {
2080 let policy = TransportPolicy::new("test").primary(vec!["t1", "t2", "t3", "t4"]);
2081
2082 let config = TransportManagerConfig::with_policy(policy)
2083 .with_mode(TransportMode::redundant_bounded(1, 2));
2084 let manager = TransportManager::new(config);
2085
2086 let peer = NodeId::new("peer-1".to_string());
2087
2088 for name in &["t1", "t2", "t3", "t4"] {
2090 let inst =
2091 TransportInstance::new(*name, TransportType::Quic, TransportCapabilities::quic());
2092 let t =
2093 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2094 manager.register_instance(inst, t);
2095 }
2096
2097 let requirements = MessageRequirements::default();
2098 let selected = manager.select_transports_pace(&peer, &requirements);
2099
2100 assert_eq!(selected.len(), 2);
2102 }
2103
2104 #[test]
2105 fn test_select_transports_pace_bonded_mode() {
2106 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2107
2108 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2109 let manager = TransportManager::new(config);
2110
2111 let peer = NodeId::new("peer-1".to_string());
2112
2113 let inst1 = TransportInstance::new(
2114 "iroh-eth0",
2115 TransportType::Quic,
2116 TransportCapabilities::quic(),
2117 );
2118 let t1 =
2119 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2120 manager.register_instance(inst1, t1);
2121
2122 let inst2 = TransportInstance::new(
2123 "iroh-wlan0",
2124 TransportType::Quic,
2125 TransportCapabilities::quic(),
2126 );
2127 let t2 =
2128 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2129 manager.register_instance(inst2, t2);
2130
2131 let requirements = MessageRequirements::default();
2132 let selected = manager.select_transports_pace(&peer, &requirements);
2133
2134 assert_eq!(selected.len(), 2);
2136 }
2137
2138 #[test]
2139 fn test_select_transports_pace_load_balanced_mode() {
2140 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2141
2142 let config = TransportManagerConfig::with_policy(policy)
2143 .with_mode(TransportMode::LoadBalanced { weights: None });
2144 let manager = TransportManager::new(config);
2145
2146 let peer = NodeId::new("peer-1".to_string());
2147
2148 let inst1 = TransportInstance::new(
2149 "iroh-eth0",
2150 TransportType::Quic,
2151 TransportCapabilities::quic(),
2152 );
2153 let t1 =
2154 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2155 manager.register_instance(inst1, t1);
2156
2157 let inst2 = TransportInstance::new(
2158 "iroh-wlan0",
2159 TransportType::Quic,
2160 TransportCapabilities::quic(),
2161 );
2162 let t2 =
2163 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2164 manager.register_instance(inst2, t2);
2165
2166 let requirements = MessageRequirements::default();
2167 let selected = manager.select_transports_pace(&peer, &requirements);
2168
2169 assert_eq!(selected.len(), 2);
2171 }
2172
2173 #[test]
2174 fn test_select_transports_pace_filters_by_requirements() {
2175 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2176
2177 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2178 let manager = TransportManager::new(config);
2179
2180 let peer = NodeId::new("peer-1".to_string());
2181
2182 let inst1 = TransportInstance::new(
2184 "iroh-eth0",
2185 TransportType::Quic,
2186 TransportCapabilities::quic(),
2187 );
2188 let t1 =
2189 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2190 manager.register_instance(inst1, t1);
2191
2192 let inst2 = TransportInstance::new(
2194 "lora-915",
2195 TransportType::LoRa,
2196 TransportCapabilities::lora(7),
2197 );
2198 let t2 =
2199 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2200 manager.register_instance(inst2, t2);
2201
2202 let requirements = MessageRequirements {
2204 reliable: true,
2205 ..Default::default()
2206 };
2207 let selected = manager.select_transports_pace(&peer, &requirements);
2208
2209 assert_eq!(selected.len(), 1);
2210 assert_eq!(selected[0], "iroh-eth0");
2211 }
2212
2213 #[test]
2214 fn test_select_transports_pace_filters_unreachable_peer() {
2215 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2216
2217 let config = TransportManagerConfig::with_policy(policy);
2218 let manager = TransportManager::new(config);
2219
2220 let peer = NodeId::new("peer-1".to_string());
2221
2222 let inst1 = TransportInstance::new(
2224 "iroh-eth0",
2225 TransportType::Quic,
2226 TransportCapabilities::quic(),
2227 );
2228 let t1 =
2229 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2230 manager.register_instance(inst1, t1);
2231
2232 let inst2 = TransportInstance::new(
2234 "lora-915",
2235 TransportType::LoRa,
2236 TransportCapabilities::lora(7),
2237 );
2238 let t2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
2239 manager.register_instance(inst2, t2);
2240
2241 let requirements = MessageRequirements::default();
2242 let selected = manager.select_transports_pace(&peer, &requirements);
2243
2244 assert_eq!(selected.len(), 1);
2245 assert_eq!(selected[0], "iroh-eth0");
2246 }
2247
2248 #[test]
2253 fn test_select_transport_pace_returns_first() {
2254 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2255
2256 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2257 let manager = TransportManager::new(config);
2258
2259 let peer = NodeId::new("peer-1".to_string());
2260
2261 let inst1 = TransportInstance::new(
2262 "iroh-eth0",
2263 TransportType::Quic,
2264 TransportCapabilities::quic(),
2265 );
2266 let t1 =
2267 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2268 manager.register_instance(inst1, t1);
2269
2270 let inst2 = TransportInstance::new(
2271 "iroh-wlan0",
2272 TransportType::Quic,
2273 TransportCapabilities::quic(),
2274 );
2275 let t2 =
2276 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2277 manager.register_instance(inst2, t2);
2278
2279 let requirements = MessageRequirements::default();
2280 let selected = manager.select_transport_pace(&peer, &requirements);
2281
2282 assert_eq!(selected, Some("iroh-eth0".to_string()));
2283 }
2284
2285 #[test]
2286 fn test_select_transport_pace_returns_none_no_policy() {
2287 let config = TransportManagerConfig::default();
2288 let manager = TransportManager::new(config);
2289
2290 let peer = NodeId::new("peer-1".to_string());
2291 let requirements = MessageRequirements::default();
2292
2293 assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2294 }
2295
2296 #[test]
2297 fn test_select_transport_pace_returns_none_no_candidates() {
2298 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
2299
2300 let config = TransportManagerConfig::with_policy(policy);
2301 let manager = TransportManager::new(config);
2302
2303 let peer = NodeId::new("peer-1".to_string());
2304 let requirements = MessageRequirements::default();
2305
2306 assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2308 }
2309
2310 #[test]
2315 fn test_record_success_pace_caching_enabled() {
2316 let config = TransportManagerConfig {
2317 cache_peer_transport: true,
2318 ..Default::default()
2319 };
2320 let manager = TransportManager::new(config);
2321
2322 let peer = NodeId::new("peer-1".to_string());
2323 manager.record_success_pace(&peer, "iroh-eth0".to_string());
2324
2325 let cached = manager
2326 .peer_transport_ids
2327 .read()
2328 .unwrap_or_else(|e| e.into_inner());
2329 assert_eq!(cached.get(&peer), Some(&"iroh-eth0".to_string()));
2330 }
2331
2332 #[test]
2333 fn test_record_success_pace_caching_disabled() {
2334 let config = TransportManagerConfig {
2335 cache_peer_transport: false,
2336 ..Default::default()
2337 };
2338 let manager = TransportManager::new(config);
2339
2340 let peer = NodeId::new("peer-1".to_string());
2341 manager.record_success_pace(&peer, "iroh-eth0".to_string());
2342
2343 let cached = manager
2344 .peer_transport_ids
2345 .read()
2346 .unwrap_or_else(|e| e.into_inner());
2347 assert!(cached.get(&peer).is_none());
2348 }
2349
2350 #[test]
2351 fn test_clear_cache_pace() {
2352 let config = TransportManagerConfig {
2353 cache_peer_transport: true,
2354 ..Default::default()
2355 };
2356 let manager = TransportManager::new(config);
2357
2358 let peer = NodeId::new("peer-1".to_string());
2359 manager.record_success_pace(&peer, "iroh-eth0".to_string());
2360
2361 assert!(manager
2363 .peer_transport_ids
2364 .read()
2365 .unwrap()
2366 .get(&peer)
2367 .is_some());
2368
2369 manager.clear_cache_pace(&peer);
2370
2371 assert!(manager
2373 .peer_transport_ids
2374 .read()
2375 .unwrap()
2376 .get(&peer)
2377 .is_none());
2378 }
2379
2380 #[test]
2381 fn test_clear_cache_pace_nonexistent_peer() {
2382 let config = TransportManagerConfig::default();
2383 let manager = TransportManager::new(config);
2384
2385 let peer = NodeId::new("nonexistent".to_string());
2386
2387 manager.clear_cache_pace(&peer);
2389 }
2390
2391 #[test]
2396 fn test_select_transport_for_distance_no_distance() {
2397 let config = TransportManagerConfig::default();
2398 let mut manager = TransportManager::new(config);
2399
2400 let peer = NodeId::new("peer-1".to_string());
2401 let quic =
2402 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2403 manager.register(quic);
2404
2405 let requirements = MessageRequirements::default();
2406 let result = manager.select_transport_for_distance(&peer, &requirements);
2407
2408 assert!(result.is_some());
2409 let (transport_type, range_mode) = result.unwrap();
2410 assert_eq!(transport_type, TransportType::Quic);
2411 assert!(range_mode.is_none());
2412 }
2413
2414 #[test]
2415 fn test_select_transport_for_distance_with_distance() {
2416 let config = TransportManagerConfig::default();
2417 let mut manager = TransportManager::new(config);
2418
2419 let peer = NodeId::new("peer-1".to_string());
2420 let quic =
2421 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2422 manager.register(quic);
2423
2424 let distance = PeerDistance {
2426 peer_id: peer.clone(),
2427 distance_meters: 1000,
2428 source: super::super::capabilities::DistanceSource::Configured,
2429 last_updated: Instant::now(),
2430 };
2431 manager.update_peer_distance(distance);
2432
2433 let requirements = MessageRequirements::default();
2434 let result = manager.select_transport_for_distance(&peer, &requirements);
2435
2436 assert!(result.is_some());
2437 let (transport_type, range_mode) = result.unwrap();
2438 assert_eq!(transport_type, TransportType::Quic);
2439 assert!(range_mode.is_none());
2441 }
2442
2443 #[test]
2444 fn test_select_transport_for_distance_no_suitable_transport() {
2445 let config = TransportManagerConfig::default();
2446 let manager = TransportManager::new(config);
2447
2448 let peer = NodeId::new("peer-1".to_string());
2449 let requirements = MessageRequirements::default();
2450
2451 let result = manager.select_transport_for_distance(&peer, &requirements);
2452 assert!(result.is_none());
2453 }
2454
2455 #[test]
2460 fn test_config_with_policy() {
2461 let policy = TransportPolicy::new("tactical")
2462 .primary(vec!["iroh-eth0"])
2463 .alternate(vec!["lora-915"]);
2464
2465 let config = TransportManagerConfig::with_policy(policy);
2466
2467 assert!(config.default_policy.is_some());
2468 let p = config.default_policy.unwrap();
2469 assert_eq!(p.name, "tactical");
2470 assert_eq!(p.primary.len(), 1);
2471 assert_eq!(p.alternate.len(), 1);
2472 assert!(config.enable_fallback);
2474 assert!(config.cache_peer_transport);
2475 assert_eq!(config.switch_threshold, 10);
2476 assert!(matches!(config.transport_mode, TransportMode::Single));
2477 }
2478
2479 #[test]
2480 fn test_config_with_mode() {
2481 let config = TransportManagerConfig::default().with_mode(TransportMode::Bonded);
2482
2483 assert!(matches!(config.transport_mode, TransportMode::Bonded));
2484 }
2485
2486 #[test]
2487 fn test_config_with_policy_and_mode_chained() {
2488 let policy = TransportPolicy::new("test").primary(vec!["t1"]);
2489 let config =
2490 TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(3));
2491
2492 assert!(config.default_policy.is_some());
2493 assert!(matches!(
2494 config.transport_mode,
2495 TransportMode::Redundant {
2496 min_paths: 3,
2497 max_paths: None
2498 }
2499 ));
2500 }
2501
2502 #[tokio::test]
2507 async fn test_connect_no_suitable_transport() {
2508 let config = TransportManagerConfig::default();
2509 let manager = TransportManager::new(config);
2510
2511 let peer = NodeId::new("peer-1".to_string());
2512 let requirements = MessageRequirements::default();
2513
2514 let result = manager.connect(&peer, &requirements).await;
2515 assert!(result.is_err());
2516 match result {
2517 Err(TransportError::PeerNotFound(_)) => {} Err(other) => panic!("Expected PeerNotFound, got: {}", other),
2519 Ok(_) => panic!("Expected error but got Ok"),
2520 }
2521 }
2522
2523 #[tokio::test]
2524 async fn test_connect_unreachable_peer() {
2525 let config = TransportManagerConfig::default();
2526 let mut manager = TransportManager::new(config);
2527
2528 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2530 manager.register(quic);
2531
2532 let peer = NodeId::new("unreachable-peer".to_string());
2533 let requirements = MessageRequirements::default();
2534
2535 let result = manager.connect(&peer, &requirements).await;
2536 assert!(result.is_err());
2537 }
2538
2539 #[tokio::test]
2544 async fn test_connect_with_fallback_disabled() {
2545 let config = TransportManagerConfig {
2546 enable_fallback: false,
2547 ..Default::default()
2548 };
2549 let mut manager = TransportManager::new(config);
2550
2551 let peer = NodeId::new("peer-1".to_string());
2552
2553 let quic =
2555 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2556 manager.register(quic);
2557
2558 let ble = Arc::new(
2560 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
2561 );
2562 manager.register(ble);
2563
2564 let peer_unreachable = NodeId::new("nobody".to_string());
2571 let requirements = MessageRequirements::default();
2572
2573 let result = manager
2574 .connect_with_fallback(&peer_unreachable, &requirements)
2575 .await;
2576 assert!(result.is_err());
2577 }
2578
2579 #[tokio::test]
2580 async fn test_connect_with_fallback_no_candidates() {
2581 let config = TransportManagerConfig::default();
2582 let manager = TransportManager::new(config);
2583
2584 let peer = NodeId::new("peer-1".to_string());
2585 let requirements = MessageRequirements::default();
2586
2587 let result = manager.connect_with_fallback(&peer, &requirements).await;
2588 assert!(result.is_err());
2589 match result {
2590 Err(ref e) => {
2591 let err_msg = format!("{}", e);
2592 assert!(err_msg.contains("No suitable transport"));
2593 }
2594 Ok(_) => panic!("Expected error but got Ok"),
2595 }
2596 }
2597
2598 #[test]
2603 fn test_route_message_no_route() {
2604 let config = TransportManagerConfig::default();
2605 let manager = TransportManager::new(config);
2606
2607 let peer = NodeId::new("peer-1".to_string());
2608 let requirements = MessageRequirements::default();
2609
2610 let decision = manager.route_message(&peer, &requirements);
2612 assert_eq!(decision, RouteDecision::NoRoute);
2613 }
2614
2615 #[test]
2616 fn test_route_message_bypass_requested_no_channel() {
2617 let config = TransportManagerConfig::default();
2618 let manager = TransportManager::new(config);
2619
2620 let peer = NodeId::new("peer-1".to_string());
2621 let requirements = MessageRequirements {
2622 bypass_sync: true,
2623 ..Default::default()
2624 };
2625
2626 let decision = manager.route_message(&peer, &requirements);
2628 assert_eq!(decision, RouteDecision::NoRoute);
2629 }
2630
2631 #[test]
2636 fn test_route_decision_no_route() {
2637 let decision = RouteDecision::NoRoute;
2638 assert_eq!(decision, RouteDecision::NoRoute);
2639 assert_ne!(decision, RouteDecision::Bypass);
2640 assert_ne!(decision, RouteDecision::Transport(TransportType::Quic));
2641 }
2642
2643 #[test]
2644 fn test_route_decision_debug() {
2645 let bypass = RouteDecision::Bypass;
2646 let transport = RouteDecision::Transport(TransportType::LoRa);
2647 let no_route = RouteDecision::NoRoute;
2648
2649 assert!(format!("{:?}", bypass).contains("Bypass"));
2650 assert!(format!("{:?}", transport).contains("LoRa"));
2651 assert!(format!("{:?}", no_route).contains("NoRoute"));
2652 }
2653
2654 #[test]
2655 fn test_route_decision_clone() {
2656 let original = RouteDecision::Transport(TransportType::BluetoothLE);
2657 let cloned = original.clone();
2658 assert_eq!(original, cloned);
2659 }
2660
2661 #[test]
2666 fn test_transport_manager_debug() {
2667 let config = TransportManagerConfig::default();
2668 let mut manager = TransportManager::new(config);
2669
2670 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2671 manager.register(quic);
2672
2673 let debug_str = format!("{:?}", manager);
2674 assert!(debug_str.contains("TransportManager"));
2675 assert!(debug_str.contains("Quic"));
2676 }
2677
2678 #[test]
2679 fn test_registered_transports() {
2680 let config = TransportManagerConfig::default();
2681 let mut manager = TransportManager::new(config);
2682
2683 assert!(manager.registered_transports().is_empty());
2684
2685 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2686 let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
2687 manager.register(quic);
2688 manager.register(ble);
2689
2690 let registered = manager.registered_transports();
2691 assert_eq!(registered.len(), 2);
2692 assert!(registered.contains(&TransportType::Quic));
2693 assert!(registered.contains(&TransportType::BluetoothLE));
2694 }
2695
2696 #[tokio::test]
2697 async fn test_set_bypass_channel() {
2698 let config = TransportManagerConfig::default();
2699 let mut manager = TransportManager::new(config);
2700
2701 assert!(!manager.has_bypass_channel());
2702
2703 let bypass_config = BypassChannelConfig::new();
2704 let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
2705 manager.set_bypass_channel(bypass);
2706
2707 assert!(manager.has_bypass_channel());
2708 }
2709
/// With `cache_peer_transport` turned off, `record_success` must leave no
/// entry for the peer in the manager's `peer_transports` map.
#[test]
fn test_record_success_caching_disabled() {
    let mgr = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: false,
        ..Default::default()
    });

    let peer_id = NodeId::new("peer-1".to_string());
    mgr.record_success(&peer_id, TransportType::Quic);

    // Tolerate a poisoned lock: the map contents are still inspectable.
    let cache = mgr
        .peer_transports
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    assert!(!cache.contains_key(&peer_id));
}
2728
/// A success is recorded for LoRa, which was never registered; selection must
/// still return the registered Bluetooth LE transport that reaches the peer.
#[test]
fn test_select_transport_cached_transport_invalid() {
    let mut mgr = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: true,
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());

    // Bluetooth LE is the only transport the manager knows about.
    let ble_mock =
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(target.clone());
    mgr.register(Arc::new(ble_mock));

    // Record a success for a transport type that is not registered.
    mgr.record_success(&target, TransportType::LoRa);

    let reqs = MessageRequirements::default();
    let chosen = mgr.select_transport(&target, &reqs);

    assert_eq!(chosen, Some(TransportType::BluetoothLE));
}
2754
/// A success is recorded for Bluetooth LE, but that transport is registered
/// as unavailable; selection must return the available QUIC transport instead.
#[test]
fn test_select_transport_cached_transport_unavailable() {
    let mut mgr = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: true,
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());

    // QUIC: available and able to reach the peer.
    let quic_mock = MockTransport::new(TransportCapabilities::quic()).with_peer(target.clone());
    mgr.register(Arc::new(quic_mock));

    // Bluetooth LE: knows the peer but is flagged unavailable.
    let ble_mock = MockTransport::new(TransportCapabilities::bluetooth_le())
        .with_peer(target.clone())
        .unavailable();
    mgr.register(Arc::new(ble_mock));

    mgr.record_success(&target, TransportType::BluetoothLE);

    let reqs = MessageRequirements::default();
    let chosen = mgr.select_transport(&target, &reqs);

    assert_eq!(chosen, Some(TransportType::Quic));
}
2787
/// The policy's primary entry (`dead-transport`) has no registered instance,
/// so PACE selection must yield only the alternate instance `lora-915`.
#[test]
fn test_pace_fallback_order() {
    let pace_policy = TransportPolicy::new("test")
        .primary(vec!["dead-transport"])
        .alternate(vec!["lora-915"]);

    let mgr = TransportManager::new(
        TransportManagerConfig::with_policy(pace_policy).with_mode(TransportMode::Single),
    );

    let target = NodeId::new("peer-1".to_string());

    // Only the alternate instance is actually registered.
    let lora_instance = TransportInstance::new(
        "lora-915",
        TransportType::LoRa,
        TransportCapabilities::lora(7),
    );
    let lora_transport =
        Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(target.clone()));
    mgr.register_instance(lora_instance, lora_transport);

    let reqs = MessageRequirements::default();
    let chosen = mgr.select_transports_pace(&target, &reqs);

    assert_eq!(chosen.len(), 1);
    assert_eq!(chosen[0], "lora-915");
}
2817
/// `get_peer_distance` returns `None` for a peer on a freshly built manager
/// with nothing registered.
#[test]
fn test_get_peer_distance_none() {
    let mgr = TransportManager::new(TransportManagerConfig::default());
    let stranger = NodeId::new("unknown-peer".to_string());

    assert!(mgr.get_peer_distance(&stranger).is_none());
}
2826
2827 #[tokio::test]
2828 async fn test_send_bypass_not_configured() {
2829 let config = TransportManagerConfig::default();
2830 let manager = TransportManager::new(config);
2831
2832 let result = manager.send_bypass("test_collection", b"hello", None).await;
2833 assert!(result.is_err());
2834 let err_msg = format!("{}", result.unwrap_err());
2835 assert!(err_msg.contains("Bypass channel not configured"));
2836 }
2837
2838 #[tokio::test]
2839 async fn test_send_bypass_to_not_configured() {
2840 let config = TransportManagerConfig::default();
2841 let manager = TransportManager::new(config);
2842
2843 let target = BypassTarget::Broadcast { port: 5150 };
2844 let result = manager
2845 .send_bypass_to(target, "test_collection", b"hello")
2846 .await;
2847 assert!(result.is_err());
2848 let err_msg = format!("{}", result.unwrap_err());
2849 assert!(err_msg.contains("Bypass channel not configured"));
2850 }
2851
2852 #[tokio::test]
2853 async fn test_subscribe_bypass_collection_not_configured() {
2854 let config = TransportManagerConfig::default();
2855 let manager = TransportManager::new(config);
2856
2857 let result = manager.subscribe_bypass_collection("test").await;
2858 assert!(result.is_err());
2859 }
2860
/// An empty route table has no entry for any name: `get` is `None`, and both
/// `has_collection` and `is_bypass` are `false`.
#[test]
fn test_route_table_empty_returns_none() {
    let empty = CollectionRouteTable::new();

    assert!(empty.get("anything").is_none());
    assert!(!empty.has_collection("anything"));
    assert!(!empty.is_bypass("anything"));
}
2872
/// Entries added through `with_collection` can be found again with
/// `has_collection` / `get`, and keep their route and priority.
#[test]
fn test_route_table_builder_and_lookup() {
    let telemetry_entry = CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    };
    let position_entry = CollectionRouteConfig {
        collection: "position".to_string(),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Raw,
            ttl_ms: 200,
            bypass_transport: BypassTransport::Broadcast,
        },
        priority: MessagePriority::High,
    };
    let routes = CollectionRouteTable::new()
        .with_collection(telemetry_entry)
        .with_collection(position_entry);

    // Membership checks.
    assert!(routes.has_collection("telemetry"));
    assert!(routes.has_collection("position"));
    assert!(!routes.has_collection("unknown"));

    // The telemetry entry keeps its fixed Bluetooth LE route and priority.
    let found_telemetry = routes.get("telemetry").unwrap();
    assert!(matches!(
        found_telemetry.route,
        CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE
        }
    ));
    assert_eq!(found_telemetry.priority, MessagePriority::Normal);

    // The position entry keeps its priority.
    let found_position = routes.get("position").unwrap();
    assert_eq!(found_position.priority, MessagePriority::High);
}
2909
/// `is_bypass` is `true` only for the collection configured with a `Bypass`
/// route; `Fixed`, `Pace`, and unknown collections all report `false`.
#[test]
fn test_route_table_is_bypass() {
    let bypass_entry = CollectionRouteConfig {
        collection: "bypass_col".to_string(),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Raw,
            ttl_ms: 100,
            bypass_transport: BypassTransport::Unicast,
        },
        priority: MessagePriority::Normal,
    };
    let fixed_entry = CollectionRouteConfig {
        collection: "fixed_col".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::Quic,
        },
        priority: MessagePriority::Normal,
    };
    let pace_entry = CollectionRouteConfig {
        collection: "pace_col".to_string(),
        route: CollectionTransportRoute::Pace {
            policy_override: None,
        },
        priority: MessagePriority::Normal,
    };
    let routes = CollectionRouteTable::new()
        .with_collection(bypass_entry)
        .with_collection(fixed_entry)
        .with_collection(pace_entry);

    assert!(routes.is_bypass("bypass_col"));
    assert!(!routes.is_bypass("fixed_col"));
    assert!(!routes.is_bypass("pace_col"));
    assert!(!routes.is_bypass("nonexistent"));
}
2942
/// A `Fixed` route survives a JSON round-trip with its transport type intact.
#[test]
fn test_serde_fixed_route() {
    let original = CollectionTransportRoute::Fixed {
        transport_type: TransportType::BluetoothLE,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<CollectionTransportRoute>(&encoded).unwrap();

    assert!(matches!(
        decoded,
        CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE
        }
    ));
}
2961
/// A `Bypass` route survives a JSON round-trip with encoding, TTL, and bypass
/// transport all preserved.
#[test]
fn test_serde_bypass_route() {
    let original = CollectionTransportRoute::Bypass {
        encoding: MessageEncoding::Raw,
        ttl_ms: 500,
        bypass_transport: BypassTransport::Broadcast,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<CollectionTransportRoute>(&encoded).unwrap();

    match decoded {
        CollectionTransportRoute::Bypass {
            encoding,
            ttl_ms,
            bypass_transport,
        } => {
            assert_eq!(encoding, MessageEncoding::Raw);
            assert_eq!(ttl_ms, 500);
            assert_eq!(bypass_transport, BypassTransport::Broadcast);
        }
        _ => panic!("Expected Bypass variant"),
    }
}
2984
/// A `Pace` route without a policy override survives a JSON round-trip and
/// still has `policy_override: None`.
#[test]
fn test_serde_pace_route() {
    let original = CollectionTransportRoute::Pace {
        policy_override: None,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<CollectionTransportRoute>(&encoded).unwrap();

    assert!(matches!(
        decoded,
        CollectionTransportRoute::Pace {
            policy_override: None
        }
    ));
}
2999
/// A `Pace` route carrying a policy override survives a JSON round-trip with
/// the policy's name, primary list, and alternate list preserved.
#[test]
fn test_serde_pace_route_with_policy() {
    let custom_policy = TransportPolicy::new("custom")
        .primary(vec!["ble-hci0"])
        .alternate(vec!["iroh-wlan0"]);
    let original = CollectionTransportRoute::Pace {
        policy_override: Some(custom_policy),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<CollectionTransportRoute>(&encoded).unwrap();

    match decoded {
        CollectionTransportRoute::Pace {
            policy_override: Some(restored),
        } => {
            assert_eq!(restored.name, "custom");
            assert_eq!(restored.primary, vec!["ble-hci0"]);
            assert_eq!(restored.alternate, vec!["iroh-wlan0"]);
        }
        _ => panic!("Expected Pace with policy_override"),
    }
}
3021
/// A `CollectionRouteConfig` survives a JSON round-trip with its collection
/// name and priority preserved.
#[test]
fn test_serde_collection_route_config() {
    let original = CollectionRouteConfig {
        collection: "sensors".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::LoRa,
        },
        priority: MessagePriority::High,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<CollectionRouteConfig>(&encoded).unwrap();

    assert_eq!(decoded.collection, "sensors");
    assert_eq!(decoded.priority, MessagePriority::High);
}
3036
/// A whole `CollectionRouteTable` survives a JSON round-trip: both collections
/// are still present and only the `Bypass` one reports `is_bypass`.
#[test]
fn test_serde_collection_route_table() {
    let fixed_entry = CollectionRouteConfig {
        collection: "a".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::Quic,
        },
        priority: MessagePriority::Normal,
    };
    let bypass_entry = CollectionRouteConfig {
        collection: "b".to_string(),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Json,
            ttl_ms: 1000,
            bypass_transport: BypassTransport::Unicast,
        },
        priority: MessagePriority::Critical,
    };
    let original = CollectionRouteTable::new()
        .with_collection(fixed_entry)
        .with_collection(bypass_entry);

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<CollectionRouteTable>(&encoded).unwrap();

    assert!(decoded.has_collection("a"));
    assert!(decoded.has_collection("b"));
    assert!(decoded.is_bypass("b"));
    assert!(!decoded.is_bypass("a"));
}
3064
/// Each listed `TransportType` variant, including `Custom(42)`, serializes to
/// JSON and deserializes back to an equal value.
#[test]
fn test_serde_transport_type() {
    let all_kinds = [
        TransportType::Quic,
        TransportType::BluetoothClassic,
        TransportType::BluetoothLE,
        TransportType::WifiDirect,
        TransportType::LoRa,
        TransportType::TacticalRadio,
        TransportType::Satellite,
        TransportType::Custom(42),
    ];

    for kind in all_kinds {
        let encoded = serde_json::to_string(&kind).unwrap();
        let decoded = serde_json::from_str::<TransportType>(&encoded).unwrap();
        assert_eq!(decoded, kind, "Failed round-trip for {:?}", kind);
    }
}
3084
/// With the default configuration, routing a collection named "unknown"
/// yields the registered QUIC transport that reaches the peer.
#[test]
fn test_route_collection_unknown_falls_through() {
    let mut mgr = TransportManager::new(TransportManagerConfig::default());

    let target = NodeId::new("peer-1".to_string());
    let quic_mock = MockTransport::new(TransportCapabilities::quic()).with_peer(target.clone());
    mgr.register(Arc::new(quic_mock));

    let reqs = MessageRequirements::default();
    let outcome = mgr.route_collection("unknown", &target, &reqs);

    assert_eq!(outcome, RouteDecision::Transport(TransportType::Quic));
}
3104
/// A collection with a `Fixed` Bluetooth LE route resolves to that transport
/// when a Bluetooth LE mock reaching the peer is registered.
#[test]
fn test_route_collection_fixed_routes_correctly() {
    let routes = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    });
    let mut mgr = TransportManager::new(TransportManagerConfig {
        collection_routes: routes,
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());
    let ble_mock =
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(target.clone());
    mgr.register(Arc::new(ble_mock));

    let reqs = MessageRequirements::default();
    let outcome = mgr.route_collection("telemetry", &target, &reqs);

    assert_eq!(outcome, RouteDecision::Transport(TransportType::BluetoothLE));
}
3134
/// A `Fixed` Bluetooth LE route yields `NoRoute` when the registered
/// Bluetooth LE transport is flagged unavailable.
#[test]
fn test_route_collection_fixed_unavailable_returns_no_route() {
    let routes = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    });
    let mut mgr = TransportManager::new(TransportManagerConfig {
        collection_routes: routes,
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());

    // Registered and aware of the peer, but not available.
    let ble_mock = MockTransport::new(TransportCapabilities::bluetooth_le())
        .with_peer(target.clone())
        .unavailable();
    mgr.register(Arc::new(ble_mock));

    let reqs = MessageRequirements::default();
    let outcome = mgr.route_collection("telemetry", &target, &reqs);

    assert_eq!(outcome, RouteDecision::NoRoute);
}
3165
/// A `Fixed` Bluetooth LE route yields `NoRoute` when no transport at all has
/// been registered with the manager.
#[test]
fn test_route_collection_fixed_not_registered_returns_no_route() {
    let routes = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    });
    let mgr = TransportManager::new(TransportManagerConfig {
        collection_routes: routes,
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());
    let reqs = MessageRequirements::default();

    // Nothing is registered, so the fixed route cannot be satisfied.
    let outcome = mgr.route_collection("telemetry", &target, &reqs);

    assert_eq!(outcome, RouteDecision::NoRoute);
}
3188
3189 #[tokio::test]
3190 async fn test_route_collection_bypass_with_channel() {
3191 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3192 collection: "position".to_string(),
3193 route: CollectionTransportRoute::Bypass {
3194 encoding: MessageEncoding::Raw,
3195 ttl_ms: 200,
3196 bypass_transport: BypassTransport::Broadcast,
3197 },
3198 priority: MessagePriority::High,
3199 });
3200
3201 let config = TransportManagerConfig {
3202 collection_routes: table,
3203 ..Default::default()
3204 };
3205 let mut manager = TransportManager::new(config);
3206
3207 let bypass_config = BypassChannelConfig::new();
3209 let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
3210 manager.set_bypass_channel(bypass);
3211
3212 let peer = NodeId::new("peer-1".to_string());
3213 let requirements = MessageRequirements::default();
3214 let decision = manager.route_collection("position", &peer, &requirements);
3215 assert_eq!(decision, RouteDecision::Bypass);
3216 }
3217
/// A collection with a `Bypass` route yields `NoRoute` when the manager has
/// no bypass channel installed.
#[test]
fn test_route_collection_bypass_without_channel() {
    let routes = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "position".to_string(),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Raw,
            ttl_ms: 200,
            bypass_transport: BypassTransport::Broadcast,
        },
        priority: MessagePriority::High,
    });
    let mgr = TransportManager::new(TransportManagerConfig {
        collection_routes: routes,
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());
    let reqs = MessageRequirements::default();

    // No bypass channel was ever set on this manager.
    let outcome = mgr.route_collection("position", &target, &reqs);

    assert_eq!(outcome, RouteDecision::NoRoute);
}
3242
/// A `Pace` collection without an override uses the manager's default policy
/// and resolves to the registered `iroh-eth0` instance named in it.
#[test]
fn test_route_collection_pace_routes_correctly() {
    let default_policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);

    let routes = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "sync_data".to_string(),
        route: CollectionTransportRoute::Pace {
            policy_override: None,
        },
        priority: MessagePriority::Normal,
    });
    let mgr = TransportManager::new(TransportManagerConfig {
        collection_routes: routes,
        default_policy: Some(default_policy),
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());

    // Register the instance the policy names as primary.
    let eth_instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let eth_transport =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(target.clone()));
    mgr.register_instance(eth_instance, eth_transport);

    let reqs = MessageRequirements::default();
    let outcome = mgr.route_collection("sync_data", &target, &reqs);

    assert_eq!(
        outcome,
        RouteDecision::TransportInstance("iroh-eth0".to_string())
    );
}
3280
/// A `Pace` collection yields `NoRoute` when the instance named by the
/// default policy (`iroh-eth0`) has not been registered.
#[test]
fn test_route_collection_pace_no_available_returns_no_route() {
    let default_policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);

    let routes = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "sync_data".to_string(),
        route: CollectionTransportRoute::Pace {
            policy_override: None,
        },
        priority: MessagePriority::Normal,
    });
    let mgr = TransportManager::new(TransportManagerConfig {
        collection_routes: routes,
        default_policy: Some(default_policy),
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());
    let reqs = MessageRequirements::default();

    // No instances are registered, so the policy has nothing to pick from.
    let outcome = mgr.route_collection("sync_data", &target, &reqs);

    assert_eq!(outcome, RouteDecision::NoRoute);
}
3306
/// A `Pace` collection with its own policy override resolves via that
/// override (`ble-hci0`) rather than the manager's default policy, whose only
/// primary entry (`nonexistent`) is not registered.
#[test]
fn test_route_collection_pace_with_policy_override() {
    // Manager-wide policy: names an instance that is never registered.
    let manager_policy = TransportPolicy::new("default").primary(vec!["nonexistent"]);

    // Per-collection policy: names the instance registered below.
    let collection_policy = TransportPolicy::new("override").primary(vec!["ble-hci0"]);

    let routes = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "ble_data".to_string(),
        route: CollectionTransportRoute::Pace {
            policy_override: Some(collection_policy),
        },
        priority: MessagePriority::Normal,
    });
    let mgr = TransportManager::new(TransportManagerConfig {
        collection_routes: routes,
        default_policy: Some(manager_policy),
        ..Default::default()
    });

    let target = NodeId::new("peer-1".to_string());
    let ble_instance = TransportInstance::new(
        "ble-hci0",
        TransportType::BluetoothLE,
        TransportCapabilities::bluetooth_le(),
    );
    let ble_transport = Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(target.clone()),
    );
    mgr.register_instance(ble_instance, ble_transport);

    let reqs = MessageRequirements::default();
    let outcome = mgr.route_collection("ble_data", &target, &reqs);

    assert_eq!(
        outcome,
        RouteDecision::TransportInstance("ble-hci0".to_string())
    );
}
3348
/// `RouteDecision::TransportInstance` equals another instance decision with
/// the same id and differs from the `Bypass`, `NoRoute`, and `Transport`
/// variants.
#[test]
fn test_route_decision_transport_instance_variant() {
    let instance_decision = RouteDecision::TransportInstance("iroh-eth0".to_string());

    assert_eq!(
        instance_decision,
        RouteDecision::TransportInstance("iroh-eth0".to_string())
    );

    assert_ne!(instance_decision, RouteDecision::Bypass);
    assert_ne!(instance_decision, RouteDecision::NoRoute);
    assert_ne!(
        instance_decision,
        RouteDecision::Transport(TransportType::Quic)
    );
}
3364
/// The `Debug` rendering of a `TransportInstance` decision contains both the
/// variant name and the instance id.
#[test]
fn test_route_decision_transport_instance_debug() {
    let rendered = format!(
        "{:?}",
        RouteDecision::TransportInstance("ble-hci0".to_string())
    );

    assert!(rendered.contains("TransportInstance"));
    assert!(rendered.contains("ble-hci0"));
}
3372
/// Cloning a `TransportInstance` decision produces a value equal to the
/// original.
#[test]
fn test_route_decision_transport_instance_clone() {
    let source = RouteDecision::TransportInstance("iroh-wlan0".to_string());
    let duplicate = source.clone();

    assert_eq!(source, duplicate);
}
3379
/// `get_collection_route` returns the configured entry for a known
/// collection, with its name and priority intact.
#[test]
fn test_get_collection_route_found() {
    let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::Quic,
        },
        priority: MessagePriority::High,
    });

    let config = TransportManagerConfig {
        collection_routes: table,
        ..Default::default()
    };
    let manager = TransportManager::new(config);

    // Unwrap once and reuse the binding instead of checking `is_some()` and
    // then calling `unwrap()` on the same `Option` for every field.
    let route = manager
        .get_collection_route("telemetry")
        .expect("a route is configured for the `telemetry` collection");
    assert_eq!(route.collection, "telemetry");
    assert_eq!(route.priority, MessagePriority::High);
}
3405
/// `get_collection_route` returns `None` for a collection name that is not in
/// the default (empty) route table.
#[test]
fn test_get_collection_route_not_found() {
    let mgr = TransportManager::new(TransportManagerConfig::default());

    let lookup = mgr.get_collection_route("nonexistent");

    assert!(lookup.is_none());
}
3413}