use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use super::bypass::{
BypassMessage, BypassTarget, BypassTransport, MessageEncoding, UdpBypassChannel,
};
use super::capabilities::{
MessagePriority, MessageRequirements, PaceLevel, PeerDistance, RangeMode, Transport,
TransportId, TransportInstance, TransportMode, TransportPolicy, TransportType,
};
use super::{NodeId, Result, TransportError};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tokio::sync::broadcast;
use tokio::sync::RwLock as TokioRwLock;
/// Registry of transport instances keyed by [`TransportId`]; each entry pairs the
/// instance metadata with the transport implementation registered alongside it.
type TransportInstanceMap = HashMap<TransportId, (TransportInstance, Arc<dyn Transport>)>;
/// Configuration for [`TransportManager`].
#[derive(Debug, Clone)]
pub struct TransportManagerConfig {
/// Transport types in descending order of preference; earlier entries receive a
/// larger preference bonus when candidates are scored.
pub preference_order: Vec<TransportType>,
/// When `false`, `connect_with_fallback` returns the first connection error
/// instead of trying the next candidate.
pub enable_fallback: bool,
/// When `true`, the transport recorded as successful for a peer is remembered
/// and checked first by `select_transport`.
pub cache_peer_transport: bool,
/// NOTE(review): not read anywhere in this file; presumably the score margin
/// required before switching transports — confirm against callers.
pub switch_threshold: i32,
/// Policy used for instance-based (`*_pace`) selection. When `None`,
/// `select_transports_pace` returns no candidates.
pub default_policy: Option<TransportPolicy>,
/// Controls how many instances `select_transports_pace` returns.
pub transport_mode: TransportMode,
/// Per-collection routing overrides consulted by `route_collection`.
pub collection_routes: CollectionRouteTable,
}
impl Default for TransportManagerConfig {
    /// Builds the default configuration: QUIC, Wi-Fi Direct, BLE, then LoRa as the
    /// preference order; fallback and per-peer caching enabled; no policy;
    /// single-path mode; and an empty collection route table.
    fn default() -> Self {
        let preference_order = vec![
            TransportType::Quic,
            TransportType::WifiDirect,
            TransportType::BluetoothLE,
            TransportType::LoRa,
        ];
        let collection_routes = CollectionRouteTable::default();

        Self {
            preference_order,
            enable_fallback: true,
            cache_peer_transport: true,
            switch_threshold: 10,
            default_policy: None,
            transport_mode: TransportMode::Single,
            collection_routes,
        }
    }
}
impl TransportManagerConfig {
    /// Creates a default configuration whose `default_policy` is `policy`.
    pub fn with_policy(policy: TransportPolicy) -> Self {
        let defaults = Self::default();
        Self {
            default_policy: Some(policy),
            ..defaults
        }
    }

