1use std::collections::HashMap;
60use std::net::SocketAddr;
61use std::sync::{Arc, RwLock};
62
63use super::bypass::{
64 BypassMessage, BypassTarget, BypassTransport, MessageEncoding, UdpBypassChannel,
65};
66use super::capabilities::{
67 MessagePriority, MessageRequirements, PaceLevel, PeerDistance, RangeMode, Transport,
68 TransportId, TransportInstance, TransportMode, TransportPolicy, TransportType,
69};
70use super::{NodeId, Result, TransportError};
71use serde::{Deserialize, Serialize};
72use std::collections::HashSet;
73use tokio::sync::broadcast;
74use tokio::sync::RwLock as TokioRwLock;
75
76type TransportInstanceMap = HashMap<TransportId, (TransportInstance, Arc<dyn Transport>)>;
78
79#[derive(Debug, Clone)]
85pub struct TransportManagerConfig {
86 pub preference_order: Vec<TransportType>,
89
90 pub enable_fallback: bool,
92
93 pub cache_peer_transport: bool,
95
96 pub switch_threshold: i32,
98
99 pub default_policy: Option<TransportPolicy>,
102
103 pub transport_mode: TransportMode,
105
106 pub collection_routes: CollectionRouteTable,
112}
113
114impl Default for TransportManagerConfig {
115 fn default() -> Self {
116 Self {
117 preference_order: vec![
118 TransportType::Quic,
119 TransportType::WifiDirect,
120 TransportType::BluetoothLE,
121 TransportType::LoRa,
122 ],
123 enable_fallback: true,
124 cache_peer_transport: true,
125 switch_threshold: 10,
126 default_policy: None,
127 transport_mode: TransportMode::Single,
128 collection_routes: CollectionRouteTable::default(),
129 }
130 }
131}
132
133impl TransportManagerConfig {
134 pub fn with_policy(policy: TransportPolicy) -> Self {
136 Self {
137 default_policy: Some(policy),
138 ..Default::default()
139 }
140 }
141
142 pub fn with_mode(mut self, mode: TransportMode) -> Self {
144 self.transport_mode = mode;
145 self
146 }
147}
148
149pub struct TransportManager {
185 transports: HashMap<TransportType, Arc<dyn Transport>>,
187
188 transport_instances: RwLock<TransportInstanceMap>,
190
191 peer_transports: RwLock<HashMap<NodeId, TransportType>>,
193
194 peer_transport_ids: RwLock<HashMap<NodeId, TransportId>>,
196
197 peer_distances: RwLock<HashMap<NodeId, PeerDistance>>,
199
200 config: TransportManagerConfig,
202
203 bypass_channel: Option<Arc<TokioRwLock<UdpBypassChannel>>>,
208}
209
210impl TransportManager {
211 pub fn new(config: TransportManagerConfig) -> Self {
213 Self {
214 transports: HashMap::new(),
215 transport_instances: RwLock::new(HashMap::new()),
216 peer_transports: RwLock::new(HashMap::new()),
217 peer_transport_ids: RwLock::new(HashMap::new()),
218 peer_distances: RwLock::new(HashMap::new()),
219 config,
220 bypass_channel: None,
221 }
222 }
223
224 pub fn with_bypass(config: TransportManagerConfig, bypass: UdpBypassChannel) -> Self {
226 Self {
227 transports: HashMap::new(),
228 transport_instances: RwLock::new(HashMap::new()),
229 peer_transports: RwLock::new(HashMap::new()),
230 peer_transport_ids: RwLock::new(HashMap::new()),
231 peer_distances: RwLock::new(HashMap::new()),
232 config,
233 bypass_channel: Some(Arc::new(TokioRwLock::new(bypass))),
234 }
235 }
236
237 pub fn set_bypass_channel(&mut self, bypass: UdpBypassChannel) {
239 self.bypass_channel = Some(Arc::new(TokioRwLock::new(bypass)));
240 }
241
242 pub fn has_bypass_channel(&self) -> bool {
244 self.bypass_channel.is_some()
245 }
246
247 pub async fn is_bypass_collection(&self, collection: &str) -> bool {
249 if let Some(ref bypass) = self.bypass_channel {
250 bypass.read().await.is_bypass_collection(collection)
251 } else {
252 false
253 }
254 }
255
256 pub fn register(&mut self, transport: Arc<dyn Transport>) {
260 let transport_type = transport.capabilities().transport_type;
261 self.transports.insert(transport_type, transport);
262 }
263
264 pub fn unregister(&mut self, transport_type: TransportType) -> Option<Arc<dyn Transport>> {
268 self.transports.remove(&transport_type)
269 }
270
271 pub fn get_transport(&self, transport_type: TransportType) -> Option<&Arc<dyn Transport>> {
273 self.transports.get(&transport_type)
274 }
275
276 pub fn registered_transports(&self) -> Vec<TransportType> {
278 self.transports.keys().copied().collect()
279 }
280
281 pub fn available_transports(&self, peer_id: &NodeId) -> Vec<TransportType> {
283 self.transports
284 .iter()
285 .filter(|(_, t)| t.is_available() && t.can_reach(peer_id))
286 .map(|(tt, _)| *tt)
287 .collect()
288 }
289
290 pub fn register_instance(&self, instance: TransportInstance, transport: Arc<dyn Transport>) {
311 let id = instance.id.clone();
312 self.transport_instances
313 .write()
314 .unwrap()
315 .insert(id, (instance, transport));
316 }
317
318 pub fn unregister_instance(
320 &self,
321 id: &TransportId,
322 ) -> Option<(TransportInstance, Arc<dyn Transport>)> {
323 self.transport_instances.write().unwrap().remove(id)
324 }
325
326 pub fn get_instance(&self, id: &TransportId) -> Option<Arc<dyn Transport>> {
328 self.transport_instances
329 .read()
330 .unwrap()
331 .get(id)
332 .map(|(_, t)| Arc::clone(t))
333 }
334
335 pub fn registered_instance_ids(&self) -> Vec<TransportId> {
337 self.transport_instances
338 .read()
339 .unwrap()
340 .keys()
341 .cloned()
342 .collect()
343 }
344
345 pub fn available_instance_ids(&self) -> HashSet<TransportId> {
347 self.transport_instances
348 .read()
349 .unwrap()
350 .iter()
351 .filter(|(_, (inst, transport))| inst.available && transport.is_available())
352 .map(|(id, _)| id.clone())
353 .collect()
354 }
355
356 pub fn available_instances_for_peer(&self, peer_id: &NodeId) -> Vec<TransportId> {
358 self.transport_instances
359 .read()
360 .unwrap()
361 .iter()
362 .filter(|(_, (inst, transport))| {
363 inst.available && transport.is_available() && transport.can_reach(peer_id)
364 })
365 .map(|(id, _)| id.clone())
366 .collect()
367 }
368
369 pub fn current_pace_level(&self) -> PaceLevel {
373 match &self.config.default_policy {
374 Some(policy) => policy.current_level(&self.available_instance_ids()),
375 None => {
376 if !self.available_instance_ids().is_empty() {
378 PaceLevel::Primary
379 } else {
380 PaceLevel::None
381 }
382 }
383 }
384 }
385
386 pub fn select_transports_pace(
403 &self,
404 peer_id: &NodeId,
405 requirements: &MessageRequirements,
406 ) -> Vec<TransportId> {
407 let policy = match &self.config.default_policy {
408 Some(p) => p,
409 None => return Vec::new(), };
411
412 let instances = self.transport_instances.read().unwrap();
413 let available_for_peer: HashSet<_> = instances
414 .iter()
415 .filter(|(_, (inst, transport))| {
416 inst.available
417 && transport.is_available()
418 && transport.can_reach(peer_id)
419 && transport.capabilities().meets_requirements(requirements)
420 })
421 .map(|(id, _)| id.clone())
422 .collect();
423
424 let candidates: Vec<TransportId> = policy
426 .ordered()
427 .filter(|id| available_for_peer.contains(*id))
428 .cloned()
429 .collect();
430
431 match &self.config.transport_mode {
433 TransportMode::Single => candidates.into_iter().take(1).collect(),
434 TransportMode::Redundant {
435 min_paths,
436 max_paths,
437 } => {
438 let min = *min_paths as usize;
439 let max = max_paths.map(|m| m as usize).unwrap_or(candidates.len());
440 candidates.into_iter().take(max.max(min)).collect()
441 }
442 TransportMode::Bonded => candidates, TransportMode::LoadBalanced { .. } => candidates, }
445 }
446
447 pub fn select_transport_pace(
451 &self,
452 peer_id: &NodeId,
453 requirements: &MessageRequirements,
454 ) -> Option<TransportId> {
455 self.select_transports_pace(peer_id, requirements)
456 .into_iter()
457 .next()
458 }
459
460 pub fn record_success_pace(&self, peer_id: &NodeId, transport_id: TransportId) {
462 if self.config.cache_peer_transport {
463 self.peer_transport_ids
464 .write()
465 .unwrap()
466 .insert(peer_id.clone(), transport_id);
467 }
468 }
469
470 pub fn clear_cache_pace(&self, peer_id: &NodeId) {
472 self.peer_transport_ids.write().unwrap().remove(peer_id);
473 }
474
475 pub fn select_transport(
491 &self,
492 peer_id: &NodeId,
493 requirements: &MessageRequirements,
494 ) -> Option<TransportType> {
495 if self.config.cache_peer_transport {
497 if let Some(&cached) = self.peer_transports.read().unwrap().get(peer_id) {
498 if let Some(transport) = self.transports.get(&cached) {
500 if transport.is_available()
501 && transport.can_reach(peer_id)
502 && transport.capabilities().meets_requirements(requirements)
503 {
504 return Some(cached);
505 }
506 }
507 }
508 }
509
510 let candidates: Vec<_> = self
512 .available_transports(peer_id)
513 .into_iter()
514 .filter_map(|tt| {
515 let transport = self.transports.get(&tt)?;
516 let caps = transport.capabilities();
517
518 if !caps.meets_requirements(requirements) {
520 return None;
521 }
522
523 if let Some(max_latency) = requirements.max_latency_ms {
525 let est_delivery = transport.estimate_delivery_ms(requirements.message_size);
526 if est_delivery > max_latency {
527 return None;
528 }
529 }
530
531 let preference_bonus = self
533 .config
534 .preference_order
535 .iter()
536 .position(|&t| t == tt)
537 .map(|idx| 20 - (idx as i32 * 5))
538 .unwrap_or(0);
539
540 let score = transport.calculate_score(requirements, preference_bonus);
541 Some((tt, score))
542 })
543 .collect();
544
545 candidates
547 .into_iter()
548 .max_by_key(|(_, score)| *score)
549 .map(|(tt, _)| tt)
550 }
551
552 pub fn select_transport_for_distance(
557 &self,
558 peer_id: &NodeId,
559 requirements: &MessageRequirements,
560 ) -> Option<(TransportType, Option<RangeMode>)> {
561 let transport_type = self.select_transport(peer_id, requirements)?;
562
563 let distance = self
565 .peer_distances
566 .read()
567 .unwrap()
568 .get(peer_id)
569 .map(|d| d.distance_meters);
570
571 let range_mode = if let Some(_dist) = distance {
573 None } else {
577 None
578 };
579
580 Some((transport_type, range_mode))
581 }
582
583 pub fn record_success(&self, peer_id: &NodeId, transport_type: TransportType) {
587 if self.config.cache_peer_transport {
588 self.peer_transports
589 .write()
590 .unwrap()
591 .insert(peer_id.clone(), transport_type);
592 }
593 }
594
595 pub fn clear_cache(&self, peer_id: &NodeId) {
599 self.peer_transports.write().unwrap().remove(peer_id);
600 }
601
602 pub fn update_peer_distance(&self, distance: PeerDistance) {
604 self.peer_distances
605 .write()
606 .unwrap()
607 .insert(distance.peer_id.clone(), distance);
608 }
609
610 pub fn get_peer_distance(&self, peer_id: &NodeId) -> Option<PeerDistance> {
612 self.peer_distances.read().unwrap().get(peer_id).cloned()
613 }
614
615 pub async fn connect(
619 &self,
620 peer_id: &NodeId,
621 requirements: &MessageRequirements,
622 ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
623 let transport_type = self
624 .select_transport(peer_id, requirements)
625 .ok_or_else(|| {
626 TransportError::PeerNotFound(format!("No suitable transport for {}", peer_id))
627 })?;
628
629 let transport = self
630 .transports
631 .get(&transport_type)
632 .ok_or(TransportError::NotStarted)?;
633
634 let connection = transport.connect(peer_id).await?;
635
636 self.record_success(peer_id, transport_type);
638
639 Ok((transport_type, connection))
640 }
641
642 pub async fn connect_with_fallback(
646 &self,
647 peer_id: &NodeId,
648 requirements: &MessageRequirements,
649 ) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
650 let candidates: Vec<_> = self
652 .available_transports(peer_id)
653 .into_iter()
654 .filter_map(|tt| {
655 let transport = self.transports.get(&tt)?;
656 if !transport.capabilities().meets_requirements(requirements) {
657 return None;
658 }
659 let preference_bonus = self
660 .config
661 .preference_order
662 .iter()
663 .position(|&t| t == tt)
664 .map(|idx| 20 - (idx as i32 * 5))
665 .unwrap_or(0);
666 let score = transport.calculate_score(requirements, preference_bonus);
667 Some((tt, score))
668 })
669 .collect();
670
671 let mut sorted: Vec<_> = candidates;
672 sorted.sort_by(|a, b| b.1.cmp(&a.1)); if sorted.is_empty() {
675 return Err(TransportError::PeerNotFound(format!(
676 "No suitable transport for {}",
677 peer_id
678 )));
679 }
680
681 let mut last_error = None;
682
683 for (transport_type, _) in sorted {
684 let transport = match self.transports.get(&transport_type) {
685 Some(t) => t,
686 None => continue,
687 };
688
689 match transport.connect(peer_id).await {
690 Ok(conn) => {
691 self.record_success(peer_id, transport_type);
692 return Ok((transport_type, conn));
693 }
694 Err(e) => {
695 if !self.config.enable_fallback {
696 return Err(e);
697 }
698 last_error = Some(e);
699 self.clear_cache(peer_id);
700 }
701 }
702 }
703
704 Err(last_error.unwrap_or_else(|| {
705 TransportError::PeerNotFound(format!("All transports failed for {}", peer_id))
706 }))
707 }
708
709 pub async fn send_bypass(
740 &self,
741 collection: &str,
742 data: &[u8],
743 target: Option<SocketAddr>,
744 ) -> Result<()> {
745 let bypass = self
746 .bypass_channel
747 .as_ref()
748 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
749
750 bypass
751 .read()
752 .await
753 .send_to_collection(collection, target, data)
754 .await
755 .map_err(|e| TransportError::Other(e.to_string().into()))
756 }
757
758 pub async fn send_bypass_to(
768 &self,
769 target: BypassTarget,
770 collection: &str,
771 data: &[u8],
772 ) -> Result<()> {
773 let bypass = self
774 .bypass_channel
775 .as_ref()
776 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
777
778 bypass
779 .read()
780 .await
781 .send(target, collection, data)
782 .await
783 .map_err(|e| TransportError::Other(e.to_string().into()))
784 }
785
786 pub async fn subscribe_bypass(&self) -> Result<broadcast::Receiver<BypassMessage>> {
805 let bypass = self
806 .bypass_channel
807 .as_ref()
808 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
809
810 Ok(bypass.read().await.subscribe())
811 }
812
813 pub async fn subscribe_bypass_collection(
827 &self,
828 collection: &str,
829 ) -> Result<(u32, broadcast::Receiver<BypassMessage>)> {
830 let bypass = self
831 .bypass_channel
832 .as_ref()
833 .ok_or_else(|| TransportError::Other("Bypass channel not configured".into()))?;
834
835 Ok(bypass.read().await.subscribe_collection(collection))
836 }
837
838 pub fn route_collection(
858 &self,
859 collection: &str,
860 peer_id: &NodeId,
861 requirements: &MessageRequirements,
862 ) -> RouteDecision {
863 let route_config = match self.config.collection_routes.get(collection) {
864 Some(config) => config,
865 None => return self.route_message(peer_id, requirements),
866 };
867
868 match &route_config.route {
869 CollectionTransportRoute::Bypass { .. } => {
870 if self.bypass_channel.is_some() {
871 RouteDecision::Bypass
872 } else {
873 RouteDecision::NoRoute
874 }
875 }
876 CollectionTransportRoute::Fixed { transport_type } => {
877 if let Some(transport) = self.transports.get(transport_type) {
879 if transport.is_available() && transport.can_reach(peer_id) {
880 RouteDecision::Transport(*transport_type)
881 } else {
882 RouteDecision::NoRoute
883 }
884 } else {
885 RouteDecision::NoRoute
886 }
887 }
888 CollectionTransportRoute::Pace { policy_override } => {
889 match self.select_transport_pace_with_policy(
890 peer_id,
891 requirements,
892 policy_override.as_ref(),
893 ) {
894 Some(id) => RouteDecision::TransportInstance(id),
895 None => RouteDecision::NoRoute,
896 }
897 }
898 }
899 }
900
901 fn select_transport_pace_with_policy(
906 &self,
907 peer_id: &NodeId,
908 requirements: &MessageRequirements,
909 policy_override: Option<&TransportPolicy>,
910 ) -> Option<TransportId> {
911 let policy = policy_override.or(self.config.default_policy.as_ref())?;
912
913 let instances = self.transport_instances.read().unwrap();
914 let available_for_peer: HashSet<_> = instances
915 .iter()
916 .filter(|(_, (inst, transport))| {
917 inst.available
918 && transport.is_available()
919 && transport.can_reach(peer_id)
920 && transport.capabilities().meets_requirements(requirements)
921 })
922 .map(|(id, _)| id.clone())
923 .collect();
924
925 policy
926 .ordered()
927 .find(|id| available_for_peer.contains(*id))
928 .cloned()
929 }
930
931 pub fn get_collection_route(&self, collection: &str) -> Option<&CollectionRouteConfig> {
933 self.config.collection_routes.get(collection)
934 }
935
936 pub fn route_message(
950 &self,
951 peer_id: &NodeId,
952 requirements: &MessageRequirements,
953 ) -> RouteDecision {
954 if requirements.bypass_sync && self.bypass_channel.is_some() {
956 return RouteDecision::Bypass;
957 }
958 match self.select_transport(peer_id, requirements) {
962 Some(transport_type) => RouteDecision::Transport(transport_type),
963 None => RouteDecision::NoRoute,
964 }
965 }
966}
967
968#[derive(Debug, Clone, PartialEq, Eq)]
970pub enum RouteDecision {
971 Bypass,
973 Transport(TransportType),
975 TransportInstance(TransportId),
977 NoRoute,
979}
980
981#[derive(Debug, Clone, Serialize, Deserialize)]
1012#[serde(tag = "transport", rename_all = "snake_case")]
1013pub enum CollectionTransportRoute {
1014 Fixed { transport_type: TransportType },
1016 Bypass {
1018 encoding: MessageEncoding,
1019 ttl_ms: u64,
1020 bypass_transport: BypassTransport,
1021 },
1022 Pace {
1024 policy_override: Option<TransportPolicy>,
1025 },
1026}
1027
1028#[derive(Debug, Clone, Serialize, Deserialize)]
1032pub struct CollectionRouteConfig {
1033 pub collection: String,
1035 pub route: CollectionTransportRoute,
1037 pub priority: MessagePriority,
1039}
1040
1041#[derive(Debug, Clone, Default, Serialize, Deserialize)]
1068pub struct CollectionRouteTable {
1069 collections: Vec<CollectionRouteConfig>,
1070}
1071
1072impl CollectionRouteTable {
1073 pub fn new() -> Self {
1075 Self::default()
1076 }
1077
1078 pub fn with_collection(mut self, config: CollectionRouteConfig) -> Self {
1080 self.collections.push(config);
1081 self
1082 }
1083
1084 pub fn get(&self, collection: &str) -> Option<&CollectionRouteConfig> {
1086 self.collections.iter().find(|c| c.collection == collection)
1087 }
1088
1089 pub fn has_collection(&self, collection: &str) -> bool {
1091 self.collections.iter().any(|c| c.collection == collection)
1092 }
1093
1094 pub fn is_bypass(&self, collection: &str) -> bool {
1096 self.get(collection)
1097 .map(|c| matches!(c.route, CollectionTransportRoute::Bypass { .. }))
1098 .unwrap_or(false)
1099 }
1100}
1101
1102impl std::fmt::Debug for TransportManager {
1103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104 f.debug_struct("TransportManager")
1105 .field("transports", &self.transports.keys().collect::<Vec<_>>())
1106 .field("config", &self.config)
1107 .finish()
1108 }
1109}
1110
1111#[cfg(test)]
1116mod tests {
1117 use super::*;
1118 use crate::transport::bypass::{BypassChannelConfig, UdpBypassChannel};
1119 use crate::transport::capabilities::{MessagePriority, TransportCapabilities};
1120 use crate::transport::{MeshConnection, MeshTransport, PeerEventReceiver};
1121 use async_trait::async_trait;
1122 use std::time::Instant;
1123 use tokio::sync::mpsc;
1124
1125 struct MockTransport {
1127 caps: TransportCapabilities,
1128 available: bool,
1129 reachable_peers: Vec<NodeId>,
1130 signal: Option<u8>,
1131 }
1132
1133 impl MockTransport {
1134 fn new(caps: TransportCapabilities) -> Self {
1135 Self {
1136 caps,
1137 available: true,
1138 reachable_peers: vec![],
1139 signal: None,
1140 }
1141 }
1142
1143 fn with_peer(mut self, peer: NodeId) -> Self {
1144 self.reachable_peers.push(peer);
1145 self
1146 }
1147
1148 #[allow(dead_code)]
1149 fn with_signal(mut self, signal: u8) -> Self {
1150 self.signal = Some(signal);
1151 self
1152 }
1153
1154 fn unavailable(mut self) -> Self {
1155 self.available = false;
1156 self
1157 }
1158 }
1159
1160 struct MockConnection {
1161 peer_id: NodeId,
1162 connected_at: Instant,
1163 }
1164
1165 impl MeshConnection for MockConnection {
1166 fn peer_id(&self) -> &NodeId {
1167 &self.peer_id
1168 }
1169
1170 fn is_alive(&self) -> bool {
1171 true
1172 }
1173
1174 fn connected_at(&self) -> Instant {
1175 self.connected_at
1176 }
1177 }
1178
1179 #[async_trait]
1180 impl MeshTransport for MockTransport {
1181 async fn start(&self) -> Result<()> {
1182 Ok(())
1183 }
1184
1185 async fn stop(&self) -> Result<()> {
1186 Ok(())
1187 }
1188
1189 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnection>> {
1190 if self.reachable_peers.contains(peer_id) {
1191 Ok(Box::new(MockConnection {
1192 peer_id: peer_id.clone(),
1193 connected_at: Instant::now(),
1194 }))
1195 } else {
1196 Err(TransportError::PeerNotFound(peer_id.to_string()))
1197 }
1198 }
1199
1200 async fn disconnect(&self, _peer_id: &NodeId) -> Result<()> {
1201 Ok(())
1202 }
1203
1204 fn get_connection(&self, _peer_id: &NodeId) -> Option<Box<dyn MeshConnection>> {
1205 None
1206 }
1207
1208 fn peer_count(&self) -> usize {
1209 0
1210 }
1211
1212 fn connected_peers(&self) -> Vec<NodeId> {
1213 vec![]
1214 }
1215
1216 fn subscribe_peer_events(&self) -> PeerEventReceiver {
1217 let (_tx, rx) = mpsc::channel(1);
1218 rx
1219 }
1220 }
1221
1222 impl Transport for MockTransport {
1223 fn capabilities(&self) -> &TransportCapabilities {
1224 &self.caps
1225 }
1226
1227 fn is_available(&self) -> bool {
1228 self.available
1229 }
1230
1231 fn signal_quality(&self) -> Option<u8> {
1232 self.signal
1233 }
1234
1235 fn can_reach(&self, peer_id: &NodeId) -> bool {
1236 self.reachable_peers.contains(peer_id)
1237 }
1238 }
1239
1240 #[test]
1241 fn test_register_transport() {
1242 let config = TransportManagerConfig::default();
1243 let mut manager = TransportManager::new(config);
1244
1245 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1246 manager.register(transport);
1247
1248 assert!(manager.get_transport(TransportType::Quic).is_some());
1249 assert!(manager.get_transport(TransportType::LoRa).is_none());
1250 }
1251
1252 #[test]
1253 fn test_unregister_transport() {
1254 let config = TransportManagerConfig::default();
1255 let mut manager = TransportManager::new(config);
1256
1257 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1258 manager.register(transport);
1259
1260 let removed = manager.unregister(TransportType::Quic);
1261 assert!(removed.is_some());
1262 assert!(manager.get_transport(TransportType::Quic).is_none());
1263 }
1264
1265 #[test]
1266 fn test_available_transports() {
1267 let config = TransportManagerConfig::default();
1268 let mut manager = TransportManager::new(config);
1269
1270 let peer = NodeId::new("peer-1".to_string());
1271
1272 let quic =
1274 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1275 manager.register(quic);
1276
1277 let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1279 manager.register(ble);
1280
1281 let lora = Arc::new(
1283 MockTransport::new(TransportCapabilities::lora(7))
1284 .unavailable()
1285 .with_peer(peer.clone()),
1286 );
1287 manager.register(lora);
1288
1289 let available = manager.available_transports(&peer);
1290 assert_eq!(available.len(), 1);
1291 assert!(available.contains(&TransportType::Quic));
1292 }
1293
1294 #[test]
1295 fn test_select_transport_by_reliability() {
1296 let config = TransportManagerConfig::default();
1297 let mut manager = TransportManager::new(config);
1298
1299 let peer = NodeId::new("peer-1".to_string());
1300
1301 let quic =
1303 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1304 manager.register(quic);
1305
1306 let lora =
1308 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1309 manager.register(lora);
1310
1311 let requirements = MessageRequirements {
1313 reliable: true,
1314 ..Default::default()
1315 };
1316
1317 let selected = manager.select_transport(&peer, &requirements);
1318 assert_eq!(selected, Some(TransportType::Quic));
1319 }
1320
1321 #[test]
1322 fn test_select_transport_by_preference() {
1323 let config = TransportManagerConfig {
1324 preference_order: vec![TransportType::BluetoothLE, TransportType::Quic],
1325 ..Default::default()
1326 };
1327 let mut manager = TransportManager::new(config);
1328
1329 let peer = NodeId::new("peer-1".to_string());
1330
1331 let quic =
1333 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1334 manager.register(quic);
1335
1336 let ble = Arc::new(
1337 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1338 );
1339 manager.register(ble);
1340
1341 let requirements = MessageRequirements::default();
1342 let selected = manager.select_transport(&peer, &requirements);
1343
1344 assert_eq!(selected, Some(TransportType::BluetoothLE));
1346 }
1347
1348 #[test]
1349 fn test_select_transport_by_latency() {
1350 let config = TransportManagerConfig::default();
1351 let mut manager = TransportManager::new(config);
1352
1353 let peer = NodeId::new("peer-1".to_string());
1354
1355 let quic =
1357 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1358 manager.register(quic);
1359
1360 let mut lora_caps = TransportCapabilities::lora(7);
1362 lora_caps.reliable = true; let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1364 manager.register(lora);
1365
1366 let requirements = MessageRequirements {
1368 priority: MessagePriority::High,
1369 reliable: true,
1370 ..Default::default()
1371 };
1372
1373 let selected = manager.select_transport(&peer, &requirements);
1374 assert_eq!(selected, Some(TransportType::Quic));
1375 }
1376
1377 #[test]
1378 fn test_select_transport_with_latency_requirement() {
1379 let config = TransportManagerConfig::default();
1380 let mut manager = TransportManager::new(config);
1381
1382 let peer = NodeId::new("peer-1".to_string());
1383
1384 let quic =
1386 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1387 manager.register(quic);
1388
1389 let mut lora_caps = TransportCapabilities::lora(12);
1391 lora_caps.reliable = true;
1392 let lora = Arc::new(MockTransport::new(lora_caps).with_peer(peer.clone()));
1393 manager.register(lora);
1394
1395 let requirements = MessageRequirements {
1397 reliable: true,
1398 max_latency_ms: Some(50),
1399 ..Default::default()
1400 };
1401
1402 let selected = manager.select_transport(&peer, &requirements);
1403 assert_eq!(selected, Some(TransportType::Quic));
1404 }
1405
1406 #[test]
1407 fn test_select_transport_no_match() {
1408 let config = TransportManagerConfig::default();
1409 let mut manager = TransportManager::new(config);
1410
1411 let peer = NodeId::new("peer-1".to_string());
1412
1413 let lora =
1415 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
1416 manager.register(lora);
1417
1418 let requirements = MessageRequirements {
1420 reliable: true,
1421 ..Default::default()
1422 };
1423
1424 let selected = manager.select_transport(&peer, &requirements);
1425 assert_eq!(selected, None);
1426 }
1427
1428 #[test]
1429 fn test_peer_transport_caching() {
1430 let config = TransportManagerConfig {
1431 cache_peer_transport: true,
1432 ..Default::default()
1433 };
1434 let mut manager = TransportManager::new(config);
1435
1436 let peer = NodeId::new("peer-1".to_string());
1437
1438 let quic =
1439 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1440 manager.register(quic);
1441
1442 let ble = Arc::new(
1443 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1444 );
1445 manager.register(ble);
1446
1447 manager.record_success(&peer, TransportType::BluetoothLE);
1449
1450 let requirements = MessageRequirements::default();
1452 let selected = manager.select_transport(&peer, &requirements);
1453 assert_eq!(selected, Some(TransportType::BluetoothLE));
1454
1455 manager.clear_cache(&peer);
1457
1458 let selected = manager.select_transport(&peer, &requirements);
1460 assert_eq!(selected, Some(TransportType::Quic));
1462 }
1463
1464 #[test]
1465 fn test_power_sensitive_selection() {
1466 let config = TransportManagerConfig {
1468 preference_order: vec![],
1469 ..Default::default()
1470 };
1471 let mut manager = TransportManager::new(config);
1472
1473 let peer = NodeId::new("peer-1".to_string());
1474
1475 let quic =
1477 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1478 manager.register(quic);
1479
1480 let ble = Arc::new(
1482 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1483 );
1484 manager.register(ble);
1485
1486 let requirements = MessageRequirements {
1488 power_sensitive: true,
1489 ..Default::default()
1490 };
1491
1492 let selected = manager.select_transport(&peer, &requirements);
1493 assert_eq!(selected, Some(TransportType::BluetoothLE));
1495 }
1496
1497 #[tokio::test]
1498 async fn test_connect_selects_transport() {
1499 let config = TransportManagerConfig::default();
1500 let mut manager = TransportManager::new(config);
1501
1502 let peer = NodeId::new("peer-1".to_string());
1503
1504 let quic =
1505 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1506 manager.register(quic);
1507
1508 let requirements = MessageRequirements::default();
1509 let result = manager.connect(&peer, &requirements).await;
1510
1511 assert!(result.is_ok());
1512 let (transport_type, conn) = result.unwrap();
1513 assert_eq!(transport_type, TransportType::Quic);
1514 assert_eq!(conn.peer_id(), &peer);
1515 }
1516
1517 #[tokio::test]
1518 async fn test_connect_with_fallback() {
1519 let config = TransportManagerConfig {
1520 enable_fallback: true,
1521 ..Default::default()
1522 };
1523 let mut manager = TransportManager::new(config);
1524
1525 let peer = NodeId::new("peer-1".to_string());
1526
1527 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1529 manager.register(quic);
1530
1531 let ble = Arc::new(
1533 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
1534 );
1535 manager.register(ble);
1536
1537 let requirements = MessageRequirements::default();
1538 let result = manager.connect_with_fallback(&peer, &requirements).await;
1539
1540 assert!(result.is_ok());
1541 let (transport_type, _) = result.unwrap();
1542 assert_eq!(transport_type, TransportType::BluetoothLE);
1543 }
1544
1545 #[test]
1546 fn test_distance_tracking() {
1547 let config = TransportManagerConfig::default();
1548 let manager = TransportManager::new(config);
1549
1550 let peer = NodeId::new("peer-1".to_string());
1551
1552 let distance = PeerDistance {
1553 peer_id: peer.clone(),
1554 distance_meters: 500,
1555 source: super::super::capabilities::DistanceSource::Gps {
1556 confidence_meters: 10,
1557 },
1558 last_updated: Instant::now(),
1559 };
1560
1561 manager.update_peer_distance(distance);
1562
1563 let retrieved = manager.get_peer_distance(&peer);
1564 assert!(retrieved.is_some());
1565 assert_eq!(retrieved.unwrap().distance_meters, 500);
1566 }
1567
1568 #[tokio::test]
1573 async fn test_no_bypass_channel_by_default() {
1574 let config = TransportManagerConfig::default();
1575 let manager = TransportManager::new(config);
1576
1577 assert!(!manager.has_bypass_channel());
1578 assert!(!manager.is_bypass_collection("test").await);
1579 }
1580
1581 #[test]
1582 fn test_route_message_without_bypass() {
1583 let config = TransportManagerConfig::default();
1584 let mut manager = TransportManager::new(config);
1585
1586 let peer = NodeId::new("peer-1".to_string());
1587
1588 let quic =
1589 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1590 manager.register(quic);
1591
1592 let requirements = MessageRequirements::default();
1594 let decision = manager.route_message(&peer, &requirements);
1595 assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1596
1597 let bypass_req = MessageRequirements {
1600 bypass_sync: true,
1601 max_latency_ms: Some(100), ..Default::default()
1603 };
1604 let decision = manager.route_message(&peer, &bypass_req);
1605 assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
1607 }
1608
1609 #[tokio::test]
1610 async fn test_subscribe_bypass_not_configured() {
1611 let config = TransportManagerConfig::default();
1612 let manager = TransportManager::new(config);
1613
1614 let result = manager.subscribe_bypass().await;
1615 assert!(result.is_err());
1616 }
1617
1618 #[test]
1619 fn test_route_decision_equality() {
1620 assert_eq!(RouteDecision::Bypass, RouteDecision::Bypass);
1621 assert_eq!(
1622 RouteDecision::Transport(TransportType::Quic),
1623 RouteDecision::Transport(TransportType::Quic)
1624 );
1625 assert_ne!(RouteDecision::Bypass, RouteDecision::NoRoute);
1626 assert_ne!(
1627 RouteDecision::Transport(TransportType::Quic),
1628 RouteDecision::Transport(TransportType::LoRa)
1629 );
1630 }
1631
1632 #[test]
1637 fn test_register_instance() {
1638 let config = TransportManagerConfig::default();
1639 let manager = TransportManager::new(config);
1640
1641 let peer = NodeId::new("peer-1".to_string());
1642 let instance = TransportInstance::new(
1643 "iroh-eth0",
1644 TransportType::Quic,
1645 TransportCapabilities::quic(),
1646 );
1647 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer));
1648
1649 manager.register_instance(instance, transport);
1650
1651 assert!(manager.get_instance(&"iroh-eth0".to_string()).is_some());
1652 assert!(manager.get_instance(&"nonexistent".to_string()).is_none());
1653 }
1654
1655 #[test]
1656 fn test_unregister_instance() {
1657 let config = TransportManagerConfig::default();
1658 let manager = TransportManager::new(config);
1659
1660 let instance = TransportInstance::new(
1661 "iroh-eth0",
1662 TransportType::Quic,
1663 TransportCapabilities::quic(),
1664 );
1665 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1666
1667 manager.register_instance(instance, transport);
1668
1669 let removed = manager.unregister_instance(&"iroh-eth0".to_string());
1670 assert!(removed.is_some());
1671 let (inst, _) = removed.unwrap();
1672 assert_eq!(inst.id, "iroh-eth0");
1673
1674 assert!(manager.get_instance(&"iroh-eth0".to_string()).is_none());
1676
1677 let removed_again = manager.unregister_instance(&"iroh-eth0".to_string());
1679 assert!(removed_again.is_none());
1680 }
1681
1682 #[test]
1683 fn test_registered_instance_ids() {
1684 let config = TransportManagerConfig::default();
1685 let manager = TransportManager::new(config);
1686
1687 assert!(manager.registered_instance_ids().is_empty());
1689
1690 let inst1 = TransportInstance::new(
1691 "iroh-eth0",
1692 TransportType::Quic,
1693 TransportCapabilities::quic(),
1694 );
1695 let inst2 = TransportInstance::new(
1696 "lora-915",
1697 TransportType::LoRa,
1698 TransportCapabilities::lora(7),
1699 );
1700
1701 manager.register_instance(
1702 inst1,
1703 Arc::new(MockTransport::new(TransportCapabilities::quic())),
1704 );
1705 manager.register_instance(
1706 inst2,
1707 Arc::new(MockTransport::new(TransportCapabilities::lora(7))),
1708 );
1709
1710 let ids = manager.registered_instance_ids();
1711 assert_eq!(ids.len(), 2);
1712 assert!(ids.contains(&"iroh-eth0".to_string()));
1713 assert!(ids.contains(&"lora-915".to_string()));
1714 }
1715
1716 #[test]
1717 fn test_available_instance_ids() {
1718 let config = TransportManagerConfig::default();
1719 let manager = TransportManager::new(config);
1720
1721 let inst1 = TransportInstance::new(
1723 "iroh-eth0",
1724 TransportType::Quic,
1725 TransportCapabilities::quic(),
1726 );
1727 let transport1 = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1728 manager.register_instance(inst1, transport1);
1729
1730 let inst2 = TransportInstance::new(
1732 "lora-off",
1733 TransportType::LoRa,
1734 TransportCapabilities::lora(7),
1735 );
1736 let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)).unavailable());
1737 manager.register_instance(inst2, transport2);
1738
1739 let mut inst3 = TransportInstance::new(
1741 "ble-disabled",
1742 TransportType::BluetoothLE,
1743 TransportCapabilities::bluetooth_le(),
1744 );
1745 inst3.available = false;
1746 let transport3 = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1747 manager.register_instance(inst3, transport3);
1748
1749 let available = manager.available_instance_ids();
1750 assert_eq!(available.len(), 1);
1751 assert!(available.contains("iroh-eth0"));
1752 }
1753
1754 #[test]
1755 fn test_available_instances_for_peer() {
1756 let config = TransportManagerConfig::default();
1757 let manager = TransportManager::new(config);
1758
1759 let peer = NodeId::new("peer-1".to_string());
1760
1761 let inst1 = TransportInstance::new(
1763 "iroh-eth0",
1764 TransportType::Quic,
1765 TransportCapabilities::quic(),
1766 );
1767 let transport1 =
1768 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1769 manager.register_instance(inst1, transport1);
1770
1771 let inst2 = TransportInstance::new(
1773 "lora-915",
1774 TransportType::LoRa,
1775 TransportCapabilities::lora(7),
1776 );
1777 let transport2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1778 manager.register_instance(inst2, transport2);
1779
1780 let inst3 = TransportInstance::new(
1782 "ble-off",
1783 TransportType::BluetoothLE,
1784 TransportCapabilities::bluetooth_le(),
1785 );
1786 let transport3 = Arc::new(
1787 MockTransport::new(TransportCapabilities::bluetooth_le())
1788 .with_peer(peer.clone())
1789 .unavailable(),
1790 );
1791 manager.register_instance(inst3, transport3);
1792
1793 let for_peer = manager.available_instances_for_peer(&peer);
1794 assert_eq!(for_peer.len(), 1);
1795 assert_eq!(for_peer[0], "iroh-eth0");
1796 }
1797
1798 #[test]
1803 fn test_current_pace_level_no_policy_with_available() {
1804 let config = TransportManagerConfig::default();
1805 let manager = TransportManager::new(config);
1806
1807 let inst = TransportInstance::new(
1809 "iroh-eth0",
1810 TransportType::Quic,
1811 TransportCapabilities::quic(),
1812 );
1813 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1814 manager.register_instance(inst, transport);
1815
1816 assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1818 }
1819
1820 #[test]
1821 fn test_current_pace_level_no_policy_none_available() {
1822 let config = TransportManagerConfig::default();
1823 let manager = TransportManager::new(config);
1824
1825 assert_eq!(manager.current_pace_level(), PaceLevel::None);
1827 }
1828
1829 #[test]
1830 fn test_current_pace_level_no_policy_all_unavailable() {
1831 let config = TransportManagerConfig::default();
1832 let manager = TransportManager::new(config);
1833
1834 let inst = TransportInstance::new(
1836 "iroh-eth0",
1837 TransportType::Quic,
1838 TransportCapabilities::quic(),
1839 );
1840 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()).unavailable());
1841 manager.register_instance(inst, transport);
1842
1843 assert_eq!(manager.current_pace_level(), PaceLevel::None);
1844 }
1845
1846 #[test]
1847 fn test_current_pace_level_with_policy_primary() {
1848 let policy = TransportPolicy::new("test")
1849 .primary(vec!["iroh-eth0"])
1850 .alternate(vec!["lora-915"])
1851 .emergency(vec!["ble-mesh"]);
1852
1853 let config = TransportManagerConfig::with_policy(policy);
1854 let manager = TransportManager::new(config);
1855
1856 let inst = TransportInstance::new(
1858 "iroh-eth0",
1859 TransportType::Quic,
1860 TransportCapabilities::quic(),
1861 );
1862 let transport = Arc::new(MockTransport::new(TransportCapabilities::quic()));
1863 manager.register_instance(inst, transport);
1864
1865 assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
1866 }
1867
1868 #[test]
1869 fn test_current_pace_level_with_policy_alternate() {
1870 let policy = TransportPolicy::new("test")
1871 .primary(vec!["iroh-eth0"])
1872 .alternate(vec!["lora-915"])
1873 .emergency(vec!["ble-mesh"]);
1874
1875 let config = TransportManagerConfig::with_policy(policy);
1876 let manager = TransportManager::new(config);
1877
1878 let inst = TransportInstance::new(
1880 "lora-915",
1881 TransportType::LoRa,
1882 TransportCapabilities::lora(7),
1883 );
1884 let transport = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
1885 manager.register_instance(inst, transport);
1886
1887 assert_eq!(manager.current_pace_level(), PaceLevel::Alternate);
1888 }
1889
1890 #[test]
1891 fn test_current_pace_level_with_policy_emergency() {
1892 let policy = TransportPolicy::new("test")
1893 .primary(vec!["iroh-eth0"])
1894 .alternate(vec!["lora-915"])
1895 .emergency(vec!["ble-mesh"]);
1896
1897 let config = TransportManagerConfig::with_policy(policy);
1898 let manager = TransportManager::new(config);
1899
1900 let inst = TransportInstance::new(
1902 "ble-mesh",
1903 TransportType::BluetoothLE,
1904 TransportCapabilities::bluetooth_le(),
1905 );
1906 let transport = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
1907 manager.register_instance(inst, transport);
1908
1909 assert_eq!(manager.current_pace_level(), PaceLevel::Emergency);
1910 }
1911
1912 #[test]
1913 fn test_current_pace_level_with_policy_none_available() {
1914 let policy = TransportPolicy::new("test")
1915 .primary(vec!["iroh-eth0"])
1916 .alternate(vec!["lora-915"]);
1917
1918 let config = TransportManagerConfig::with_policy(policy);
1919 let manager = TransportManager::new(config);
1920
1921 assert_eq!(manager.current_pace_level(), PaceLevel::None);
1923 }
1924
1925 #[test]
1930 fn test_select_transports_pace_no_policy() {
1931 let config = TransportManagerConfig::default();
1932 let manager = TransportManager::new(config);
1933
1934 let peer = NodeId::new("peer-1".to_string());
1935 let requirements = MessageRequirements::default();
1936
1937 let selected = manager.select_transports_pace(&peer, &requirements);
1939 assert!(selected.is_empty());
1940 }
1941
1942 #[test]
1943 fn test_select_transports_pace_single_mode() {
1944 let policy = TransportPolicy::new("test")
1945 .primary(vec!["iroh-eth0", "iroh-wlan0"])
1946 .alternate(vec!["lora-915"]);
1947
1948 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single);
1949 let manager = TransportManager::new(config);
1950
1951 let peer = NodeId::new("peer-1".to_string());
1952
1953 let inst1 = TransportInstance::new(
1955 "iroh-eth0",
1956 TransportType::Quic,
1957 TransportCapabilities::quic(),
1958 );
1959 let t1 =
1960 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1961 manager.register_instance(inst1, t1);
1962
1963 let inst2 = TransportInstance::new(
1964 "iroh-wlan0",
1965 TransportType::Quic,
1966 TransportCapabilities::quic(),
1967 );
1968 let t2 =
1969 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1970 manager.register_instance(inst2, t2);
1971
1972 let requirements = MessageRequirements::default();
1973 let selected = manager.select_transports_pace(&peer, &requirements);
1974
1975 assert_eq!(selected.len(), 1);
1977 assert_eq!(selected[0], "iroh-eth0");
1978 }
1979
1980 #[test]
1981 fn test_select_transports_pace_redundant_mode() {
1982 let policy = TransportPolicy::new("test")
1983 .primary(vec!["iroh-eth0", "iroh-wlan0"])
1984 .alternate(vec!["lora-915"]);
1985
1986 let config =
1987 TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(2));
1988 let manager = TransportManager::new(config);
1989
1990 let peer = NodeId::new("peer-1".to_string());
1991
1992 let inst1 = TransportInstance::new(
1993 "iroh-eth0",
1994 TransportType::Quic,
1995 TransportCapabilities::quic(),
1996 );
1997 let t1 =
1998 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
1999 manager.register_instance(inst1, t1);
2000
2001 let inst2 = TransportInstance::new(
2002 "iroh-wlan0",
2003 TransportType::Quic,
2004 TransportCapabilities::quic(),
2005 );
2006 let t2 =
2007 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2008 manager.register_instance(inst2, t2);
2009
2010 let inst3 = TransportInstance::new(
2011 "lora-915",
2012 TransportType::LoRa,
2013 TransportCapabilities::lora(7),
2014 );
2015 let t3 =
2016 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2017 manager.register_instance(inst3, t3);
2018
2019 let requirements = MessageRequirements::default();
2020 let selected = manager.select_transports_pace(&peer, &requirements);
2021
2022 assert!(selected.len() >= 2);
2024 }
2025
2026 #[test]
2027 fn test_select_transports_pace_redundant_bounded() {
2028 let policy = TransportPolicy::new("test").primary(vec!["t1", "t2", "t3", "t4"]);
2029
2030 let config = TransportManagerConfig::with_policy(policy)
2031 .with_mode(TransportMode::redundant_bounded(1, 2));
2032 let manager = TransportManager::new(config);
2033
2034 let peer = NodeId::new("peer-1".to_string());
2035
2036 for name in &["t1", "t2", "t3", "t4"] {
2038 let inst =
2039 TransportInstance::new(*name, TransportType::Quic, TransportCapabilities::quic());
2040 let t =
2041 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2042 manager.register_instance(inst, t);
2043 }
2044
2045 let requirements = MessageRequirements::default();
2046 let selected = manager.select_transports_pace(&peer, &requirements);
2047
2048 assert_eq!(selected.len(), 2);
2050 }
2051
2052 #[test]
2053 fn test_select_transports_pace_bonded_mode() {
2054 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2055
2056 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2057 let manager = TransportManager::new(config);
2058
2059 let peer = NodeId::new("peer-1".to_string());
2060
2061 let inst1 = TransportInstance::new(
2062 "iroh-eth0",
2063 TransportType::Quic,
2064 TransportCapabilities::quic(),
2065 );
2066 let t1 =
2067 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2068 manager.register_instance(inst1, t1);
2069
2070 let inst2 = TransportInstance::new(
2071 "iroh-wlan0",
2072 TransportType::Quic,
2073 TransportCapabilities::quic(),
2074 );
2075 let t2 =
2076 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2077 manager.register_instance(inst2, t2);
2078
2079 let requirements = MessageRequirements::default();
2080 let selected = manager.select_transports_pace(&peer, &requirements);
2081
2082 assert_eq!(selected.len(), 2);
2084 }
2085
2086 #[test]
2087 fn test_select_transports_pace_load_balanced_mode() {
2088 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2089
2090 let config = TransportManagerConfig::with_policy(policy)
2091 .with_mode(TransportMode::LoadBalanced { weights: None });
2092 let manager = TransportManager::new(config);
2093
2094 let peer = NodeId::new("peer-1".to_string());
2095
2096 let inst1 = TransportInstance::new(
2097 "iroh-eth0",
2098 TransportType::Quic,
2099 TransportCapabilities::quic(),
2100 );
2101 let t1 =
2102 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2103 manager.register_instance(inst1, t1);
2104
2105 let inst2 = TransportInstance::new(
2106 "iroh-wlan0",
2107 TransportType::Quic,
2108 TransportCapabilities::quic(),
2109 );
2110 let t2 =
2111 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2112 manager.register_instance(inst2, t2);
2113
2114 let requirements = MessageRequirements::default();
2115 let selected = manager.select_transports_pace(&peer, &requirements);
2116
2117 assert_eq!(selected.len(), 2);
2119 }
2120
2121 #[test]
2122 fn test_select_transports_pace_filters_by_requirements() {
2123 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2124
2125 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2126 let manager = TransportManager::new(config);
2127
2128 let peer = NodeId::new("peer-1".to_string());
2129
2130 let inst1 = TransportInstance::new(
2132 "iroh-eth0",
2133 TransportType::Quic,
2134 TransportCapabilities::quic(),
2135 );
2136 let t1 =
2137 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2138 manager.register_instance(inst1, t1);
2139
2140 let inst2 = TransportInstance::new(
2142 "lora-915",
2143 TransportType::LoRa,
2144 TransportCapabilities::lora(7),
2145 );
2146 let t2 =
2147 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2148 manager.register_instance(inst2, t2);
2149
2150 let requirements = MessageRequirements {
2152 reliable: true,
2153 ..Default::default()
2154 };
2155 let selected = manager.select_transports_pace(&peer, &requirements);
2156
2157 assert_eq!(selected.len(), 1);
2158 assert_eq!(selected[0], "iroh-eth0");
2159 }
2160
2161 #[test]
2162 fn test_select_transports_pace_filters_unreachable_peer() {
2163 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
2164
2165 let config = TransportManagerConfig::with_policy(policy);
2166 let manager = TransportManager::new(config);
2167
2168 let peer = NodeId::new("peer-1".to_string());
2169
2170 let inst1 = TransportInstance::new(
2172 "iroh-eth0",
2173 TransportType::Quic,
2174 TransportCapabilities::quic(),
2175 );
2176 let t1 =
2177 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2178 manager.register_instance(inst1, t1);
2179
2180 let inst2 = TransportInstance::new(
2182 "lora-915",
2183 TransportType::LoRa,
2184 TransportCapabilities::lora(7),
2185 );
2186 let t2 = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
2187 manager.register_instance(inst2, t2);
2188
2189 let requirements = MessageRequirements::default();
2190 let selected = manager.select_transports_pace(&peer, &requirements);
2191
2192 assert_eq!(selected.len(), 1);
2193 assert_eq!(selected[0], "iroh-eth0");
2194 }
2195
2196 #[test]
2201 fn test_select_transport_pace_returns_first() {
2202 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
2203
2204 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded);
2205 let manager = TransportManager::new(config);
2206
2207 let peer = NodeId::new("peer-1".to_string());
2208
2209 let inst1 = TransportInstance::new(
2210 "iroh-eth0",
2211 TransportType::Quic,
2212 TransportCapabilities::quic(),
2213 );
2214 let t1 =
2215 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2216 manager.register_instance(inst1, t1);
2217
2218 let inst2 = TransportInstance::new(
2219 "iroh-wlan0",
2220 TransportType::Quic,
2221 TransportCapabilities::quic(),
2222 );
2223 let t2 =
2224 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2225 manager.register_instance(inst2, t2);
2226
2227 let requirements = MessageRequirements::default();
2228 let selected = manager.select_transport_pace(&peer, &requirements);
2229
2230 assert_eq!(selected, Some("iroh-eth0".to_string()));
2231 }
2232
2233 #[test]
2234 fn test_select_transport_pace_returns_none_no_policy() {
2235 let config = TransportManagerConfig::default();
2236 let manager = TransportManager::new(config);
2237
2238 let peer = NodeId::new("peer-1".to_string());
2239 let requirements = MessageRequirements::default();
2240
2241 assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2242 }
2243
2244 #[test]
2245 fn test_select_transport_pace_returns_none_no_candidates() {
2246 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
2247
2248 let config = TransportManagerConfig::with_policy(policy);
2249 let manager = TransportManager::new(config);
2250
2251 let peer = NodeId::new("peer-1".to_string());
2252 let requirements = MessageRequirements::default();
2253
2254 assert_eq!(manager.select_transport_pace(&peer, &requirements), None);
2256 }
2257
2258 #[test]
2263 fn test_record_success_pace_caching_enabled() {
2264 let config = TransportManagerConfig {
2265 cache_peer_transport: true,
2266 ..Default::default()
2267 };
2268 let manager = TransportManager::new(config);
2269
2270 let peer = NodeId::new("peer-1".to_string());
2271 manager.record_success_pace(&peer, "iroh-eth0".to_string());
2272
2273 let cached = manager.peer_transport_ids.read().unwrap();
2274 assert_eq!(cached.get(&peer), Some(&"iroh-eth0".to_string()));
2275 }
2276
2277 #[test]
2278 fn test_record_success_pace_caching_disabled() {
2279 let config = TransportManagerConfig {
2280 cache_peer_transport: false,
2281 ..Default::default()
2282 };
2283 let manager = TransportManager::new(config);
2284
2285 let peer = NodeId::new("peer-1".to_string());
2286 manager.record_success_pace(&peer, "iroh-eth0".to_string());
2287
2288 let cached = manager.peer_transport_ids.read().unwrap();
2289 assert!(cached.get(&peer).is_none());
2290 }
2291
2292 #[test]
2293 fn test_clear_cache_pace() {
2294 let config = TransportManagerConfig {
2295 cache_peer_transport: true,
2296 ..Default::default()
2297 };
2298 let manager = TransportManager::new(config);
2299
2300 let peer = NodeId::new("peer-1".to_string());
2301 manager.record_success_pace(&peer, "iroh-eth0".to_string());
2302
2303 assert!(manager
2305 .peer_transport_ids
2306 .read()
2307 .unwrap()
2308 .get(&peer)
2309 .is_some());
2310
2311 manager.clear_cache_pace(&peer);
2312
2313 assert!(manager
2315 .peer_transport_ids
2316 .read()
2317 .unwrap()
2318 .get(&peer)
2319 .is_none());
2320 }
2321
2322 #[test]
2323 fn test_clear_cache_pace_nonexistent_peer() {
2324 let config = TransportManagerConfig::default();
2325 let manager = TransportManager::new(config);
2326
2327 let peer = NodeId::new("nonexistent".to_string());
2328
2329 manager.clear_cache_pace(&peer);
2331 }
2332
2333 #[test]
2338 fn test_select_transport_for_distance_no_distance() {
2339 let config = TransportManagerConfig::default();
2340 let mut manager = TransportManager::new(config);
2341
2342 let peer = NodeId::new("peer-1".to_string());
2343 let quic =
2344 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2345 manager.register(quic);
2346
2347 let requirements = MessageRequirements::default();
2348 let result = manager.select_transport_for_distance(&peer, &requirements);
2349
2350 assert!(result.is_some());
2351 let (transport_type, range_mode) = result.unwrap();
2352 assert_eq!(transport_type, TransportType::Quic);
2353 assert!(range_mode.is_none());
2354 }
2355
2356 #[test]
2357 fn test_select_transport_for_distance_with_distance() {
2358 let config = TransportManagerConfig::default();
2359 let mut manager = TransportManager::new(config);
2360
2361 let peer = NodeId::new("peer-1".to_string());
2362 let quic =
2363 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2364 manager.register(quic);
2365
2366 let distance = PeerDistance {
2368 peer_id: peer.clone(),
2369 distance_meters: 1000,
2370 source: super::super::capabilities::DistanceSource::Configured,
2371 last_updated: Instant::now(),
2372 };
2373 manager.update_peer_distance(distance);
2374
2375 let requirements = MessageRequirements::default();
2376 let result = manager.select_transport_for_distance(&peer, &requirements);
2377
2378 assert!(result.is_some());
2379 let (transport_type, range_mode) = result.unwrap();
2380 assert_eq!(transport_type, TransportType::Quic);
2381 assert!(range_mode.is_none());
2383 }
2384
2385 #[test]
2386 fn test_select_transport_for_distance_no_suitable_transport() {
2387 let config = TransportManagerConfig::default();
2388 let manager = TransportManager::new(config);
2389
2390 let peer = NodeId::new("peer-1".to_string());
2391 let requirements = MessageRequirements::default();
2392
2393 let result = manager.select_transport_for_distance(&peer, &requirements);
2394 assert!(result.is_none());
2395 }
2396
2397 #[test]
2402 fn test_config_with_policy() {
2403 let policy = TransportPolicy::new("tactical")
2404 .primary(vec!["iroh-eth0"])
2405 .alternate(vec!["lora-915"]);
2406
2407 let config = TransportManagerConfig::with_policy(policy);
2408
2409 assert!(config.default_policy.is_some());
2410 let p = config.default_policy.unwrap();
2411 assert_eq!(p.name, "tactical");
2412 assert_eq!(p.primary.len(), 1);
2413 assert_eq!(p.alternate.len(), 1);
2414 assert!(config.enable_fallback);
2416 assert!(config.cache_peer_transport);
2417 assert_eq!(config.switch_threshold, 10);
2418 assert!(matches!(config.transport_mode, TransportMode::Single));
2419 }
2420
2421 #[test]
2422 fn test_config_with_mode() {
2423 let config = TransportManagerConfig::default().with_mode(TransportMode::Bonded);
2424
2425 assert!(matches!(config.transport_mode, TransportMode::Bonded));
2426 }
2427
2428 #[test]
2429 fn test_config_with_policy_and_mode_chained() {
2430 let policy = TransportPolicy::new("test").primary(vec!["t1"]);
2431 let config =
2432 TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(3));
2433
2434 assert!(config.default_policy.is_some());
2435 assert!(matches!(
2436 config.transport_mode,
2437 TransportMode::Redundant {
2438 min_paths: 3,
2439 max_paths: None
2440 }
2441 ));
2442 }
2443
2444 #[tokio::test]
2449 async fn test_connect_no_suitable_transport() {
2450 let config = TransportManagerConfig::default();
2451 let manager = TransportManager::new(config);
2452
2453 let peer = NodeId::new("peer-1".to_string());
2454 let requirements = MessageRequirements::default();
2455
2456 let result = manager.connect(&peer, &requirements).await;
2457 assert!(result.is_err());
2458 match result {
2459 Err(TransportError::PeerNotFound(_)) => {} Err(other) => panic!("Expected PeerNotFound, got: {}", other),
2461 Ok(_) => panic!("Expected error but got Ok"),
2462 }
2463 }
2464
2465 #[tokio::test]
2466 async fn test_connect_unreachable_peer() {
2467 let config = TransportManagerConfig::default();
2468 let mut manager = TransportManager::new(config);
2469
2470 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2472 manager.register(quic);
2473
2474 let peer = NodeId::new("unreachable-peer".to_string());
2475 let requirements = MessageRequirements::default();
2476
2477 let result = manager.connect(&peer, &requirements).await;
2478 assert!(result.is_err());
2479 }
2480
2481 #[tokio::test]
2486 async fn test_connect_with_fallback_disabled() {
2487 let config = TransportManagerConfig {
2488 enable_fallback: false,
2489 ..Default::default()
2490 };
2491 let mut manager = TransportManager::new(config);
2492
2493 let peer = NodeId::new("peer-1".to_string());
2494
2495 let quic =
2497 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2498 manager.register(quic);
2499
2500 let ble = Arc::new(
2502 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
2503 );
2504 manager.register(ble);
2505
2506 let peer_unreachable = NodeId::new("nobody".to_string());
2513 let requirements = MessageRequirements::default();
2514
2515 let result = manager
2516 .connect_with_fallback(&peer_unreachable, &requirements)
2517 .await;
2518 assert!(result.is_err());
2519 }
2520
2521 #[tokio::test]
2522 async fn test_connect_with_fallback_no_candidates() {
2523 let config = TransportManagerConfig::default();
2524 let manager = TransportManager::new(config);
2525
2526 let peer = NodeId::new("peer-1".to_string());
2527 let requirements = MessageRequirements::default();
2528
2529 let result = manager.connect_with_fallback(&peer, &requirements).await;
2530 assert!(result.is_err());
2531 match result {
2532 Err(ref e) => {
2533 let err_msg = format!("{}", e);
2534 assert!(err_msg.contains("No suitable transport"));
2535 }
2536 Ok(_) => panic!("Expected error but got Ok"),
2537 }
2538 }
2539
2540 #[test]
2545 fn test_route_message_no_route() {
2546 let config = TransportManagerConfig::default();
2547 let manager = TransportManager::new(config);
2548
2549 let peer = NodeId::new("peer-1".to_string());
2550 let requirements = MessageRequirements::default();
2551
2552 let decision = manager.route_message(&peer, &requirements);
2554 assert_eq!(decision, RouteDecision::NoRoute);
2555 }
2556
2557 #[test]
2558 fn test_route_message_bypass_requested_no_channel() {
2559 let config = TransportManagerConfig::default();
2560 let manager = TransportManager::new(config);
2561
2562 let peer = NodeId::new("peer-1".to_string());
2563 let requirements = MessageRequirements {
2564 bypass_sync: true,
2565 ..Default::default()
2566 };
2567
2568 let decision = manager.route_message(&peer, &requirements);
2570 assert_eq!(decision, RouteDecision::NoRoute);
2571 }
2572
2573 #[test]
2578 fn test_route_decision_no_route() {
2579 let decision = RouteDecision::NoRoute;
2580 assert_eq!(decision, RouteDecision::NoRoute);
2581 assert_ne!(decision, RouteDecision::Bypass);
2582 assert_ne!(decision, RouteDecision::Transport(TransportType::Quic));
2583 }
2584
2585 #[test]
2586 fn test_route_decision_debug() {
2587 let bypass = RouteDecision::Bypass;
2588 let transport = RouteDecision::Transport(TransportType::LoRa);
2589 let no_route = RouteDecision::NoRoute;
2590
2591 assert!(format!("{:?}", bypass).contains("Bypass"));
2592 assert!(format!("{:?}", transport).contains("LoRa"));
2593 assert!(format!("{:?}", no_route).contains("NoRoute"));
2594 }
2595
2596 #[test]
2597 fn test_route_decision_clone() {
2598 let original = RouteDecision::Transport(TransportType::BluetoothLE);
2599 let cloned = original.clone();
2600 assert_eq!(original, cloned);
2601 }
2602
2603 #[test]
2608 fn test_transport_manager_debug() {
2609 let config = TransportManagerConfig::default();
2610 let mut manager = TransportManager::new(config);
2611
2612 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2613 manager.register(quic);
2614
2615 let debug_str = format!("{:?}", manager);
2616 assert!(debug_str.contains("TransportManager"));
2617 assert!(debug_str.contains("Quic"));
2618 }
2619
2620 #[test]
2621 fn test_registered_transports() {
2622 let config = TransportManagerConfig::default();
2623 let mut manager = TransportManager::new(config);
2624
2625 assert!(manager.registered_transports().is_empty());
2626
2627 let quic = Arc::new(MockTransport::new(TransportCapabilities::quic()));
2628 let ble = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
2629 manager.register(quic);
2630 manager.register(ble);
2631
2632 let registered = manager.registered_transports();
2633 assert_eq!(registered.len(), 2);
2634 assert!(registered.contains(&TransportType::Quic));
2635 assert!(registered.contains(&TransportType::BluetoothLE));
2636 }
2637
2638 #[tokio::test]
2639 async fn test_set_bypass_channel() {
2640 let config = TransportManagerConfig::default();
2641 let mut manager = TransportManager::new(config);
2642
2643 assert!(!manager.has_bypass_channel());
2644
2645 let bypass_config = BypassChannelConfig::new();
2646 let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
2647 manager.set_bypass_channel(bypass);
2648
2649 assert!(manager.has_bypass_channel());
2650 }
2651
2652 #[test]
2653 fn test_record_success_caching_disabled() {
2654 let config = TransportManagerConfig {
2655 cache_peer_transport: false,
2656 ..Default::default()
2657 };
2658 let manager = TransportManager::new(config);
2659
2660 let peer = NodeId::new("peer-1".to_string());
2661 manager.record_success(&peer, TransportType::Quic);
2662
2663 let cached = manager.peer_transports.read().unwrap();
2665 assert!(cached.get(&peer).is_none());
2666 }
2667
2668 #[test]
2669 fn test_select_transport_cached_transport_invalid() {
2670 let config = TransportManagerConfig {
2671 cache_peer_transport: true,
2672 ..Default::default()
2673 };
2674 let mut manager = TransportManager::new(config);
2675
2676 let peer = NodeId::new("peer-1".to_string());
2677
2678 let ble = Arc::new(
2680 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
2681 );
2682 manager.register(ble);
2683
2684 manager.record_success(&peer, TransportType::LoRa);
2686
2687 let requirements = MessageRequirements::default();
2688 let selected = manager.select_transport(&peer, &requirements);
2689
2690 assert_eq!(selected, Some(TransportType::BluetoothLE));
2692 }
2693
2694 #[test]
2695 fn test_select_transport_cached_transport_unavailable() {
2696 let config = TransportManagerConfig {
2697 cache_peer_transport: true,
2698 ..Default::default()
2699 };
2700 let mut manager = TransportManager::new(config);
2701
2702 let peer = NodeId::new("peer-1".to_string());
2703
2704 let quic =
2706 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
2707 manager.register(quic);
2708
2709 let ble = Arc::new(
2711 MockTransport::new(TransportCapabilities::bluetooth_le())
2712 .with_peer(peer.clone())
2713 .unavailable(),
2714 );
2715 manager.register(ble);
2716
2717 manager.record_success(&peer, TransportType::BluetoothLE);
2719
2720 let requirements = MessageRequirements::default();
2721 let selected = manager.select_transport(&peer, &requirements);
2722
2723 assert_eq!(selected, Some(TransportType::Quic));
2725 }
2726
2727 #[test]
2728 fn test_pace_fallback_order() {
2729 let policy = TransportPolicy::new("test")
2731 .primary(vec!["dead-transport"])
2732 .alternate(vec!["lora-915"]);
2733
2734 let config = TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single);
2735 let manager = TransportManager::new(config);
2736
2737 let peer = NodeId::new("peer-1".to_string());
2738
2739 let inst = TransportInstance::new(
2741 "lora-915",
2742 TransportType::LoRa,
2743 TransportCapabilities::lora(7),
2744 );
2745 let t =
2746 Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
2747 manager.register_instance(inst, t);
2748
2749 let requirements = MessageRequirements::default();
2750 let selected = manager.select_transports_pace(&peer, &requirements);
2751
2752 assert_eq!(selected.len(), 1);
2754 assert_eq!(selected[0], "lora-915");
2755 }
2756
2757 #[test]
2758 fn test_get_peer_distance_none() {
2759 let config = TransportManagerConfig::default();
2760 let manager = TransportManager::new(config);
2761
2762 let peer = NodeId::new("unknown-peer".to_string());
2763 assert!(manager.get_peer_distance(&peer).is_none());
2764 }
2765
2766 #[tokio::test]
2767 async fn test_send_bypass_not_configured() {
2768 let config = TransportManagerConfig::default();
2769 let manager = TransportManager::new(config);
2770
2771 let result = manager.send_bypass("test_collection", b"hello", None).await;
2772 assert!(result.is_err());
2773 let err_msg = format!("{}", result.unwrap_err());
2774 assert!(err_msg.contains("Bypass channel not configured"));
2775 }
2776
2777 #[tokio::test]
2778 async fn test_send_bypass_to_not_configured() {
2779 let config = TransportManagerConfig::default();
2780 let manager = TransportManager::new(config);
2781
2782 let target = BypassTarget::Broadcast { port: 5150 };
2783 let result = manager
2784 .send_bypass_to(target, "test_collection", b"hello")
2785 .await;
2786 assert!(result.is_err());
2787 let err_msg = format!("{}", result.unwrap_err());
2788 assert!(err_msg.contains("Bypass channel not configured"));
2789 }
2790
2791 #[tokio::test]
2792 async fn test_subscribe_bypass_collection_not_configured() {
2793 let config = TransportManagerConfig::default();
2794 let manager = TransportManager::new(config);
2795
2796 let result = manager.subscribe_bypass_collection("test").await;
2797 assert!(result.is_err());
2798 }
2799
2800 #[test]
2805 fn test_route_table_empty_returns_none() {
2806 let table = CollectionRouteTable::new();
2807 assert!(table.get("anything").is_none());
2808 assert!(!table.has_collection("anything"));
2809 assert!(!table.is_bypass("anything"));
2810 }
2811
2812 #[test]
2813 fn test_route_table_builder_and_lookup() {
2814 let table = CollectionRouteTable::new()
2815 .with_collection(CollectionRouteConfig {
2816 collection: "telemetry".to_string(),
2817 route: CollectionTransportRoute::Fixed {
2818 transport_type: TransportType::BluetoothLE,
2819 },
2820 priority: MessagePriority::Normal,
2821 })
2822 .with_collection(CollectionRouteConfig {
2823 collection: "position".to_string(),
2824 route: CollectionTransportRoute::Bypass {
2825 encoding: MessageEncoding::Raw,
2826 ttl_ms: 200,
2827 bypass_transport: BypassTransport::Broadcast,
2828 },
2829 priority: MessagePriority::High,
2830 });
2831
2832 assert!(table.has_collection("telemetry"));
2833 assert!(table.has_collection("position"));
2834 assert!(!table.has_collection("unknown"));
2835
2836 let telemetry = table.get("telemetry").unwrap();
2837 assert!(matches!(
2838 telemetry.route,
2839 CollectionTransportRoute::Fixed {
2840 transport_type: TransportType::BluetoothLE
2841 }
2842 ));
2843 assert_eq!(telemetry.priority, MessagePriority::Normal);
2844
2845 let position = table.get("position").unwrap();
2846 assert_eq!(position.priority, MessagePriority::High);
2847 }
2848
2849 #[test]
2850 fn test_route_table_is_bypass() {
2851 let table = CollectionRouteTable::new()
2852 .with_collection(CollectionRouteConfig {
2853 collection: "bypass_col".to_string(),
2854 route: CollectionTransportRoute::Bypass {
2855 encoding: MessageEncoding::Raw,
2856 ttl_ms: 100,
2857 bypass_transport: BypassTransport::Unicast,
2858 },
2859 priority: MessagePriority::Normal,
2860 })
2861 .with_collection(CollectionRouteConfig {
2862 collection: "fixed_col".to_string(),
2863 route: CollectionTransportRoute::Fixed {
2864 transport_type: TransportType::Quic,
2865 },
2866 priority: MessagePriority::Normal,
2867 })
2868 .with_collection(CollectionRouteConfig {
2869 collection: "pace_col".to_string(),
2870 route: CollectionTransportRoute::Pace {
2871 policy_override: None,
2872 },
2873 priority: MessagePriority::Normal,
2874 });
2875
2876 assert!(table.is_bypass("bypass_col"));
2877 assert!(!table.is_bypass("fixed_col"));
2878 assert!(!table.is_bypass("pace_col"));
2879 assert!(!table.is_bypass("nonexistent"));
2880 }
2881
2882 #[test]
2887 fn test_serde_fixed_route() {
2888 let route = CollectionTransportRoute::Fixed {
2889 transport_type: TransportType::BluetoothLE,
2890 };
2891 let json = serde_json::to_string(&route).unwrap();
2892 let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2893 assert!(matches!(
2894 roundtrip,
2895 CollectionTransportRoute::Fixed {
2896 transport_type: TransportType::BluetoothLE
2897 }
2898 ));
2899 }
2900
2901 #[test]
2902 fn test_serde_bypass_route() {
2903 let route = CollectionTransportRoute::Bypass {
2904 encoding: MessageEncoding::Raw,
2905 ttl_ms: 500,
2906 bypass_transport: BypassTransport::Broadcast,
2907 };
2908 let json = serde_json::to_string(&route).unwrap();
2909 let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2910 if let CollectionTransportRoute::Bypass {
2911 encoding,
2912 ttl_ms,
2913 bypass_transport,
2914 } = roundtrip
2915 {
2916 assert_eq!(encoding, MessageEncoding::Raw);
2917 assert_eq!(ttl_ms, 500);
2918 assert_eq!(bypass_transport, BypassTransport::Broadcast);
2919 } else {
2920 panic!("Expected Bypass variant");
2921 }
2922 }
2923
2924 #[test]
2925 fn test_serde_pace_route() {
2926 let route = CollectionTransportRoute::Pace {
2927 policy_override: None,
2928 };
2929 let json = serde_json::to_string(&route).unwrap();
2930 let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2931 assert!(matches!(
2932 roundtrip,
2933 CollectionTransportRoute::Pace {
2934 policy_override: None
2935 }
2936 ));
2937 }
2938
2939 #[test]
2940 fn test_serde_pace_route_with_policy() {
2941 let policy = TransportPolicy::new("custom")
2942 .primary(vec!["ble-hci0"])
2943 .alternate(vec!["iroh-wlan0"]);
2944 let route = CollectionTransportRoute::Pace {
2945 policy_override: Some(policy),
2946 };
2947 let json = serde_json::to_string(&route).unwrap();
2948 let roundtrip: CollectionTransportRoute = serde_json::from_str(&json).unwrap();
2949 if let CollectionTransportRoute::Pace {
2950 policy_override: Some(p),
2951 } = roundtrip
2952 {
2953 assert_eq!(p.name, "custom");
2954 assert_eq!(p.primary, vec!["ble-hci0"]);
2955 assert_eq!(p.alternate, vec!["iroh-wlan0"]);
2956 } else {
2957 panic!("Expected Pace with policy_override");
2958 }
2959 }
2960
2961 #[test]
2962 fn test_serde_collection_route_config() {
2963 let config = CollectionRouteConfig {
2964 collection: "sensors".to_string(),
2965 route: CollectionTransportRoute::Fixed {
2966 transport_type: TransportType::LoRa,
2967 },
2968 priority: MessagePriority::High,
2969 };
2970 let json = serde_json::to_string(&config).unwrap();
2971 let roundtrip: CollectionRouteConfig = serde_json::from_str(&json).unwrap();
2972 assert_eq!(roundtrip.collection, "sensors");
2973 assert_eq!(roundtrip.priority, MessagePriority::High);
2974 }
2975
2976 #[test]
2977 fn test_serde_collection_route_table() {
2978 let table = CollectionRouteTable::new()
2979 .with_collection(CollectionRouteConfig {
2980 collection: "a".to_string(),
2981 route: CollectionTransportRoute::Fixed {
2982 transport_type: TransportType::Quic,
2983 },
2984 priority: MessagePriority::Normal,
2985 })
2986 .with_collection(CollectionRouteConfig {
2987 collection: "b".to_string(),
2988 route: CollectionTransportRoute::Bypass {
2989 encoding: MessageEncoding::Json,
2990 ttl_ms: 1000,
2991 bypass_transport: BypassTransport::Unicast,
2992 },
2993 priority: MessagePriority::Critical,
2994 });
2995
2996 let json = serde_json::to_string(&table).unwrap();
2997 let roundtrip: CollectionRouteTable = serde_json::from_str(&json).unwrap();
2998 assert!(roundtrip.has_collection("a"));
2999 assert!(roundtrip.has_collection("b"));
3000 assert!(roundtrip.is_bypass("b"));
3001 assert!(!roundtrip.is_bypass("a"));
3002 }
3003
3004 #[test]
3005 fn test_serde_transport_type() {
3006 let types = vec![
3008 TransportType::Quic,
3009 TransportType::BluetoothClassic,
3010 TransportType::BluetoothLE,
3011 TransportType::WifiDirect,
3012 TransportType::LoRa,
3013 TransportType::TacticalRadio,
3014 TransportType::Satellite,
3015 TransportType::Custom(42),
3016 ];
3017 for tt in types {
3018 let json = serde_json::to_string(&tt).unwrap();
3019 let roundtrip: TransportType = serde_json::from_str(&json).unwrap();
3020 assert_eq!(roundtrip, tt, "Failed round-trip for {:?}", tt);
3021 }
3022 }
3023
3024 #[test]
3029 fn test_route_collection_unknown_falls_through() {
3030 let config = TransportManagerConfig::default();
3031 let mut manager = TransportManager::new(config);
3032
3033 let peer = NodeId::new("peer-1".to_string());
3034 let quic =
3035 Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
3036 manager.register(quic);
3037
3038 let requirements = MessageRequirements::default();
3039 let decision = manager.route_collection("unknown", &peer, &requirements);
3041 assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
3042 }
3043
3044 #[test]
3045 fn test_route_collection_fixed_routes_correctly() {
3046 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3047 collection: "telemetry".to_string(),
3048 route: CollectionTransportRoute::Fixed {
3049 transport_type: TransportType::BluetoothLE,
3050 },
3051 priority: MessagePriority::Normal,
3052 });
3053
3054 let config = TransportManagerConfig {
3055 collection_routes: table,
3056 ..Default::default()
3057 };
3058 let mut manager = TransportManager::new(config);
3059
3060 let peer = NodeId::new("peer-1".to_string());
3061 let ble = Arc::new(
3062 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
3063 );
3064 manager.register(ble);
3065
3066 let requirements = MessageRequirements::default();
3067 let decision = manager.route_collection("telemetry", &peer, &requirements);
3068 assert_eq!(
3069 decision,
3070 RouteDecision::Transport(TransportType::BluetoothLE)
3071 );
3072 }
3073
3074 #[test]
3075 fn test_route_collection_fixed_unavailable_returns_no_route() {
3076 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3077 collection: "telemetry".to_string(),
3078 route: CollectionTransportRoute::Fixed {
3079 transport_type: TransportType::BluetoothLE,
3080 },
3081 priority: MessagePriority::Normal,
3082 });
3083
3084 let config = TransportManagerConfig {
3085 collection_routes: table,
3086 ..Default::default()
3087 };
3088 let mut manager = TransportManager::new(config);
3089
3090 let peer = NodeId::new("peer-1".to_string());
3091
3092 let ble = Arc::new(
3094 MockTransport::new(TransportCapabilities::bluetooth_le())
3095 .with_peer(peer.clone())
3096 .unavailable(),
3097 );
3098 manager.register(ble);
3099
3100 let requirements = MessageRequirements::default();
3101 let decision = manager.route_collection("telemetry", &peer, &requirements);
3102 assert_eq!(decision, RouteDecision::NoRoute);
3103 }
3104
3105 #[test]
3106 fn test_route_collection_fixed_not_registered_returns_no_route() {
3107 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3108 collection: "telemetry".to_string(),
3109 route: CollectionTransportRoute::Fixed {
3110 transport_type: TransportType::BluetoothLE,
3111 },
3112 priority: MessagePriority::Normal,
3113 });
3114
3115 let config = TransportManagerConfig {
3116 collection_routes: table,
3117 ..Default::default()
3118 };
3119 let manager = TransportManager::new(config);
3120
3121 let peer = NodeId::new("peer-1".to_string());
3122 let requirements = MessageRequirements::default();
3124 let decision = manager.route_collection("telemetry", &peer, &requirements);
3125 assert_eq!(decision, RouteDecision::NoRoute);
3126 }
3127
3128 #[tokio::test]
3129 async fn test_route_collection_bypass_with_channel() {
3130 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3131 collection: "position".to_string(),
3132 route: CollectionTransportRoute::Bypass {
3133 encoding: MessageEncoding::Raw,
3134 ttl_ms: 200,
3135 bypass_transport: BypassTransport::Broadcast,
3136 },
3137 priority: MessagePriority::High,
3138 });
3139
3140 let config = TransportManagerConfig {
3141 collection_routes: table,
3142 ..Default::default()
3143 };
3144 let mut manager = TransportManager::new(config);
3145
3146 let bypass_config = BypassChannelConfig::new();
3148 let bypass = UdpBypassChannel::new(bypass_config).await.unwrap();
3149 manager.set_bypass_channel(bypass);
3150
3151 let peer = NodeId::new("peer-1".to_string());
3152 let requirements = MessageRequirements::default();
3153 let decision = manager.route_collection("position", &peer, &requirements);
3154 assert_eq!(decision, RouteDecision::Bypass);
3155 }
3156
3157 #[test]
3158 fn test_route_collection_bypass_without_channel() {
3159 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3160 collection: "position".to_string(),
3161 route: CollectionTransportRoute::Bypass {
3162 encoding: MessageEncoding::Raw,
3163 ttl_ms: 200,
3164 bypass_transport: BypassTransport::Broadcast,
3165 },
3166 priority: MessagePriority::High,
3167 });
3168
3169 let config = TransportManagerConfig {
3170 collection_routes: table,
3171 ..Default::default()
3172 };
3173 let manager = TransportManager::new(config);
3174
3175 let peer = NodeId::new("peer-1".to_string());
3176 let requirements = MessageRequirements::default();
3177 let decision = manager.route_collection("position", &peer, &requirements);
3179 assert_eq!(decision, RouteDecision::NoRoute);
3180 }
3181
3182 #[test]
3183 fn test_route_collection_pace_routes_correctly() {
3184 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
3185
3186 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3187 collection: "sync_data".to_string(),
3188 route: CollectionTransportRoute::Pace {
3189 policy_override: None,
3190 },
3191 priority: MessagePriority::Normal,
3192 });
3193
3194 let config = TransportManagerConfig {
3195 collection_routes: table,
3196 default_policy: Some(policy),
3197 ..Default::default()
3198 };
3199 let manager = TransportManager::new(config);
3200
3201 let peer = NodeId::new("peer-1".to_string());
3202
3203 let inst = TransportInstance::new(
3205 "iroh-eth0",
3206 TransportType::Quic,
3207 TransportCapabilities::quic(),
3208 );
3209 let t = Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
3210 manager.register_instance(inst, t);
3211
3212 let requirements = MessageRequirements::default();
3213 let decision = manager.route_collection("sync_data", &peer, &requirements);
3214 assert_eq!(
3215 decision,
3216 RouteDecision::TransportInstance("iroh-eth0".to_string())
3217 );
3218 }
3219
3220 #[test]
3221 fn test_route_collection_pace_no_available_returns_no_route() {
3222 let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
3223
3224 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3225 collection: "sync_data".to_string(),
3226 route: CollectionTransportRoute::Pace {
3227 policy_override: None,
3228 },
3229 priority: MessagePriority::Normal,
3230 });
3231
3232 let config = TransportManagerConfig {
3233 collection_routes: table,
3234 default_policy: Some(policy),
3235 ..Default::default()
3236 };
3237 let manager = TransportManager::new(config);
3238
3239 let peer = NodeId::new("peer-1".to_string());
3240 let requirements = MessageRequirements::default();
3242 let decision = manager.route_collection("sync_data", &peer, &requirements);
3243 assert_eq!(decision, RouteDecision::NoRoute);
3244 }
3245
3246 #[test]
3247 fn test_route_collection_pace_with_policy_override() {
3248 let default_policy = TransportPolicy::new("default").primary(vec!["nonexistent"]);
3250
3251 let override_policy = TransportPolicy::new("override").primary(vec!["ble-hci0"]);
3253
3254 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3255 collection: "ble_data".to_string(),
3256 route: CollectionTransportRoute::Pace {
3257 policy_override: Some(override_policy),
3258 },
3259 priority: MessagePriority::Normal,
3260 });
3261
3262 let config = TransportManagerConfig {
3263 collection_routes: table,
3264 default_policy: Some(default_policy),
3265 ..Default::default()
3266 };
3267 let manager = TransportManager::new(config);
3268
3269 let peer = NodeId::new("peer-1".to_string());
3270 let inst = TransportInstance::new(
3271 "ble-hci0",
3272 TransportType::BluetoothLE,
3273 TransportCapabilities::bluetooth_le(),
3274 );
3275 let t = Arc::new(
3276 MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
3277 );
3278 manager.register_instance(inst, t);
3279
3280 let requirements = MessageRequirements::default();
3281 let decision = manager.route_collection("ble_data", &peer, &requirements);
3282 assert_eq!(
3283 decision,
3284 RouteDecision::TransportInstance("ble-hci0".to_string())
3285 );
3286 }
3287
3288 #[test]
3293 fn test_route_decision_transport_instance_variant() {
3294 let decision = RouteDecision::TransportInstance("iroh-eth0".to_string());
3295 assert_eq!(
3296 decision,
3297 RouteDecision::TransportInstance("iroh-eth0".to_string())
3298 );
3299 assert_ne!(decision, RouteDecision::Bypass);
3300 assert_ne!(decision, RouteDecision::NoRoute);
3301 assert_ne!(decision, RouteDecision::Transport(TransportType::Quic));
3302 }
3303
3304 #[test]
3305 fn test_route_decision_transport_instance_debug() {
3306 let decision = RouteDecision::TransportInstance("ble-hci0".to_string());
3307 let debug = format!("{:?}", decision);
3308 assert!(debug.contains("TransportInstance"));
3309 assert!(debug.contains("ble-hci0"));
3310 }
3311
3312 #[test]
3313 fn test_route_decision_transport_instance_clone() {
3314 let original = RouteDecision::TransportInstance("iroh-wlan0".to_string());
3315 let cloned = original.clone();
3316 assert_eq!(original, cloned);
3317 }
3318
3319 #[test]
3324 fn test_get_collection_route_found() {
3325 let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
3326 collection: "telemetry".to_string(),
3327 route: CollectionTransportRoute::Fixed {
3328 transport_type: TransportType::Quic,
3329 },
3330 priority: MessagePriority::High,
3331 });
3332
3333 let config = TransportManagerConfig {
3334 collection_routes: table,
3335 ..Default::default()
3336 };
3337 let manager = TransportManager::new(config);
3338
3339 let route = manager.get_collection_route("telemetry");
3340 assert!(route.is_some());
3341 assert_eq!(route.unwrap().collection, "telemetry");
3342 assert_eq!(route.unwrap().priority, MessagePriority::High);
3343 }
3344
3345 #[test]
3346 fn test_get_collection_route_not_found() {
3347 let config = TransportManagerConfig::default();
3348 let manager = TransportManager::new(config);
3349
3350 assert!(manager.get_collection_route("nonexistent").is_none());
3351 }
3352}