    /// Returns this configuration with `transport_mode` replaced by `mode`.
    pub fn with_mode(self, mode: TransportMode) -> Self {
        Self {
            transport_mode: mode,
            ..self
        }
    }
}
/// Registry and selector for the transports available to this node.
///
/// Holds transports keyed by type, transport instances keyed by id, per-peer
/// records of the last successful transport, known peer distances, and an
/// optional UDP bypass channel.
pub struct TransportManager {
/// Transports registered by type (at most one per [`TransportType`]).
transports: HashMap<TransportType, Arc<dyn Transport>>,
/// Transport instances registered by id, together with their metadata.
transport_instances: RwLock<TransportInstanceMap>,
/// Transport type last recorded as successful for each peer.
peer_transports: RwLock<HashMap<NodeId, TransportType>>,
/// Transport instance id last recorded as successful for each peer.
peer_transport_ids: RwLock<HashMap<NodeId, TransportId>>,
/// Most recently stored distance for each peer.
peer_distances: RwLock<HashMap<NodeId, PeerDistance>>,
/// Selection and routing configuration.
config: TransportManagerConfig,
/// Bypass channel; `None` until one is configured.
bypass_channel: Option<Arc<TokioRwLock<UdpBypassChannel>>>,
}
impl TransportManager {
/// Creates a manager with no registered transports, empty per-peer state, and
/// no bypass channel.
pub fn new(config: TransportManagerConfig) -> Self {
    Self {
        config,
        bypass_channel: None,
        transports: HashMap::default(),
        transport_instances: RwLock::default(),
        peer_transports: RwLock::default(),
        peer_transport_ids: RwLock::default(),
        peer_distances: RwLock::default(),
    }
}
pub fn with_bypass(config: TransportManagerConfig, bypass: UdpBypassChannel) -> Self {
Self {
transports: HashMap::new(),
transport_instances: RwLock::new(HashMap::new()),
peer_transports: RwLock::new(HashMap::new()),
peer_transport_ids: RwLock::new(HashMap::new()),
peer_distances: RwLock::new(HashMap::new()),
config,
bypass_channel: Some(Arc::new(TokioRwLock::new(bypass))),
}
}
/// Installs `bypass` as the bypass channel, replacing any existing one.
pub fn set_bypass_channel(&mut self, bypass: UdpBypassChannel) {
    let shared = Arc::new(TokioRwLock::new(bypass));
    self.bypass_channel = Some(shared);
}
/// Returns `true` when a bypass channel has been configured.
pub fn has_bypass_channel(&self) -> bool {
self.bypass_channel.is_some()
}
pub async fn is_bypass_collection(&self, collection: &str) -> bool {
if let Some(ref bypass) = self.bypass_channel {
bypass.read().await.is_bypass_collection(collection)
} else {
false
}
}
/// Registers `transport` under the transport type its capabilities report.
///
/// A transport previously registered for the same type is replaced.
pub fn register(&mut self, transport: Arc<dyn Transport>) {
    let key = transport.capabilities().transport_type;
    self.transports.insert(key, transport);
}
/// Removes the transport registered for `transport_type`, returning it if one
/// was present.
pub fn unregister(&mut self, transport_type: TransportType) -> Option<Arc<dyn Transport>> {
self.transports.remove(&transport_type)
}
/// Returns the transport registered for `transport_type`, if any.
pub fn get_transport(&self, transport_type: TransportType) -> Option<&Arc<dyn Transport>> {
self.transports.get(&transport_type)
}
/// Lists the types of all registered transports. The order follows `HashMap`
/// iteration and is therefore unspecified.
pub fn registered_transports(&self) -> Vec<TransportType> {
self.transports.keys().copied().collect()
}
/// Lists the registered transport types that are currently available and
/// report being able to reach `peer_id`.
pub fn available_transports(&self, peer_id: &NodeId) -> Vec<TransportType> {
    let mut usable = Vec::new();
    for (kind, transport) in &self.transports {
        if transport.is_available() && transport.can_reach(peer_id) {
            usable.push(*kind);
        }
    }
    usable
}
/// Registers a transport instance under `instance.id`, replacing any entry
/// already stored for that id.
///
/// A poisoned registry lock is recovered rather than propagated as a panic,
/// matching `unregister_instance`: every write to the registry is a single
/// map operation, so the map cannot be left half-updated.
pub fn register_instance(&self, instance: TransportInstance, transport: Arc<dyn Transport>) {
    let id = instance.id.clone();
    self.transport_instances
        .write()
        .unwrap_or_else(|e| e.into_inner())
        .insert(id, (instance, transport));
}
/// Removes the instance registered under `id`, returning its metadata and
/// transport if it was present.
pub fn unregister_instance(
    &self,
    id: &TransportId,
) -> Option<(TransportInstance, Arc<dyn Transport>)> {
    let mut registry = self
        .transport_instances
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    registry.remove(id)
}
/// Returns a shared handle to the transport registered under `id`, if any.
///
/// A poisoned registry lock is recovered rather than propagated as a panic,
/// consistent with the other registry accessors.
pub fn get_instance(&self, id: &TransportId) -> Option<Arc<dyn Transport>> {
    self.transport_instances
        .read()
        .unwrap_or_else(|e| e.into_inner())
        .get(id)
        .map(|(_, t)| Arc::clone(t))
}
/// Lists the ids of all registered transport instances, in unspecified order.
///
/// A poisoned registry lock is recovered rather than propagated as a panic,
/// consistent with the other registry accessors.
pub fn registered_instance_ids(&self) -> Vec<TransportId> {
    self.transport_instances
        .read()
        .unwrap_or_else(|e| e.into_inner())
        .keys()
        .cloned()
        .collect()
}
/// Returns the ids of instances that are both marked available in their
/// metadata and reported available by their transport.
///
/// A poisoned registry lock is recovered rather than propagated as a panic,
/// consistent with the other registry accessors.
pub fn available_instance_ids(&self) -> HashSet<TransportId> {
    self.transport_instances
        .read()
        .unwrap_or_else(|e| e.into_inner())
        .iter()
        .filter(|(_, (inst, transport))| inst.available && transport.is_available())
        .map(|(id, _)| id.clone())
        .collect()
}
/// Lists the ids of instances that are available (per both metadata and
/// transport) and whose transport reports it can reach `peer_id`.
///
/// A poisoned registry lock is recovered rather than propagated as a panic,
/// consistent with the other registry accessors.
pub fn available_instances_for_peer(&self, peer_id: &NodeId) -> Vec<TransportId> {
    self.transport_instances
        .read()
        .unwrap_or_else(|e| e.into_inner())
        .iter()
        .filter(|(_, (inst, transport))| {
            inst.available && transport.is_available() && transport.can_reach(peer_id)
        })
        .map(|(id, _)| id.clone())
        .collect()
}
/// Reports the current level given the instances that are available.
///
/// With a default policy the level is whatever the policy computes from the
/// available instance ids. Without one, the level is `Primary` when at least
/// one instance is available and `None` otherwise.
pub fn current_pace_level(&self) -> PaceLevel {
    let available = self.available_instance_ids();
    match &self.config.default_policy {
        Some(policy) => policy.current_level(&available),
        None if available.is_empty() => PaceLevel::None,
        None => PaceLevel::Primary,
    }
}
/// Selects transport instances for `peer_id` in the order given by the
/// default policy.
///
/// Only instances that are available, can reach the peer, and meet
/// `requirements` are considered. The number returned depends on the
/// configured transport mode: one for `Single`; up to
/// `max(max_paths, min_paths)` for `Redundant` (all candidates when
/// `max_paths` is unset); every candidate for `Bonded` and `LoadBalanced`.
///
/// Returns an empty list when no default policy is configured.
pub fn select_transports_pace(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> Vec<TransportId> {
    let policy = match self.config.default_policy.as_ref() {
        None => return Vec::new(),
        Some(policy) => policy,
    };

    let registry = self
        .transport_instances
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    // Ids of every instance that could carry this message to the peer.
    let mut usable = HashSet::new();
    for (id, (instance, transport)) in registry.iter() {
        if instance.available
            && transport.is_available()
            && transport.can_reach(peer_id)
            && transport.capabilities().meets_requirements(requirements)
        {
            usable.insert(id.clone());
        }
    }

    // Keep the policy's ordering, dropping ids that are not usable.
    let mut ranked: Vec<TransportId> = Vec::new();
    for id in policy.ordered() {
        if usable.contains(id) {
            ranked.push(id.clone());
        }
    }

    match &self.config.transport_mode {
        TransportMode::Single => ranked.truncate(1),
        TransportMode::Redundant {
            min_paths,
            max_paths,
        } => {
            let floor = *min_paths as usize;
            let ceiling = max_paths.map_or(ranked.len(), |m| m as usize);
            ranked.truncate(ceiling.max(floor));
        }
        TransportMode::Bonded | TransportMode::LoadBalanced { .. } => {}
    }
    ranked
}
/// Returns the first instance chosen by `select_transports_pace`, or `None`
/// when it selects nothing.
pub fn select_transport_pace(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> Option<TransportId> {
    let ranked = self.select_transports_pace(peer_id, requirements);
    ranked.into_iter().next()
}
/// Remembers `transport_id` as the instance that last worked for `peer_id`.
///
/// Does nothing when `cache_peer_transport` is disabled. A poisoned cache lock
/// is recovered rather than propagated as a panic, matching
/// `clear_cache_pace`; the cache is only ever touched by single map
/// operations, so it cannot be left half-updated.
pub fn record_success_pace(&self, peer_id: &NodeId, transport_id: TransportId) {
    if self.config.cache_peer_transport {
        self.peer_transport_ids
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(peer_id.clone(), transport_id);
    }
}
/// Forgets the transport instance remembered for `peer_id`, if any.
pub fn clear_cache_pace(&self, peer_id: &NodeId) {
    let mut cache = self
        .peer_transport_ids
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    cache.remove(peer_id);
}
/// Picks the best registered transport type for sending to `peer_id`.
///
/// When caching is enabled and the transport remembered for the peer is still
/// available, can reach the peer, and meets `requirements`, it is returned
/// immediately. Otherwise every available transport that meets the
/// requirements (and, when `max_latency_ms` is set, whose estimated delivery
/// time does not exceed it) is scored, and the highest score wins. A
/// transport's position in `preference_order` adds a bonus of `20 - 5 * index`.
///
/// Returns `None` when no transport qualifies.
pub fn select_transport(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> Option<TransportType> {
    if self.config.cache_peer_transport {
        let cache = self
            .peer_transports
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(&cached) = cache.get(peer_id) {
            let still_valid = self.transports.get(&cached).map_or(false, |transport| {
                transport.is_available()
                    && transport.can_reach(peer_id)
                    && transport.capabilities().meets_requirements(requirements)
            });
            if still_valid {
                return Some(cached);
            }
        }
    }

    self.available_transports(peer_id)
        .into_iter()
        .filter_map(|kind| {
            let transport = self.transports.get(&kind)?;
            if !transport.capabilities().meets_requirements(requirements) {
                return None;
            }
            if let Some(limit) = requirements.max_latency_ms {
                if transport.estimate_delivery_ms(requirements.message_size) > limit {
                    return None;
                }
            }
            let rank = self
                .config
                .preference_order
                .iter()
                .position(|&preferred| preferred == kind);
            let preference_bonus = rank.map_or(0, |idx| 20 - (idx as i32 * 5));
            Some((kind, transport.calculate_score(requirements, preference_bonus)))
        })
        .max_by_key(|&(_, score)| score)
        .map(|(kind, _)| kind)
}
/// Selects a transport for `peer_id` and reports the range mode to use with it.
///
/// The transport is chosen by `select_transport`. Range-mode derivation is not
/// implemented: the second tuple element is always `None`, whether or not a
/// distance is known for the peer.
///
/// Returns `None` when no transport satisfies `requirements`.
pub fn select_transport_for_distance(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> Option<(TransportType, Option<RangeMode>)> {
    let transport_type = self.select_transport(peer_id, requirements)?;
    // The stored peer distance used to be looked up here, but both outcomes
    // produced `None`. The lookup only added a lock acquisition that could
    // panic on a poisoned lock, so it is omitted until a distance-to-range-mode
    // mapping exists.
    let range_mode = None;
    Some((transport_type, range_mode))
}
/// Remembers `transport_type` as the transport that last worked for `peer_id`.
///
/// Does nothing when `cache_peer_transport` is disabled. A poisoned cache lock
/// is recovered rather than propagated as a panic, matching `clear_cache` and
/// `select_transport`; the cache is only ever touched by single map
/// operations, so it cannot be left half-updated.
pub fn record_success(&self, peer_id: &NodeId, transport_type: TransportType) {
    if self.config.cache_peer_transport {
        self.peer_transports
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(peer_id.clone(), transport_type);
    }
}
/// Forgets the transport type remembered for `peer_id`, if any.
pub fn clear_cache(&self, peer_id: &NodeId) {
    let mut cache = self
        .peer_transports
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    cache.remove(peer_id);
}
/// Stores `distance` under its own `peer_id`, replacing any earlier entry.
///
/// A poisoned lock is recovered rather than propagated as a panic, matching
/// `get_peer_distance`; the map is only ever touched by single map operations,
/// so it cannot be left half-updated.
pub fn update_peer_distance(&self, distance: PeerDistance) {
    self.peer_distances
        .write()
        .unwrap_or_else(|e| e.into_inner())
        .insert(distance.peer_id.clone(), distance);
}
/// Returns a copy of the distance stored for `peer_id`, if any.
pub fn get_peer_distance(&self, peer_id: &NodeId) -> Option<PeerDistance> {
    let distances = self
        .peer_distances
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    distances.get(peer_id).cloned()
}
/// Connects to `peer_id` over the transport chosen by `select_transport`.
///
/// On success the transport is recorded for the peer and returned together
/// with the connection.
///
/// # Errors
///
/// * `TransportError::PeerNotFound` when no transport qualifies.
/// * `TransportError::NotStarted` when the selected type is not registered.
/// * Any error returned by the transport's own `connect`.
pub async fn connect(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
    let transport_type = match self.select_transport(peer_id, requirements) {
        Some(kind) => kind,
        None => {
            return Err(TransportError::PeerNotFound(format!(
                "No suitable transport for {}",
                peer_id
            )))
        }
    };
    let transport = match self.transports.get(&transport_type) {
        Some(transport) => transport,
        None => return Err(TransportError::NotStarted),
    };

    let connection = transport.connect(peer_id).await?;
    self.record_success(peer_id, transport_type);
    Ok((transport_type, connection))
}
/// Connects to `peer_id`, trying qualifying transports from highest score to
/// lowest until one succeeds.
///
/// Candidates are the available transports that meet `requirements`, scored
/// as in `select_transport`. After each failed attempt the peer's cached
/// transport is cleared. With `enable_fallback` off, the first failure is
/// returned immediately and the cache is left untouched.
///
/// # Errors
///
/// * `TransportError::PeerNotFound` when there are no candidates.
/// * The first connection error when fallback is disabled, otherwise the last
///   one once every candidate has failed.
pub async fn connect_with_fallback(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> Result<(TransportType, Box<dyn super::MeshConnection>)> {
    let mut ranked: Vec<_> = self
        .available_transports(peer_id)
        .into_iter()
        .filter_map(|kind| {
            let transport = self.transports.get(&kind)?;
            if !transport.capabilities().meets_requirements(requirements) {
                return None;
            }
            let rank = self
                .config
                .preference_order
                .iter()
                .position(|&preferred| preferred == kind);
            let preference_bonus = rank.map_or(0, |idx| 20 - (idx as i32 * 5));
            Some((kind, transport.calculate_score(requirements, preference_bonus)))
        })
        .collect();

    if ranked.is_empty() {
        return Err(TransportError::PeerNotFound(format!(
            "No suitable transport for {}",
            peer_id
        )));
    }
    // Stable sort, best score first.
    ranked.sort_by_key(|&(_, score)| std::cmp::Reverse(score));

    let mut last_error = None;
    for (kind, _score) in ranked {
        let transport = match self.transports.get(&kind) {
            Some(transport) => transport,
            None => continue,
        };
        let failure = match transport.connect(peer_id).await {
            Ok(conn) => {
                self.record_success(peer_id, kind);
                return Ok((kind, conn));
            }
            Err(e) => e,
        };
        if !self.config.enable_fallback {
            return Err(failure);
        }
        last_error = Some(failure);
        self.clear_cache(peer_id);
    }

    match last_error {
        Some(e) => Err(e),
        None => Err(TransportError::PeerNotFound(format!(
            "All transports failed for {}",
            peer_id
        ))),
    }
}
/// Sends `data` for `collection` through the bypass channel, forwarding the
/// optional `target` address to the channel.
///
/// # Errors
///
/// Returns `TransportError::Other` when no bypass channel is configured or
/// when the channel reports a send failure (its message is preserved).
pub async fn send_bypass(
    &self,
    collection: &str,
    data: &[u8],
    target: Option<SocketAddr>,
) -> Result<()> {
    let channel = match &self.bypass_channel {
        Some(channel) => channel,
        None => return Err(TransportError::Other("Bypass channel not configured".into())),
    };
    let guard = channel.read().await;
    guard
        .send_to_collection(collection, target, data)
        .await
        .map_err(|e| TransportError::Other(e.to_string().into()))
}
/// Sends `data` for `collection` through the bypass channel to an explicit
/// `target`.
///
/// # Errors
///
/// Returns `TransportError::Other` when no bypass channel is configured or
/// when the channel reports a send failure (its message is preserved).
pub async fn send_bypass_to(
    &self,
    target: BypassTarget,
    collection: &str,
    data: &[u8],
) -> Result<()> {
    let channel = match &self.bypass_channel {
        Some(channel) => channel,
        None => return Err(TransportError::Other("Bypass channel not configured".into())),
    };
    let guard = channel.read().await;
    guard
        .send(target, collection, data)
        .await
        .map_err(|e| TransportError::Other(e.to_string().into()))
}
/// Subscribes to messages from the bypass channel.
///
/// # Errors
///
/// Returns `TransportError::Other` when no bypass channel is configured.
pub async fn subscribe_bypass(&self) -> Result<broadcast::Receiver<BypassMessage>> {
    let channel = match &self.bypass_channel {
        Some(channel) => channel,
        None => return Err(TransportError::Other("Bypass channel not configured".into())),
    };
    let receiver = channel.read().await.subscribe();
    Ok(receiver)
}
/// Subscribes to bypass messages for `collection`, returning whatever the
/// channel's `subscribe_collection` yields (a `u32` and a receiver).
///
/// # Errors
///
/// Returns `TransportError::Other` when no bypass channel is configured.
pub async fn subscribe_bypass_collection(
    &self,
    collection: &str,
) -> Result<(u32, broadcast::Receiver<BypassMessage>)> {
    let channel = match &self.bypass_channel {
        Some(channel) => channel,
        None => return Err(TransportError::Other("Bypass channel not configured".into())),
    };
    let subscription = channel.read().await.subscribe_collection(collection);
    Ok(subscription)
}
/// Decides how a message for `collection` should reach `peer_id`.
///
/// Collections without an entry in the route table fall back to
/// `route_message`. Otherwise the configured route decides:
///
/// * `Bypass`: the bypass channel, if one is configured.
/// * `Fixed`: the named transport type, if it is registered, available, and
///   can reach the peer.
/// * `Pace`: the first usable instance under the override policy, or under
///   the default policy when there is no override.
///
/// Any route that cannot be satisfied yields `RouteDecision::NoRoute`.
pub fn route_collection(
    &self,
    collection: &str,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> RouteDecision {
    let route_config = match self.config.collection_routes.get(collection) {
        None => return self.route_message(peer_id, requirements),
        Some(config) => config,
    };

    match &route_config.route {
        CollectionTransportRoute::Bypass { .. } if self.bypass_channel.is_some() => {
            RouteDecision::Bypass
        }
        CollectionTransportRoute::Bypass { .. } => RouteDecision::NoRoute,
        CollectionTransportRoute::Fixed { transport_type } => {
            let usable = self
                .transports
                .get(transport_type)
                .map_or(false, |t| t.is_available() && t.can_reach(peer_id));
            if usable {
                RouteDecision::Transport(*transport_type)
            } else {
                RouteDecision::NoRoute
            }
        }
        CollectionTransportRoute::Pace { policy_override } => self
            .select_transport_pace_with_policy(peer_id, requirements, policy_override.as_ref())
            .map_or(RouteDecision::NoRoute, RouteDecision::TransportInstance),
    }
}
/// Returns the first instance, in policy order, that is available, can reach
/// `peer_id`, and meets `requirements`.
///
/// `policy_override` takes precedence over the configured default policy.
/// Returns `None` when neither policy exists or no instance qualifies.
fn select_transport_pace_with_policy(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
    policy_override: Option<&TransportPolicy>,
) -> Option<TransportId> {
    let policy = match policy_override {
        Some(policy) => policy,
        None => self.config.default_policy.as_ref()?,
    };

    let registry = self
        .transport_instances
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    // Ids of every instance that could carry this message to the peer.
    let mut usable = HashSet::new();
    for (id, (instance, transport)) in registry.iter() {
        if instance.available
            && transport.is_available()
            && transport.can_reach(peer_id)
            && transport.capabilities().meets_requirements(requirements)
        {
            usable.insert(id.clone());
        }
    }

    for id in policy.ordered() {
        if usable.contains(id) {
            return Some(id.clone());
        }
    }
    None
}
/// Returns the route configuration for `collection` from the configured route
/// table, if one exists.
pub fn get_collection_route(&self, collection: &str) -> Option<&CollectionRouteConfig> {
self.config.collection_routes.get(collection)
}
/// Decides how a message should reach `peer_id` when no collection-specific
/// route applies.
///
/// Chooses the bypass channel when the message asks for `bypass_sync` and a
/// channel is configured; otherwise the transport picked by
/// `select_transport`, or `NoRoute` when there is none.
pub fn route_message(
    &self,
    peer_id: &NodeId,
    requirements: &MessageRequirements,
) -> RouteDecision {
    let use_bypass = requirements.bypass_sync && self.bypass_channel.is_some();
    if use_bypass {
        return RouteDecision::Bypass;
    }
    self.select_transport(peer_id, requirements)
        .map_or(RouteDecision::NoRoute, RouteDecision::Transport)
}
}
/// Outcome of a routing decision made by [`TransportManager`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteDecision {
/// Send over the bypass channel.
Bypass,
/// Send over the transport registered for this type.
Transport(TransportType),
/// Send over the transport instance with this id.
TransportInstance(TransportId),
/// No usable route was found.
NoRoute,
}
/// How messages for a collection are routed.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// stored in a `transport` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "transport", rename_all = "snake_case")]
pub enum CollectionTransportRoute {
/// Always use the transport registered for `transport_type`.
Fixed { transport_type: TransportType },
/// Use the bypass channel.
Bypass {
/// Encoding for bypass messages.
encoding: MessageEncoding,
/// NOTE(review): presumably the message time-to-live in milliseconds — confirm.
ttl_ms: u64,
/// Bypass transport to use; semantics are defined by `BypassTransport`.
bypass_transport: BypassTransport,
},
/// Use policy-ordered instance selection.
Pace {
/// Policy used in place of the manager's default policy, when present.
policy_override: Option<TransportPolicy>,
},
}
/// Routing configuration for a single collection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionRouteConfig {
/// Collection name; route table lookups match it exactly.
pub collection: String,
/// Route applied to messages for this collection.
pub route: CollectionTransportRoute,
/// NOTE(review): not read in this file; presumably the priority assigned to
/// this collection's messages — confirm against callers.
pub priority: MessagePriority,
}
/// Ordered list of per-collection route configurations.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CollectionRouteTable {
/// Entries in insertion order; lookups return the first entry whose name matches.
collections: Vec<CollectionRouteConfig>,
}
impl CollectionRouteTable {
    /// Creates an empty route table.
    pub fn new() -> Self {
        Self {
            collections: Vec::new(),
        }
    }

    /// Appends `config` to the table and returns the table.
    ///
    /// Existing entries are kept, and lookups return the first match, so an
    /// entry added later for an already-present collection name is shadowed.
    pub fn with_collection(mut self, config: CollectionRouteConfig) -> Self {
        self.collections.push(config);
        self
    }

    /// Returns the first entry whose collection name equals `collection`.
    pub fn get(&self, collection: &str) -> Option<&CollectionRouteConfig> {
        for entry in &self.collections {
            if entry.collection == collection {
                return Some(entry);
            }
        }
        None
    }

    /// Returns `true` when the table has an entry for `collection`.
    pub fn has_collection(&self, collection: &str) -> bool {
        self.get(collection).is_some()
    }

    /// Returns `true` when `collection` has an entry and its route is `Bypass`.
    pub fn is_bypass(&self, collection: &str) -> bool {
        match self.get(collection) {
            Some(entry) => matches!(entry.route, CollectionTransportRoute::Bypass { .. }),
            None => false,
        }
    }
}
impl std::fmt::Debug for TransportManager {
    /// Formats the manager as its registered transport types plus its
    /// configuration; the remaining fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let transport_types = self.transports.keys().collect::<Vec<_>>();
        let mut out = f.debug_struct("TransportManager");
        out.field("transports", &transport_types);
        out.field("config", &self.config);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::bypass::{BypassChannelConfig, UdpBypassChannel};
use crate::transport::capabilities::{MessagePriority, TransportCapabilities};
use crate::transport::{MeshConnection, MeshTransport, PeerEventReceiver};
use async_trait::async_trait;
use std::time::Instant;
use tokio::sync::mpsc;
/// Test double for a transport with configurable capabilities, availability,
/// reachable peers, and signal quality.
struct MockTransport {
/// Capabilities returned by `capabilities()`.
caps: TransportCapabilities,
/// Value returned by `is_available()`.
available: bool,
/// Peers for which `can_reach()` is true and `connect()` succeeds.
reachable_peers: Vec<NodeId>,
/// Value returned by `signal_quality()`.
signal: Option<u8>,
}
impl MockTransport {
    /// Creates an available mock with no reachable peers and no signal reading.
    fn new(caps: TransportCapabilities) -> Self {
        Self {
            caps,
            available: true,
            reachable_peers: Vec::new(),
            signal: None,
        }
    }

    /// Adds `peer` to the set of reachable peers.
    fn with_peer(mut self, peer: NodeId) -> Self {
        self.reachable_peers.push(peer);
        self
    }

    /// Sets the reported signal quality.
    // Not used by every test, hence the lint exemption.
    #[allow(dead_code)]
    fn with_signal(self, signal: u8) -> Self {
        Self {
            signal: Some(signal),
            ..self
        }
    }

    /// Marks the mock as unavailable.
    fn unavailable(self) -> Self {
        Self {
            available: false,
            ..self
        }
    }
}
/// Connection handed out by `MockTransport::connect`.
struct MockConnection {
/// Peer this connection was opened to.
peer_id: NodeId,
/// Instant at which the mock connection was created.
connected_at: Instant,
}
// Minimal `MeshConnection` for tests: reports its stored peer and creation
// time and always claims to be alive.
impl MeshConnection for MockConnection {
fn peer_id(&self) -> &NodeId {
&self.peer_id
}
// The mock never reports a dropped connection.
fn is_alive(&self) -> bool {
true
}
fn connected_at(&self) -> Instant {
self.connected_at
}
}
// `MeshTransport` for the mock: lifecycle calls are no-ops, `connect` succeeds
// only for reachable peers, and no connections or peers are ever tracked.
#[async_trait]
impl MeshTransport for MockTransport {
    async fn start(&self) -> Result<()> {
        Ok(())
    }

    async fn stop(&self) -> Result<()> {
        Ok(())
    }

    async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnection>> {
        if !self.reachable_peers.contains(peer_id) {
            return Err(TransportError::PeerNotFound(peer_id.to_string()));
        }
        let connection = MockConnection {
            peer_id: peer_id.clone(),
            connected_at: Instant::now(),
        };
        Ok(Box::new(connection))
    }

    async fn disconnect(&self, _peer_id: &NodeId) -> Result<()> {
        Ok(())
    }

    fn get_connection(&self, _peer_id: &NodeId) -> Option<Box<dyn MeshConnection>> {
        None
    }

    fn peer_count(&self) -> usize {
        0
    }

    fn connected_peers(&self) -> Vec<NodeId> {
        Vec::new()
    }

    fn subscribe_peer_events(&self) -> PeerEventReceiver {
        // The sender is dropped on return, so the receiver never yields events.
        let (_sender, receiver) = mpsc::channel(1);
        receiver
    }
}
// `Transport` for the mock: every answer comes straight from the configured
// fields.
impl Transport for MockTransport {
    fn capabilities(&self) -> &TransportCapabilities {
        &self.caps
    }

    fn is_available(&self) -> bool {
        self.available
    }

    fn signal_quality(&self) -> Option<u8> {
        self.signal
    }

    fn can_reach(&self, peer_id: &NodeId) -> bool {
        self.reachable_peers.iter().any(|known| known == peer_id)
    }
}
#[test]
fn test_register_transport() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    manager.register(Arc::new(MockTransport::new(TransportCapabilities::quic())));

    // Only the registered type can be looked up.
    assert!(manager.get_transport(TransportType::Quic).is_some());
    assert!(manager.get_transport(TransportType::LoRa).is_none());
}
#[test]
fn test_unregister_transport() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    manager.register(Arc::new(MockTransport::new(TransportCapabilities::quic())));

    // Unregistering hands the transport back and removes it from the registry.
    let removed = manager.unregister(TransportType::Quic);
    assert!(removed.is_some());
    assert!(manager.get_transport(TransportType::Quic).is_none());
}
#[test]
fn test_available_transports() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    // Available and able to reach the peer.
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));
    // Available, but cannot reach the peer.
    manager.register(Arc::new(MockTransport::new(
        TransportCapabilities::bluetooth_le(),
    )));
    // Can reach the peer, but is unavailable.
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::lora(7))
            .unavailable()
            .with_peer(peer.clone()),
    ));

    let available = manager.available_transports(&peer);
    assert_eq!(available.len(), 1);
    assert!(available.contains(&TransportType::Quic));
}
#[test]
fn test_select_transport_by_reliability() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()),
    ));

    // A reliable message must end up on QUIC.
    let requirements = MessageRequirements {
        reliable: true,
        ..Default::default()
    };
    assert_eq!(
        manager.select_transport(&peer, &requirements),
        Some(TransportType::Quic)
    );
}
#[test]
fn test_select_transport_by_preference() {
    // BLE is listed ahead of QUIC in the preference order.
    let mut manager = TransportManager::new(TransportManagerConfig {
        preference_order: vec![TransportType::BluetoothLE, TransportType::Quic],
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    ));

    let selected = manager.select_transport(&peer, &MessageRequirements::default());
    assert_eq!(selected, Some(TransportType::BluetoothLE));
}
#[test]
fn test_select_transport_by_latency() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));

    // Mark LoRa reliable so it is not excluded by the reliability requirement.
    let mut reliable_lora = TransportCapabilities::lora(7);
    reliable_lora.reliable = true;
    manager.register(Arc::new(
        MockTransport::new(reliable_lora).with_peer(peer.clone()),
    ));

    let requirements = MessageRequirements {
        priority: MessagePriority::High,
        reliable: true,
        ..Default::default()
    };
    assert_eq!(
        manager.select_transport(&peer, &requirements),
        Some(TransportType::Quic)
    );
}
#[test]
fn test_select_transport_with_latency_requirement() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));

    // Mark LoRa reliable so that only the latency bound can decide against it.
    let mut reliable_lora = TransportCapabilities::lora(12);
    reliable_lora.reliable = true;
    manager.register(Arc::new(
        MockTransport::new(reliable_lora).with_peer(peer.clone()),
    ));

    let requirements = MessageRequirements {
        reliable: true,
        max_latency_ms: Some(50),
        ..Default::default()
    };
    assert_eq!(
        manager.select_transport(&peer, &requirements),
        Some(TransportType::Quic)
    );
}
#[test]
fn test_select_transport_no_match() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()),
    ));

    // The only registered transport is expected to fail the reliability check.
    let requirements = MessageRequirements {
        reliable: true,
        ..Default::default()
    };
    assert_eq!(manager.select_transport(&peer, &requirements), None);
}
#[test]
fn test_peer_transport_caching() {
    let mut manager = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: true,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    ));
    let requirements = MessageRequirements::default();

    // A recorded success overrides normal scoring.
    manager.record_success(&peer, TransportType::BluetoothLE);
    assert_eq!(
        manager.select_transport(&peer, &requirements),
        Some(TransportType::BluetoothLE)
    );

    // Once the cache is cleared, scoring decides again.
    manager.clear_cache(&peer);
    assert_eq!(
        manager.select_transport(&peer, &requirements),
        Some(TransportType::Quic)
    );
}
#[test]
fn test_power_sensitive_selection() {
    // An empty preference order means no transport gets a preference bonus.
    let mut manager = TransportManager::new(TransportManagerConfig {
        preference_order: Vec::new(),
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    ));

    let requirements = MessageRequirements {
        power_sensitive: true,
        ..Default::default()
    };
    assert_eq!(
        manager.select_transport(&peer, &requirements),
        Some(TransportType::BluetoothLE)
    );
}
#[tokio::test]
async fn test_connect_selects_transport() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));

    let outcome = manager
        .connect(&peer, &MessageRequirements::default())
        .await;
    assert!(outcome.is_ok());

    // The connection comes from QUIC and is bound to the requested peer.
    let (kind, connection) = outcome.unwrap();
    assert_eq!(kind, TransportType::Quic);
    assert_eq!(connection.peer_id(), &peer);
}
#[tokio::test]
async fn test_connect_with_fallback() {
    let mut manager = TransportManager::new(TransportManagerConfig {
        enable_fallback: true,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    // QUIC is registered but cannot reach the peer; only BLE can.
    manager.register(Arc::new(MockTransport::new(TransportCapabilities::quic())));
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    ));

    let outcome = manager
        .connect_with_fallback(&peer, &MessageRequirements::default())
        .await;
    assert!(outcome.is_ok());
    let (kind, _) = outcome.unwrap();
    assert_eq!(kind, TransportType::BluetoothLE);
}
#[test]
fn test_distance_tracking() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    manager.update_peer_distance(PeerDistance {
        peer_id: peer.clone(),
        distance_meters: 500,
        source: super::super::capabilities::DistanceSource::Gps {
            confidence_meters: 10,
        },
        last_updated: Instant::now(),
    });

    // The stored distance can be read back for the same peer.
    let stored = manager.get_peer_distance(&peer);
    assert!(stored.is_some());
    assert_eq!(stored.unwrap().distance_meters, 500);
}
#[tokio::test]
async fn test_no_bypass_channel_by_default() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    // With no channel configured, no collection counts as a bypass collection.
    assert!(!manager.has_bypass_channel());
    let is_bypass = manager.is_bypass_collection("test").await;
    assert!(!is_bypass);
}
#[test]
fn test_route_message_without_bypass() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    manager.register(Arc::new(
        MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()),
    ));

    let plain = manager.route_message(&peer, &MessageRequirements::default());
    assert_eq!(plain, RouteDecision::Transport(TransportType::Quic));

    // A bypass request still goes to a transport when no channel is configured.
    let bypass_req = MessageRequirements {
        bypass_sync: true,
        max_latency_ms: Some(100),
        ..Default::default()
    };
    let requested_bypass = manager.route_message(&peer, &bypass_req);
    assert_eq!(
        requested_bypass,
        RouteDecision::Transport(TransportType::Quic)
    );
}
#[tokio::test]
async fn test_subscribe_bypass_not_configured() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    // Subscribing without a bypass channel must fail.
    let subscription = manager.subscribe_bypass().await;
    assert!(subscription.is_err());
}
#[test]
fn test_route_decision_equality() {
    let quic = RouteDecision::Transport(TransportType::Quic);

    // Equal variants and payloads compare equal.
    assert_eq!(RouteDecision::Bypass, RouteDecision::Bypass);
    assert_eq!(quic, RouteDecision::Transport(TransportType::Quic));

    // Different variants or payloads compare unequal.
    assert_ne!(RouteDecision::Bypass, RouteDecision::NoRoute);
    assert_ne!(quic, RouteDecision::Transport(TransportType::LoRa));
}
#[test]
fn test_register_instance() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    manager.register_instance(
        TransportInstance::new(
            "iroh-eth0",
            TransportType::Quic,
            TransportCapabilities::quic(),
        ),
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer)),
    );

    // Only the registered id resolves to a transport.
    assert!(manager.get_instance(&"iroh-eth0".to_string()).is_some());
    assert!(manager.get_instance(&"nonexistent".to_string()).is_none());
}
#[test]
fn test_unregister_instance() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let id = "iroh-eth0".to_string();
    manager.register_instance(
        TransportInstance::new(
            "iroh-eth0",
            TransportType::Quic,
            TransportCapabilities::quic(),
        ),
        Arc::new(MockTransport::new(TransportCapabilities::quic())),
    );

    // The first removal returns the registered instance.
    let removed = manager.unregister_instance(&id);
    assert!(removed.is_some());
    let (instance, _) = removed.unwrap();
    assert_eq!(instance.id, "iroh-eth0");
    assert!(manager.get_instance(&id).is_none());

    // A second removal finds nothing.
    let removed_again = manager.unregister_instance(&id);
    assert!(removed_again.is_none());
}
#[test]
fn test_registered_instance_ids() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    assert!(manager.registered_instance_ids().is_empty());

    let quic_instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let lora_instance = TransportInstance::new(
        "lora-915",
        TransportType::LoRa,
        TransportCapabilities::lora(7),
    );
    manager.register_instance(
        quic_instance,
        Arc::new(MockTransport::new(TransportCapabilities::quic())),
    );
    manager.register_instance(
        lora_instance,
        Arc::new(MockTransport::new(TransportCapabilities::lora(7))),
    );

    // Both ids are listed, in no particular order.
    let ids = manager.registered_instance_ids();
    assert_eq!(ids.len(), 2);
    assert!(ids.contains(&"iroh-eth0".to_string()));
    assert!(ids.contains(&"lora-915".to_string()));
}
#[test]
fn test_available_instance_ids() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    // Fully available instance.
    manager.register_instance(
        TransportInstance::new(
            "iroh-eth0",
            TransportType::Quic,
            TransportCapabilities::quic(),
        ),
        Arc::new(MockTransport::new(TransportCapabilities::quic())),
    );

    // The transport itself reports unavailable.
    manager.register_instance(
        TransportInstance::new(
            "lora-off",
            TransportType::LoRa,
            TransportCapabilities::lora(7),
        ),
        Arc::new(MockTransport::new(TransportCapabilities::lora(7)).unavailable()),
    );

    // The instance metadata is flagged unavailable.
    let mut disabled = TransportInstance::new(
        "ble-disabled",
        TransportType::BluetoothLE,
        TransportCapabilities::bluetooth_le(),
    );
    disabled.available = false;
    manager.register_instance(
        disabled,
        Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le())),
    );

    let available = manager.available_instance_ids();
    assert_eq!(available.len(), 1);
    assert!(available.contains("iroh-eth0"));
}
#[test]
fn test_available_instances_for_peer() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    // Available and able to reach the peer.
    manager.register_instance(
        TransportInstance::new(
            "iroh-eth0",
            TransportType::Quic,
            TransportCapabilities::quic(),
        ),
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone())),
    );

    // Available, but cannot reach the peer.
    manager.register_instance(
        TransportInstance::new(
            "lora-915",
            TransportType::LoRa,
            TransportCapabilities::lora(7),
        ),
        Arc::new(MockTransport::new(TransportCapabilities::lora(7))),
    );

    // Can reach the peer, but is unavailable.
    manager.register_instance(
        TransportInstance::new(
            "ble-off",
            TransportType::BluetoothLE,
            TransportCapabilities::bluetooth_le(),
        ),
        Arc::new(
            MockTransport::new(TransportCapabilities::bluetooth_le())
                .with_peer(peer.clone())
                .unavailable(),
        ),
    );

    let for_peer = manager.available_instances_for_peer(&peer);
    assert_eq!(for_peer.len(), 1);
    assert_eq!(for_peer[0], "iroh-eth0");
}
/// With the default config (no policy) and one registered QUIC instance backed by a
/// default mock, `current_pace_level` is expected to be `PaceLevel::Primary`.
#[test]
fn test_current_pace_level_no_policy_with_available() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    let instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let mock = Arc::new(MockTransport::new(TransportCapabilities::quic()));
    manager.register_instance(instance, mock);

    let level = manager.current_pace_level();
    assert_eq!(level, PaceLevel::Primary);
}
/// With the default config and nothing registered, `current_pace_level` is expected to
/// be `PaceLevel::None`.
#[test]
fn test_current_pace_level_no_policy_none_available() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    let level = manager.current_pace_level();
    assert_eq!(level, PaceLevel::None);
}
/// With the default config and a single instance whose mock is marked `.unavailable()`,
/// `current_pace_level` is expected to be `PaceLevel::None`.
#[test]
fn test_current_pace_level_no_policy_all_unavailable() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    let instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let down_mock = Arc::new(MockTransport::new(TransportCapabilities::quic()).unavailable());
    manager.register_instance(instance, down_mock);

    let level = manager.current_pace_level();
    assert_eq!(level, PaceLevel::None);
}
/// With a primary/alternate/emergency policy and only the primary instance
/// (`iroh-eth0`) registered, `current_pace_level` is expected to be `Primary`.
#[test]
fn test_current_pace_level_with_policy_primary() {
    let manager = TransportManager::new(TransportManagerConfig::with_policy(
        TransportPolicy::new("test")
            .primary(vec!["iroh-eth0"])
            .alternate(vec!["lora-915"])
            .emergency(vec!["ble-mesh"]),
    ));

    let primary_instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let mock = Arc::new(MockTransport::new(TransportCapabilities::quic()));
    manager.register_instance(primary_instance, mock);

    assert_eq!(manager.current_pace_level(), PaceLevel::Primary);
}
/// With a primary/alternate/emergency policy and only the alternate instance
/// (`lora-915`) registered, `current_pace_level` is expected to be `Alternate`.
#[test]
fn test_current_pace_level_with_policy_alternate() {
    let manager = TransportManager::new(TransportManagerConfig::with_policy(
        TransportPolicy::new("test")
            .primary(vec!["iroh-eth0"])
            .alternate(vec!["lora-915"])
            .emergency(vec!["ble-mesh"]),
    ));

    let alternate_instance = TransportInstance::new(
        "lora-915",
        TransportType::LoRa,
        TransportCapabilities::lora(7),
    );
    let mock = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
    manager.register_instance(alternate_instance, mock);

    assert_eq!(manager.current_pace_level(), PaceLevel::Alternate);
}
/// With a primary/alternate/emergency policy and only the emergency instance
/// (`ble-mesh`) registered, `current_pace_level` is expected to be `Emergency`.
#[test]
fn test_current_pace_level_with_policy_emergency() {
    let manager = TransportManager::new(TransportManagerConfig::with_policy(
        TransportPolicy::new("test")
            .primary(vec!["iroh-eth0"])
            .alternate(vec!["lora-915"])
            .emergency(vec!["ble-mesh"]),
    ));

    let emergency_instance = TransportInstance::new(
        "ble-mesh",
        TransportType::BluetoothLE,
        TransportCapabilities::bluetooth_le(),
    );
    let mock = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
    manager.register_instance(emergency_instance, mock);

    assert_eq!(manager.current_pace_level(), PaceLevel::Emergency);
}
/// With a policy configured but no instances registered, `current_pace_level` is
/// expected to be `PaceLevel::None`.
#[test]
fn test_current_pace_level_with_policy_none_available() {
    let manager = TransportManager::new(TransportManagerConfig::with_policy(
        TransportPolicy::new("test")
            .primary(vec!["iroh-eth0"])
            .alternate(vec!["lora-915"]),
    ));

    assert_eq!(manager.current_pace_level(), PaceLevel::None);
}
/// With the default config (no policy), `select_transports_pace` is expected to return
/// an empty selection.
#[test]
fn test_select_transports_pace_no_policy() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    let selected = manager.select_transports_pace(&peer, &MessageRequirements::default());

    assert!(selected.is_empty());
}
/// In `TransportMode::Single`, with both primary instances registered for the peer,
/// exactly one transport is expected and it should be `iroh-eth0`.
#[test]
fn test_select_transports_pace_single_mode() {
    let policy = TransportPolicy::new("test")
        .primary(vec!["iroh-eth0", "iroh-wlan0"])
        .alternate(vec!["lora-915"]);
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single),
    );
    let peer = NodeId::new("peer-1".to_string());

    // Both primaries are QUIC instances whose mocks have the peer; registration order
    // matches the policy order.
    for id in ["iroh-eth0", "iroh-wlan0"] {
        let instance =
            TransportInstance::new(id, TransportType::Quic, TransportCapabilities::quic());
        let mock =
            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
        manager.register_instance(instance, mock);
    }

    let requirements = MessageRequirements::default();
    let selected = manager.select_transports_pace(&peer, &requirements);

    assert_eq!(selected.len(), 1);
    assert_eq!(selected[0], "iroh-eth0");
}
/// In `TransportMode::redundant(2)`, with two primaries and one alternate registered
/// for the peer, at least two transports are expected in the selection.
#[test]
fn test_select_transports_pace_redundant_mode() {
    let policy = TransportPolicy::new("test")
        .primary(vec!["iroh-eth0", "iroh-wlan0"])
        .alternate(vec!["lora-915"]);
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy).with_mode(TransportMode::redundant(2)),
    );
    let peer = NodeId::new("peer-1".to_string());

    // Two QUIC primaries, registered in policy order.
    for id in ["iroh-eth0", "iroh-wlan0"] {
        let instance =
            TransportInstance::new(id, TransportType::Quic, TransportCapabilities::quic());
        let mock =
            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
        manager.register_instance(instance, mock);
    }

    // LoRa alternate, also configured with the peer.
    let lora_instance = TransportInstance::new(
        "lora-915",
        TransportType::LoRa,
        TransportCapabilities::lora(7),
    );
    let lora_mock =
        Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
    manager.register_instance(lora_instance, lora_mock);

    let requirements = MessageRequirements::default();
    let selected = manager.select_transports_pace(&peer, &requirements);

    assert!(selected.len() >= 2);
}
/// In `TransportMode::redundant_bounded(1, 2)`, with four primaries registered for the
/// peer, exactly two transports are expected in the selection.
#[test]
fn test_select_transports_pace_redundant_bounded() {
    let ids = ["t1", "t2", "t3", "t4"];

    let policy = TransportPolicy::new("test").primary(ids.to_vec());
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy)
            .with_mode(TransportMode::redundant_bounded(1, 2)),
    );
    let peer = NodeId::new("peer-1".to_string());

    // Every primary is a QUIC instance whose mock has the peer.
    for id in ids {
        let instance =
            TransportInstance::new(id, TransportType::Quic, TransportCapabilities::quic());
        let mock =
            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
        manager.register_instance(instance, mock);
    }

    let requirements = MessageRequirements::default();
    let selected = manager.select_transports_pace(&peer, &requirements);

    assert_eq!(selected.len(), 2);
}
/// In `TransportMode::Bonded`, with both primaries registered for the peer, two
/// transports are expected in the selection.
#[test]
fn test_select_transports_pace_bonded_mode() {
    let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded),
    );
    let peer = NodeId::new("peer-1".to_string());

    for id in ["iroh-eth0", "iroh-wlan0"] {
        let instance =
            TransportInstance::new(id, TransportType::Quic, TransportCapabilities::quic());
        let mock =
            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
        manager.register_instance(instance, mock);
    }

    let requirements = MessageRequirements::default();
    let selected = manager.select_transports_pace(&peer, &requirements);

    assert_eq!(selected.len(), 2);
}
/// In `TransportMode::LoadBalanced { weights: None }`, with both primaries registered
/// for the peer, two transports are expected in the selection.
#[test]
fn test_select_transports_pace_load_balanced_mode() {
    let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy)
            .with_mode(TransportMode::LoadBalanced { weights: None }),
    );
    let peer = NodeId::new("peer-1".to_string());

    for id in ["iroh-eth0", "iroh-wlan0"] {
        let instance =
            TransportInstance::new(id, TransportType::Quic, TransportCapabilities::quic());
        let mock =
            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
        manager.register_instance(instance, mock);
    }

    let requirements = MessageRequirements::default();
    let selected = manager.select_transports_pace(&peer, &requirements);

    assert_eq!(selected.len(), 2);
}
/// With `reliable: true` requested in bonded mode, only `iroh-eth0` is expected to be
/// selected even though the LoRa instance is also registered for the peer.
#[test]
fn test_select_transports_pace_filters_by_requirements() {
    let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded),
    );
    let peer = NodeId::new("peer-1".to_string());

    let quic_instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let quic_mock =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register_instance(quic_instance, quic_mock);

    let lora_instance = TransportInstance::new(
        "lora-915",
        TransportType::LoRa,
        TransportCapabilities::lora(7),
    );
    let lora_mock =
        Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
    manager.register_instance(lora_instance, lora_mock);

    // Only the `reliable` flag differs from the default requirements.
    let reliable_only = MessageRequirements {
        reliable: true,
        ..Default::default()
    };
    let selected = manager.select_transports_pace(&peer, &reliable_only);

    assert_eq!(selected.len(), 1);
    assert_eq!(selected[0], "iroh-eth0");
}
/// The LoRa mock is not configured with the peer, so only `iroh-eth0` (whose mock has
/// the peer) is expected in the selection.
#[test]
fn test_select_transports_pace_filters_unreachable_peer() {
    let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "lora-915"]);
    let manager = TransportManager::new(TransportManagerConfig::with_policy(policy));
    let peer = NodeId::new("peer-1".to_string());

    // This mock has the peer.
    let quic_instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let quic_mock =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register_instance(quic_instance, quic_mock);

    // This mock is registered without the peer.
    let lora_instance = TransportInstance::new(
        "lora-915",
        TransportType::LoRa,
        TransportCapabilities::lora(7),
    );
    let lora_mock = Arc::new(MockTransport::new(TransportCapabilities::lora(7)));
    manager.register_instance(lora_instance, lora_mock);

    let requirements = MessageRequirements::default();
    let selected = manager.select_transports_pace(&peer, &requirements);

    assert_eq!(selected.len(), 1);
    assert_eq!(selected[0], "iroh-eth0");
}
/// In bonded mode with two primaries registered for the peer, the single-result
/// `select_transport_pace` is expected to return `iroh-eth0`.
#[test]
fn test_select_transport_pace_returns_first() {
    let policy = TransportPolicy::new("test").primary(vec!["iroh-eth0", "iroh-wlan0"]);
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Bonded),
    );
    let peer = NodeId::new("peer-1".to_string());

    for id in ["iroh-eth0", "iroh-wlan0"] {
        let instance =
            TransportInstance::new(id, TransportType::Quic, TransportCapabilities::quic());
        let mock =
            Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
        manager.register_instance(instance, mock);
    }

    let requirements = MessageRequirements::default();
    let chosen = manager.select_transport_pace(&peer, &requirements);

    assert_eq!(chosen, Some("iroh-eth0".to_string()));
}
/// With the default config (no policy), `select_transport_pace` is expected to return
/// `None`.
#[test]
fn test_select_transport_pace_returns_none_no_policy() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    let requirements = MessageRequirements::default();

    let chosen = manager.select_transport_pace(&peer, &requirements);

    assert_eq!(chosen, None);
}
/// With a policy naming `iroh-eth0` but no instances registered,
/// `select_transport_pace` is expected to return `None`.
#[test]
fn test_select_transport_pace_returns_none_no_candidates() {
    let manager = TransportManager::new(TransportManagerConfig::with_policy(
        TransportPolicy::new("test").primary(vec!["iroh-eth0"]),
    ));
    let peer = NodeId::new("peer-1".to_string());
    let requirements = MessageRequirements::default();

    let chosen = manager.select_transport_pace(&peer, &requirements);

    assert_eq!(chosen, None);
}
/// With `cache_peer_transport` enabled, `record_success_pace` is expected to store the
/// transport id for the peer in `peer_transport_ids`.
#[test]
fn test_record_success_pace_caching_enabled() {
    let manager = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: true,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    manager.record_success_pace(&peer, "iroh-eth0".to_string());

    // A poisoned lock is tolerated: the inner map is read either way.
    let cache = manager
        .peer_transport_ids
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let expected = "iroh-eth0".to_string();
    assert_eq!(cache.get(&peer), Some(&expected));
}
/// With `cache_peer_transport` disabled, `record_success_pace` is expected to leave
/// `peer_transport_ids` without an entry for the peer.
#[test]
fn test_record_success_pace_caching_disabled() {
    let manager = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: false,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    manager.record_success_pace(&peer, "iroh-eth0".to_string());

    // A poisoned lock is tolerated: the inner map is read either way.
    let cache = manager
        .peer_transport_ids
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    assert!(!cache.contains_key(&peer));
}
/// `clear_cache_pace` is expected to remove an entry previously stored by
/// `record_success_pace`.
#[test]
fn test_clear_cache_pace() {
    let manager = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: true,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    // The read guard lives only for the duration of each call, so it is never held
    // while the manager mutates the cache.
    let is_cached = || {
        manager
            .peer_transport_ids
            .read()
            .unwrap()
            .contains_key(&peer)
    };

    manager.record_success_pace(&peer, "iroh-eth0".to_string());
    assert!(is_cached());

    manager.clear_cache_pace(&peer);
    assert!(!is_cached());
}
/// Calling `clear_cache_pace` for a peer that was never recorded must simply return;
/// the test passes as long as the call does not panic.
#[test]
fn test_clear_cache_pace_nonexistent_peer() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let unknown_peer = NodeId::new("nonexistent".to_string());

    manager.clear_cache_pace(&unknown_peer);
}
/// With no distance recorded for the peer, `select_transport_for_distance` is expected
/// to return the QUIC transport and no range mode.
#[test]
fn test_select_transport_for_distance_no_distance() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    let quic_mock =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register(quic_mock);

    let requirements = MessageRequirements::default();
    let selection = manager.select_transport_for_distance(&peer, &requirements);
    assert!(selection.is_some());

    let (chosen_type, chosen_range) = selection.unwrap();
    assert_eq!(chosen_type, TransportType::Quic);
    assert!(chosen_range.is_none());
}
/// With a configured distance of 1000 m recorded for the peer,
/// `select_transport_for_distance` is expected to return the QUIC transport and no
/// range mode.
#[test]
fn test_select_transport_for_distance_with_distance() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    let quic_mock =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register(quic_mock);

    // Record a distance for the peer before selecting.
    manager.update_peer_distance(PeerDistance {
        peer_id: peer.clone(),
        distance_meters: 1000,
        source: super::super::capabilities::DistanceSource::Configured,
        last_updated: Instant::now(),
    });

    let requirements = MessageRequirements::default();
    let selection = manager.select_transport_for_distance(&peer, &requirements);
    assert!(selection.is_some());

    let (chosen_type, chosen_range) = selection.unwrap();
    assert_eq!(chosen_type, TransportType::Quic);
    assert!(chosen_range.is_none());
}
/// With no transports registered, `select_transport_for_distance` is expected to
/// return `None`.
#[test]
fn test_select_transport_for_distance_no_suitable_transport() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    let selection =
        manager.select_transport_for_distance(&peer, &MessageRequirements::default());

    assert!(selection.is_none());
}
/// `TransportManagerConfig::with_policy` is expected to store the policy and leave
/// every other field at its default value.
#[test]
fn test_config_with_policy() {
    let config = TransportManagerConfig::with_policy(
        TransportPolicy::new("tactical")
            .primary(vec!["iroh-eth0"])
            .alternate(vec!["lora-915"]),
    );

    // Non-policy fields keep the values set by `Default`.
    assert!(config.enable_fallback);
    assert!(config.cache_peer_transport);
    assert_eq!(config.switch_threshold, 10);
    assert!(matches!(config.transport_mode, TransportMode::Single));

    // The supplied policy is stored as given.
    assert!(config.default_policy.is_some());
    let stored = config.default_policy.unwrap();
    assert_eq!(stored.name, "tactical");
    assert_eq!(stored.primary.len(), 1);
    assert_eq!(stored.alternate.len(), 1);
}
/// `with_mode` is expected to replace the transport mode on a default config.
#[test]
fn test_config_with_mode() {
    let bonded = TransportManagerConfig::default().with_mode(TransportMode::Bonded);

    assert!(matches!(bonded.transport_mode, TransportMode::Bonded));
}
/// Chaining `with_policy` and `with_mode` is expected to keep the policy and set the
/// mode to `Redundant { min_paths: 3, max_paths: None }`.
#[test]
fn test_config_with_policy_and_mode_chained() {
    let config = TransportManagerConfig::with_policy(
        TransportPolicy::new("test").primary(vec!["t1"]),
    )
    .with_mode(TransportMode::redundant(3));

    assert!(config.default_policy.is_some());

    let mode_matches = matches!(
        config.transport_mode,
        TransportMode::Redundant {
            min_paths: 3,
            max_paths: None
        }
    );
    assert!(mode_matches);
}
/// With no transports registered, `connect` is expected to fail with
/// `TransportError::PeerNotFound`.
#[tokio::test]
async fn test_connect_no_suitable_transport() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    let requirements = MessageRequirements::default();

    let outcome = manager.connect(&peer, &requirements).await;
    assert!(outcome.is_err());

    match outcome {
        Ok(_) => panic!("Expected error but got Ok"),
        Err(TransportError::PeerNotFound(_)) => {}
        Err(other) => panic!("Expected PeerNotFound, got: {}", other),
    }
}
/// With one QUIC mock registered that is not configured with the target peer,
/// `connect` is expected to return an error.
#[tokio::test]
async fn test_connect_unreachable_peer() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let quic_mock = Arc::new(MockTransport::new(TransportCapabilities::quic()));
    manager.register(quic_mock);

    let target = NodeId::new("unreachable-peer".to_string());
    let requirements = MessageRequirements::default();

    let outcome = manager.connect(&target, &requirements).await;
    assert!(outcome.is_err());
}
/// With `enable_fallback` off, `connect_with_fallback` to a peer that neither
/// registered mock was configured with is expected to return an error.
#[tokio::test]
async fn test_connect_with_fallback_disabled() {
    let mut manager = TransportManager::new(TransportManagerConfig {
        enable_fallback: false,
        ..Default::default()
    });

    // Both mocks are configured with "peer-1" only.
    let peer = NodeId::new("peer-1".to_string());
    let quic_mock =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register(quic_mock);
    let ble_mock = Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    );
    manager.register(ble_mock);

    // The connection attempt targets a different peer.
    let peer_unreachable = NodeId::new("nobody".to_string());
    let requirements = MessageRequirements::default();
    let outcome = manager
        .connect_with_fallback(&peer_unreachable, &requirements)
        .await;

    assert!(outcome.is_err());
}
/// With no transports registered, `connect_with_fallback` is expected to fail with an
/// error whose display text contains "No suitable transport".
#[tokio::test]
async fn test_connect_with_fallback_no_candidates() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());
    let requirements = MessageRequirements::default();

    let outcome = manager.connect_with_fallback(&peer, &requirements).await;
    assert!(outcome.is_err());

    let err = match outcome {
        Err(err) => err,
        Ok(_) => panic!("Expected error but got Ok"),
    };
    let rendered = format!("{}", err);
    assert!(rendered.contains("No suitable transport"));
}
/// With no transports registered, `route_message` is expected to return
/// `RouteDecision::NoRoute`.
#[test]
fn test_route_message_no_route() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    let decision = manager.route_message(&peer, &MessageRequirements::default());

    assert_eq!(decision, RouteDecision::NoRoute);
}
/// With `bypass_sync` requested but no bypass channel set and no transports
/// registered, `route_message` is expected to return `RouteDecision::NoRoute`.
#[test]
fn test_route_message_bypass_requested_no_channel() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    let wants_bypass = MessageRequirements {
        bypass_sync: true,
        ..Default::default()
    };
    let decision = manager.route_message(&peer, &wants_bypass);

    assert_eq!(decision, RouteDecision::NoRoute);
}
/// `RouteDecision::NoRoute` is expected to equal itself and differ from the `Bypass`
/// and `Transport` variants.
#[test]
fn test_route_decision_no_route() {
    let no_route = RouteDecision::NoRoute;

    assert_eq!(no_route, RouteDecision::NoRoute);
    assert_ne!(no_route, RouteDecision::Bypass);
    assert_ne!(no_route, RouteDecision::Transport(TransportType::Quic));
}
/// The `Debug` output of each `RouteDecision` variant is expected to contain the
/// variant name (or the wrapped transport type name).
#[test]
fn test_route_decision_debug() {
    let cases = [
        (RouteDecision::Bypass, "Bypass"),
        (RouteDecision::Transport(TransportType::LoRa), "LoRa"),
        (RouteDecision::NoRoute, "NoRoute"),
    ];

    for (decision, needle) in cases {
        assert!(format!("{:?}", decision).contains(needle));
    }
}
/// A cloned `RouteDecision` is expected to compare equal to the value it came from.
#[test]
fn test_route_decision_clone() {
    let source = RouteDecision::Transport(TransportType::BluetoothLE);
    let copy = source.clone();

    assert_eq!(source, copy);
}
/// The `Debug` output of a manager with one QUIC mock registered is expected to
/// mention both "TransportManager" and "Quic".
#[test]
fn test_transport_manager_debug() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let quic_mock = Arc::new(MockTransport::new(TransportCapabilities::quic()));
    manager.register(quic_mock);

    let rendered = format!("{:?}", manager);

    for needle in ["TransportManager", "Quic"] {
        assert!(rendered.contains(needle));
    }
}
/// `registered_transports` is expected to be empty on a new manager and to list both
/// QUIC and BLE after their mocks are registered.
#[test]
fn test_registered_transports() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    assert!(manager.registered_transports().is_empty());

    let quic_mock = Arc::new(MockTransport::new(TransportCapabilities::quic()));
    let ble_mock = Arc::new(MockTransport::new(TransportCapabilities::bluetooth_le()));
    manager.register(quic_mock);
    manager.register(ble_mock);

    let registered = manager.registered_transports();
    assert_eq!(registered.len(), 2);
    for expected in [TransportType::Quic, TransportType::BluetoothLE] {
        assert!(registered.contains(&expected));
    }
}
/// `has_bypass_channel` is expected to be false on a new manager and true after
/// `set_bypass_channel` is called.
#[tokio::test]
async fn test_set_bypass_channel() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    assert!(!manager.has_bypass_channel());

    let channel = UdpBypassChannel::new(BypassChannelConfig::new())
        .await
        .unwrap();
    manager.set_bypass_channel(channel);

    assert!(manager.has_bypass_channel());
}
/// With `cache_peer_transport` disabled, `record_success` is expected to leave
/// `peer_transports` without an entry for the peer.
#[test]
fn test_record_success_caching_disabled() {
    let manager = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: false,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    manager.record_success(&peer, TransportType::Quic);

    // A poisoned lock is tolerated: the inner map is read either way.
    let cache = manager
        .peer_transports
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    assert!(!cache.contains_key(&peer));
}
/// A success is recorded for LoRa, which was never registered; `select_transport` is
/// expected to return the registered BLE transport instead.
#[test]
fn test_select_transport_cached_transport_invalid() {
    let mut manager = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: true,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    // Only BLE is registered.
    let ble_mock = Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    );
    manager.register(ble_mock);

    // Record a success for a transport type that has no registration.
    manager.record_success(&peer, TransportType::LoRa);

    let requirements = MessageRequirements::default();
    let chosen = manager.select_transport(&peer, &requirements);

    assert_eq!(chosen, Some(TransportType::BluetoothLE));
}
/// A success is recorded for BLE, whose mock is marked `.unavailable()`;
/// `select_transport` is expected to return the QUIC transport instead.
#[test]
fn test_select_transport_cached_transport_unavailable() {
    let mut manager = TransportManager::new(TransportManagerConfig {
        cache_peer_transport: true,
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    let quic_mock =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register(quic_mock);

    // The BLE mock has the peer but is flagged unavailable.
    let ble_mock = Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le())
            .with_peer(peer.clone())
            .unavailable(),
    );
    manager.register(ble_mock);

    manager.record_success(&peer, TransportType::BluetoothLE);

    let requirements = MessageRequirements::default();
    let chosen = manager.select_transport(&peer, &requirements);

    assert_eq!(chosen, Some(TransportType::Quic));
}
/// The policy's primary (`dead-transport`) is never registered, so in single mode the
/// alternate `lora-915` is expected to be the only selected transport.
#[test]
fn test_pace_fallback_order() {
    let policy = TransportPolicy::new("test")
        .primary(vec!["dead-transport"])
        .alternate(vec!["lora-915"]);
    let manager = TransportManager::new(
        TransportManagerConfig::with_policy(policy).with_mode(TransportMode::Single),
    );
    let peer = NodeId::new("peer-1".to_string());

    // Only the alternate is actually registered.
    let lora_instance = TransportInstance::new(
        "lora-915",
        TransportType::LoRa,
        TransportCapabilities::lora(7),
    );
    let lora_mock =
        Arc::new(MockTransport::new(TransportCapabilities::lora(7)).with_peer(peer.clone()));
    manager.register_instance(lora_instance, lora_mock);

    let requirements = MessageRequirements::default();
    let selected = manager.select_transports_pace(&peer, &requirements);

    assert_eq!(selected.len(), 1);
    assert_eq!(selected[0], "lora-915");
}
/// `get_peer_distance` is expected to return `None` for a peer with no recorded
/// distance.
#[test]
fn test_get_peer_distance_none() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let unknown_peer = NodeId::new("unknown-peer".to_string());

    let distance = manager.get_peer_distance(&unknown_peer);

    assert!(distance.is_none());
}
/// Without a bypass channel, `send_bypass` is expected to fail with an error whose
/// display text contains "Bypass channel not configured".
#[tokio::test]
async fn test_send_bypass_not_configured() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    let outcome = manager.send_bypass("test_collection", b"hello", None).await;
    assert!(outcome.is_err());

    let rendered = format!("{}", outcome.unwrap_err());
    assert!(rendered.contains("Bypass channel not configured"));
}
/// Without a bypass channel, `send_bypass_to` a broadcast target is expected to fail
/// with an error whose display text contains "Bypass channel not configured".
#[tokio::test]
async fn test_send_bypass_to_not_configured() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    let broadcast = BypassTarget::Broadcast { port: 5150 };
    let outcome = manager
        .send_bypass_to(broadcast, "test_collection", b"hello")
        .await;
    assert!(outcome.is_err());

    let rendered = format!("{}", outcome.unwrap_err());
    assert!(rendered.contains("Bypass channel not configured"));
}
/// Without a bypass channel, `subscribe_bypass_collection` is expected to return an
/// error.
#[tokio::test]
async fn test_subscribe_bypass_collection_not_configured() {
    let manager = TransportManager::new(TransportManagerConfig::default());

    let outcome = manager.subscribe_bypass_collection("test").await;

    assert!(outcome.is_err());
}
/// An empty `CollectionRouteTable` is expected to report nothing for any collection:
/// no entry, no membership, and no bypass.
#[test]
fn test_route_table_empty_returns_none() {
    let empty = CollectionRouteTable::new();

    assert!(empty.get("anything").is_none());
    assert!(!empty.has_collection("anything"));
    assert!(!empty.is_bypass("anything"));
}
/// Builds a table with a fixed-route collection and a bypass collection, then checks
/// membership, the stored route, and the stored priorities.
#[test]
fn test_route_table_builder_and_lookup() {
    let telemetry_cfg = CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    };
    let position_cfg = CollectionRouteConfig {
        collection: "position".to_string(),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Raw,
            ttl_ms: 200,
            bypass_transport: BypassTransport::Broadcast,
        },
        priority: MessagePriority::High,
    };
    let table = CollectionRouteTable::new()
        .with_collection(telemetry_cfg)
        .with_collection(position_cfg);

    // Membership.
    assert!(table.has_collection("telemetry"));
    assert!(table.has_collection("position"));
    assert!(!table.has_collection("unknown"));

    // The fixed route keeps its transport type and priority.
    let telemetry = table.get("telemetry").unwrap();
    let is_fixed_ble = matches!(
        telemetry.route,
        CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE
        }
    );
    assert!(is_fixed_ble);
    assert_eq!(telemetry.priority, MessagePriority::Normal);

    // The bypass entry keeps its priority.
    let position = table.get("position").unwrap();
    assert_eq!(position.priority, MessagePriority::High);
}
/// `is_bypass` is expected to be true only for the collection configured with a
/// `Bypass` route; fixed, pace, and unknown collections report false.
#[test]
fn test_route_table_is_bypass() {
    let bypass_cfg = CollectionRouteConfig {
        collection: "bypass_col".to_string(),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Raw,
            ttl_ms: 100,
            bypass_transport: BypassTransport::Unicast,
        },
        priority: MessagePriority::Normal,
    };
    let fixed_cfg = CollectionRouteConfig {
        collection: "fixed_col".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::Quic,
        },
        priority: MessagePriority::Normal,
    };
    let pace_cfg = CollectionRouteConfig {
        collection: "pace_col".to_string(),
        route: CollectionTransportRoute::Pace {
            policy_override: None,
        },
        priority: MessagePriority::Normal,
    };
    let table = CollectionRouteTable::new()
        .with_collection(bypass_cfg)
        .with_collection(fixed_cfg)
        .with_collection(pace_cfg);

    assert!(table.is_bypass("bypass_col"));
    assert!(!table.is_bypass("fixed_col"));
    assert!(!table.is_bypass("pace_col"));
    assert!(!table.is_bypass("nonexistent"));
}
/// A `Fixed` route is expected to survive a JSON serialize/deserialize round trip with
/// its transport type intact.
#[test]
fn test_serde_fixed_route() {
    let original = CollectionTransportRoute::Fixed {
        transport_type: TransportType::BluetoothLE,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: CollectionTransportRoute = serde_json::from_str(&encoded).unwrap();

    let is_fixed_ble = matches!(
        decoded,
        CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE
        }
    );
    assert!(is_fixed_ble);
}
/// A `Bypass` route is expected to survive a JSON round trip with its encoding, TTL,
/// and bypass transport intact.
#[test]
fn test_serde_bypass_route() {
    let original = CollectionTransportRoute::Bypass {
        encoding: MessageEncoding::Raw,
        ttl_ms: 500,
        bypass_transport: BypassTransport::Broadcast,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: CollectionTransportRoute = serde_json::from_str(&encoded).unwrap();

    match decoded {
        CollectionTransportRoute::Bypass {
            encoding,
            ttl_ms,
            bypass_transport,
        } => {
            assert_eq!(encoding, MessageEncoding::Raw);
            assert_eq!(ttl_ms, 500);
            assert_eq!(bypass_transport, BypassTransport::Broadcast);
        }
        _ => panic!("Expected Bypass variant"),
    }
}
/// A `Pace` route without a policy override is expected to survive a JSON round trip
/// with `policy_override` still `None`.
#[test]
fn test_serde_pace_route() {
    let original = CollectionTransportRoute::Pace {
        policy_override: None,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: CollectionTransportRoute = serde_json::from_str(&encoded).unwrap();

    let is_plain_pace = matches!(
        decoded,
        CollectionTransportRoute::Pace {
            policy_override: None
        }
    );
    assert!(is_plain_pace);
}
/// A `Pace` route carrying a policy override is expected to survive a JSON round trip
/// with the policy's name, primary list, and alternate list intact.
#[test]
fn test_serde_pace_route_with_policy() {
    let original = CollectionTransportRoute::Pace {
        policy_override: Some(
            TransportPolicy::new("custom")
                .primary(vec!["ble-hci0"])
                .alternate(vec!["iroh-wlan0"]),
        ),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: CollectionTransportRoute = serde_json::from_str(&encoded).unwrap();

    match decoded {
        CollectionTransportRoute::Pace {
            policy_override: Some(restored),
        } => {
            assert_eq!(restored.name, "custom");
            assert_eq!(restored.primary, vec!["ble-hci0"]);
            assert_eq!(restored.alternate, vec!["iroh-wlan0"]);
        }
        _ => panic!("Expected Pace with policy_override"),
    }
}
/// A `CollectionRouteConfig` is expected to survive a JSON round trip with its
/// collection name and priority intact.
#[test]
fn test_serde_collection_route_config() {
    let original = CollectionRouteConfig {
        collection: "sensors".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::LoRa,
        },
        priority: MessagePriority::High,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: CollectionRouteConfig = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.collection, "sensors");
    assert_eq!(decoded.priority, MessagePriority::High);
}
/// A `CollectionRouteTable` with a fixed and a bypass collection is expected to
/// survive a JSON round trip with membership and bypass status intact.
#[test]
fn test_serde_collection_route_table() {
    let fixed_cfg = CollectionRouteConfig {
        collection: "a".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::Quic,
        },
        priority: MessagePriority::Normal,
    };
    let bypass_cfg = CollectionRouteConfig {
        collection: "b".to_string(),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Json,
            ttl_ms: 1000,
            bypass_transport: BypassTransport::Unicast,
        },
        priority: MessagePriority::Critical,
    };
    let original = CollectionRouteTable::new()
        .with_collection(fixed_cfg)
        .with_collection(bypass_cfg);

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: CollectionRouteTable = serde_json::from_str(&encoded).unwrap();

    assert!(decoded.has_collection("a"));
    assert!(decoded.has_collection("b"));
    assert!(decoded.is_bypass("b"));
    assert!(!decoded.is_bypass("a"));
}
/// Every listed `TransportType` variant, including `Custom(42)`, is expected to
/// survive a JSON round trip unchanged.
#[test]
fn test_serde_transport_type() {
    let variants = [
        TransportType::Quic,
        TransportType::BluetoothClassic,
        TransportType::BluetoothLE,
        TransportType::WifiDirect,
        TransportType::LoRa,
        TransportType::TacticalRadio,
        TransportType::Satellite,
        TransportType::Custom(42),
    ];

    for tt in variants {
        let encoded = serde_json::to_string(&tt).unwrap();
        let decoded: TransportType = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, tt, "Failed round-trip for {:?}", tt);
    }
}
/// For a collection with no route-table entry, `route_collection` is expected to
/// return the registered QUIC transport.
#[test]
fn test_route_collection_unknown_falls_through() {
    let mut manager = TransportManager::new(TransportManagerConfig::default());
    let peer = NodeId::new("peer-1".to_string());

    let quic_mock =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register(quic_mock);

    let requirements = MessageRequirements::default();
    let decision = manager.route_collection("unknown", &peer, &requirements);

    assert_eq!(decision, RouteDecision::Transport(TransportType::Quic));
}
/// A collection pinned to BLE by a `Fixed` route is expected to route to BLE when a
/// BLE mock configured with the peer is registered.
#[test]
fn test_route_collection_fixed_routes_correctly() {
    let telemetry_cfg = CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    };
    let mut manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(telemetry_cfg),
        ..Default::default()
    });
    let peer = NodeId::new("peer-1".to_string());

    let ble_mock = Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    );
    manager.register(ble_mock);

    let requirements = MessageRequirements::default();
    let decision = manager.route_collection("telemetry", &peer, &requirements);

    let expected = RouteDecision::Transport(TransportType::BluetoothLE);
    assert_eq!(decision, expected);
}
/// A `Fixed` BluetoothLE route must produce `RouteDecision::NoRoute` when
/// the registered BLE mock has been built with `.unavailable()`.
#[test]
fn test_route_collection_fixed_unavailable_returns_no_route() {
    let telemetry = CollectionRouteConfig {
        collection: String::from("telemetry"),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    };
    let mut manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(telemetry),
        ..Default::default()
    });

    let peer = NodeId::new(String::from("peer-1"));
    // The mock knows the peer but is flagged via `.unavailable()`.
    let ble_mock = MockTransport::new(TransportCapabilities::bluetooth_le())
        .with_peer(peer.clone())
        .unavailable();
    let ble = Arc::new(ble_mock);
    manager.register(ble);

    let reqs = MessageRequirements::default();
    assert_eq!(
        manager.route_collection("telemetry", &peer, &reqs),
        RouteDecision::NoRoute
    );
}
/// A `Fixed` BluetoothLE route must produce `RouteDecision::NoRoute` when no
/// transport has been registered with the manager at all.
#[test]
fn test_route_collection_fixed_not_registered_returns_no_route() {
    let telemetry = CollectionRouteConfig {
        collection: String::from("telemetry"),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::BluetoothLE,
        },
        priority: MessagePriority::Normal,
    };
    // Deliberately register nothing on this manager.
    let manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(telemetry),
        ..Default::default()
    });

    let peer = NodeId::new(String::from("peer-1"));
    let reqs = MessageRequirements::default();
    assert_eq!(
        manager.route_collection("telemetry", &peer, &reqs),
        RouteDecision::NoRoute
    );
}
/// A collection configured with a `Bypass` route must resolve to
/// `RouteDecision::Bypass` once a bypass channel has been installed on the
/// manager via `set_bypass_channel`.
#[tokio::test]
async fn test_route_collection_bypass_with_channel() {
    let position = CollectionRouteConfig {
        collection: String::from("position"),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Raw,
            ttl_ms: 200,
            bypass_transport: BypassTransport::Broadcast,
        },
        priority: MessagePriority::High,
    };
    let mut manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(position),
        ..Default::default()
    });

    // Build the channel from a freshly constructed config and attach it.
    let channel = UdpBypassChannel::new(BypassChannelConfig::new())
        .await
        .unwrap();
    manager.set_bypass_channel(channel);

    let peer = NodeId::new(String::from("peer-1"));
    let reqs = MessageRequirements::default();
    assert_eq!(
        manager.route_collection("position", &peer, &reqs),
        RouteDecision::Bypass
    );
}
/// A collection configured with a `Bypass` route must resolve to
/// `RouteDecision::NoRoute` when no bypass channel was set on the manager.
#[test]
fn test_route_collection_bypass_without_channel() {
    let position = CollectionRouteConfig {
        collection: String::from("position"),
        route: CollectionTransportRoute::Bypass {
            encoding: MessageEncoding::Raw,
            ttl_ms: 200,
            bypass_transport: BypassTransport::Broadcast,
        },
        priority: MessagePriority::High,
    };
    // No call to `set_bypass_channel` here, on purpose.
    let manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(position),
        ..Default::default()
    });

    let peer = NodeId::new(String::from("peer-1"));
    let reqs = MessageRequirements::default();
    assert_eq!(
        manager.route_collection("position", &peer, &reqs),
        RouteDecision::NoRoute
    );
}
/// A `Pace` route without a policy override must use the manager's default
/// policy: with "iroh-eth0" listed as primary and registered as an instance,
/// the decision is `RouteDecision::TransportInstance("iroh-eth0")`.
#[test]
fn test_route_collection_pace_routes_correctly() {
    let default_policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
    let sync_data = CollectionRouteConfig {
        collection: String::from("sync_data"),
        route: CollectionTransportRoute::Pace {
            policy_override: None,
        },
        priority: MessagePriority::Normal,
    };
    let manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(sync_data),
        default_policy: Some(default_policy),
        ..Default::default()
    });

    let peer = NodeId::new(String::from("peer-1"));
    // Register the instance named by the policy's primary list.
    let instance = TransportInstance::new(
        "iroh-eth0",
        TransportType::Quic,
        TransportCapabilities::quic(),
    );
    let transport =
        Arc::new(MockTransport::new(TransportCapabilities::quic()).with_peer(peer.clone()));
    manager.register_instance(instance, transport);

    let reqs = MessageRequirements::default();
    let expected = RouteDecision::TransportInstance(String::from("iroh-eth0"));
    assert_eq!(manager.route_collection("sync_data", &peer, &reqs), expected);
}
/// A `Pace` route must resolve to `RouteDecision::NoRoute` when the policy
/// names "iroh-eth0" but no transport instance has been registered.
#[test]
fn test_route_collection_pace_no_available_returns_no_route() {
    let default_policy = TransportPolicy::new("test").primary(vec!["iroh-eth0"]);
    let sync_data = CollectionRouteConfig {
        collection: String::from("sync_data"),
        route: CollectionTransportRoute::Pace {
            policy_override: None,
        },
        priority: MessagePriority::Normal,
    };
    // No instance is registered on this manager.
    let manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(sync_data),
        default_policy: Some(default_policy),
        ..Default::default()
    });

    let peer = NodeId::new(String::from("peer-1"));
    let reqs = MessageRequirements::default();
    assert_eq!(
        manager.route_collection("sync_data", &peer, &reqs),
        RouteDecision::NoRoute
    );
}
/// A `Pace` route carrying its own `policy_override` must be routed by that
/// override rather than the manager default: the default policy names
/// "nonexistent", the override names "ble-hci0", and the expected decision is
/// `RouteDecision::TransportInstance("ble-hci0")`.
#[test]
fn test_route_collection_pace_with_policy_override() {
    let fallback_policy = TransportPolicy::new("default").primary(vec!["nonexistent"]);
    let collection_policy = TransportPolicy::new("override").primary(vec!["ble-hci0"]);

    let ble_data = CollectionRouteConfig {
        collection: String::from("ble_data"),
        route: CollectionTransportRoute::Pace {
            policy_override: Some(collection_policy),
        },
        priority: MessagePriority::Normal,
    };
    let manager = TransportManager::new(TransportManagerConfig {
        collection_routes: CollectionRouteTable::new().with_collection(ble_data),
        default_policy: Some(fallback_policy),
        ..Default::default()
    });

    let peer = NodeId::new(String::from("peer-1"));
    // Only the instance named by the override policy is registered.
    let instance = TransportInstance::new(
        "ble-hci0",
        TransportType::BluetoothLE,
        TransportCapabilities::bluetooth_le(),
    );
    let transport = Arc::new(
        MockTransport::new(TransportCapabilities::bluetooth_le()).with_peer(peer.clone()),
    );
    manager.register_instance(instance, transport);

    let reqs = MessageRequirements::default();
    let expected = RouteDecision::TransportInstance(String::from("ble-hci0"));
    assert_eq!(manager.route_collection("ble_data", &peer, &reqs), expected);
}
/// `RouteDecision::TransportInstance` compares equal to a value with the same
/// identifier and unequal to the `Bypass`, `NoRoute` and `Transport` variants.
#[test]
fn test_route_decision_transport_instance_variant() {
    let decision = RouteDecision::TransportInstance(String::from("iroh-eth0"));

    let same = RouteDecision::TransportInstance(String::from("iroh-eth0"));
    assert_eq!(decision, same);

    // Checked in the same order as listed: Bypass, NoRoute, Transport(Quic).
    let different = [
        RouteDecision::Bypass,
        RouteDecision::NoRoute,
        RouteDecision::Transport(TransportType::Quic),
    ];
    for other in &different {
        assert_ne!(decision, *other);
    }
}
/// The `Debug` rendering of `RouteDecision::TransportInstance` must mention
/// both the variant name and the carried identifier.
#[test]
fn test_route_decision_transport_instance_debug() {
    let rendered = format!(
        "{:?}",
        RouteDecision::TransportInstance(String::from("ble-hci0"))
    );
    assert!(rendered.contains("TransportInstance"));
    assert!(rendered.contains("ble-hci0"));
}
/// Cloning a `RouteDecision::TransportInstance` yields a value equal to the
/// original.
#[test]
fn test_route_decision_transport_instance_clone() {
    let original = RouteDecision::TransportInstance(String::from("iroh-wlan0"));
    assert_eq!(original, original.clone());
}
/// `get_collection_route` must return the configured entry for a collection
/// that is present in the route table, with its name and priority intact.
#[test]
fn test_get_collection_route_found() {
    let table = CollectionRouteTable::new().with_collection(CollectionRouteConfig {
        collection: "telemetry".to_string(),
        route: CollectionTransportRoute::Fixed {
            transport_type: TransportType::Quic,
        },
        priority: MessagePriority::High,
    });
    let config = TransportManagerConfig {
        collection_routes: table,
        ..Default::default()
    };
    let manager = TransportManager::new(config);

    // Unwrap once with a descriptive message instead of asserting `is_some()`
    // and then calling `unwrap()` for every field that is inspected.
    let route = manager
        .get_collection_route("telemetry")
        .expect("a route for \"telemetry\" was configured and should be returned");
    assert_eq!(route.collection, "telemetry");
    assert_eq!(route.priority, MessagePriority::High);
}
/// `get_collection_route` must return `None` for a collection name that is
/// absent from a default-configured manager.
#[test]
fn test_get_collection_route_not_found() {
    let manager = TransportManager::new(TransportManagerConfig::default());
    let lookup = manager.get_collection_route("nonexistent");
    assert!(lookup.is_none());
}
}