use async_trait::async_trait;
use std::error::Error as StdError;
use std::fmt;
use std::time::Instant;
use tokio::sync::mpsc;
pub mod bypass;
pub mod capabilities;
pub mod health;
pub mod manager;
pub mod reconnection;
#[cfg(feature = "lite-bridge")]
pub mod lite;
#[cfg(feature = "lite-bridge")]
pub mod lite_ota;
#[cfg(feature = "bluetooth")]
pub mod btle;
pub use bypass::{
BypassChannelConfig, BypassCollectionConfig, BypassError, BypassHeader, BypassMessage,
BypassMetrics, BypassMetricsSnapshot, BypassTarget, BypassTransport, MessageEncoding,
UdpBypassChannel, UdpConfig,
};
pub use capabilities::{
ConfigurableTransport, DistanceSource, MessagePriority, MessageRequirements, PaceLevel,
PeerDistance, RangeMode, RangeModeConfig, Transport, TransportCapabilities, TransportId,
TransportInstance, TransportMode, TransportPolicy, TransportType,
};
pub use health::{HealthMonitor, HeartbeatConfig};
pub use manager::{
CollectionRouteConfig, CollectionRouteTable, CollectionTransportRoute, RouteDecision,
TransportManager, TransportManagerConfig,
};
#[cfg(feature = "lite-bridge")]
pub use lite::{
CrdtType, LiteCapabilities, LiteCapabilitiesExt, LiteDocumentBridge, LiteMeshTransport,
LiteMessage, LitePeerState, LiteSyncMode, LiteTransportConfig, MessageType as LiteMessageType,
OrSetElement, QueryRequest, FULL_CRDT,
};
#[cfg(feature = "lite-bridge")]
pub use lite_ota::{FirmwareImage, OtaSender, OtaStatusInfo};
#[cfg(feature = "bluetooth")]
pub use btle::PeatBleTransport;
/// Identifier of a node (peer) in the mesh.
///
/// A newtype around `String`. Equality and hashing are derived from the
/// wrapped text, so a `NodeId` can be used directly as a `HashMap` /
/// `HashSet` key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeId(String);

impl NodeId {
    /// Wraps an owned string as a node identifier.
    ///
    /// The text is stored unchanged; nothing is validated or normalised.
    pub fn new(id: String) -> Self {
        NodeId(id)
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for NodeId {
    /// Writes the raw identifier text.
    ///
    /// Width, fill and precision flags on the caller's format spec are not
    /// applied: the text is emitted exactly as stored.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl From<String> for NodeId {
    /// Takes ownership of `id` without copying it.
    fn from(id: String) -> Self {
        Self::new(id)
    }
}

impl From<&str> for NodeId {
    /// Copies the slice into a newly allocated identifier.
    fn from(id: &str) -> Self {
        Self::new(id.to_owned())
    }
}
/// Notification about a change in the connection to one peer.
///
/// This is the payload type of the [`PeerEventSender`] / [`PeerEventReceiver`]
/// channel aliases. Every variant identifies the peer it concerns in `peer_id`.
#[derive(Debug, Clone)]
pub enum PeerEvent {
/// The peer is connected; `connected_at` is the instant recorded for the connection.
Connected {
peer_id: NodeId,
connected_at: Instant,
},
/// The connection to the peer ended.
Disconnected {
peer_id: NodeId,
/// Why the connection ended.
reason: DisconnectReason,
/// Duration reported for the connection that ended.
connection_duration: std::time::Duration,
},
/// Carries a health snapshot for a peer whose connection quality changed.
// NOTE(review): the thresholds that trigger this event are not defined in this file —
// presumably decided by the `health` module; confirm.
Degraded {
peer_id: NodeId,
health: ConnectionHealth,
},
/// A reconnection attempt for the peer is under way.
Reconnecting {
peer_id: NodeId,
/// Attempt counter. NOTE(review): whether it starts at 0 or 1 is not visible here.
attempt: u32,
/// Attempt limit. NOTE(review): `None` presumably means "no limit" — confirm.
max_attempts: Option<u32>,
},
/// A reconnection attempt for the peer failed.
ReconnectFailed {
peer_id: NodeId,
attempt: u32,
/// Textual description of the failure.
error: String,
/// Whether another attempt is going to be made.
will_retry: bool,
},
}
/// Why a connection to a peer ended.
///
/// `Display` renders a short lower-case phrase. The two variants that carry
/// a message append it after `": "`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectReason {
    /// Rendered as `remote closed`.
    RemoteClosed,
    /// Rendered as `timeout`.
    Timeout,
    /// Rendered as `network error: <message>`.
    NetworkError(String),
    /// Rendered as `local closed`.
    LocalClosed,
    /// Rendered as `idle timeout`.
    IdleTimeout,
    /// Rendered as `application error: <message>`.
    ApplicationError(String),
    /// Rendered as `unknown`; also the fallback used by
    /// `MeshConnection::disconnect_reason` for a connection that is not alive.
    Unknown,
}

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Variants with a payload need real formatting and return early;
        // every other variant maps to a fixed phrase written in one place.
        let phrase = match self {
            Self::NetworkError(detail) => return write!(f, "network error: {}", detail),
            Self::ApplicationError(detail) => {
                return write!(f, "application error: {}", detail)
            }
            Self::RemoteClosed => "remote closed",
            Self::Timeout => "timeout",
            Self::LocalClosed => "local closed",
            Self::IdleTimeout => "idle timeout",
            Self::Unknown => "unknown",
        };
        f.write_str(phrase)
    }
}
/// Link-quality measurements for one peer connection.
#[derive(Debug, Clone)]
pub struct ConnectionHealth {
    /// Round-trip time, in milliseconds.
    pub rtt_ms: u32,
    /// Round-trip time variance, in milliseconds.
    pub rtt_variance_ms: u32,
    /// Packet loss as a percentage.
    // NOTE(review): presumably limited to 0..=100; the type itself allows up to 255.
    pub packet_loss_percent: u8,
    /// Coarse classification of the connection.
    pub state: ConnectionState,
    /// Instant of the most recent activity recorded for the connection.
    pub last_activity: Instant,
}

impl Default for ConnectionHealth {
    /// A healthy connection with all measurements at zero and
    /// `last_activity` set to the moment of the call.
    fn default() -> Self {
        let now = Instant::now();
        ConnectionHealth {
            state: ConnectionState::Healthy,
            last_activity: now,
            rtt_ms: 0,
            rtt_variance_ms: 0,
            packet_loss_percent: 0,
        }
    }
}
/// Coarse health classification of a peer connection.
///
/// `Display` renders the variant name in lower case.
// NOTE(review): the variants read as ordered from best to worst, but no
// ordering trait is derived, so nothing in this file relies on that.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Rendered as `healthy`; the state used by `ConnectionHealth::default()`.
    Healthy,
    /// Rendered as `degraded`.
    Degraded,
    /// Rendered as `suspect`.
    Suspect,
    /// Rendered as `dead`.
    Dead,
}

impl fmt::Display for ConnectionState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Healthy => "healthy",
            Self::Degraded => "degraded",
            Self::Suspect => "suspect",
            Self::Dead => "dead",
        };
        f.write_str(label)
    }
}
/// Error type returned by the transport layer.
#[derive(Debug)]
pub enum TransportError {
    /// Rendered as `Connection failed: <message>`.
    ConnectionFailed(String),
    /// Rendered as `Peer not found: <message>`.
    PeerNotFound(String),
    /// Rendered as `Already connected: <message>`.
    AlreadyConnected(String),
    /// Rendered as `Transport not started`.
    NotStarted,
    /// Any other failure, wrapping the underlying error. Rendered as
    /// `Transport error: <inner>` and also exposed through `source()`.
    Other(Box<dyn StdError + Send + Sync>),
}

impl fmt::Display for TransportError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NotStarted => f.write_str("Transport not started"),
            Self::ConnectionFailed(detail) => write!(f, "Connection failed: {}", detail),
            Self::PeerNotFound(detail) => write!(f, "Peer not found: {}", detail),
            Self::AlreadyConnected(detail) => write!(f, "Already connected: {}", detail),
            Self::Other(inner) => write!(f, "Transport error: {}", inner),
        }
    }
}

impl StdError for TransportError {
    /// Returns the wrapped error for [`TransportError::Other`]; every other
    /// variant has no underlying cause.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        // Spelled out variant by variant so that adding a new variant forces
        // a decision here instead of silently falling into a catch-all.
        match self {
            Self::Other(inner) => Some(inner.as_ref()),
            Self::ConnectionFailed(_)
            | Self::PeerNotFound(_)
            | Self::AlreadyConnected(_)
            | Self::NotStarted => None,
        }
    }
}
/// Result alias for this module, with [`TransportError`] as the error type.
pub type Result<T> = std::result::Result<T, TransportError>;
/// Capacity value for peer-event channels.
// NOTE(review): not referenced anywhere in this file — presumably used by
// transport implementations when they create the `mpsc` channel; confirm.
pub const PEER_EVENT_CHANNEL_CAPACITY: usize = 256;
/// Receiving half of a tokio `mpsc` channel carrying [`PeerEvent`]s.
pub type PeerEventReceiver = mpsc::Receiver<PeerEvent>;
/// Sending half of a tokio `mpsc` channel carrying [`PeerEvent`]s.
pub type PeerEventSender = mpsc::Sender<PeerEvent>;
/// Interface of a transport that manages connections to mesh peers.
///
/// Implementors supply the lifecycle, connection and event-subscription
/// methods. `is_connected`, `send_to` and `get_peer_health` have default
/// bodies that an implementor may override.
#[async_trait]
pub trait MeshTransport: Send + Sync {
    /// Starts the transport.
    async fn start(&self) -> Result<()>;

    /// Stops the transport.
    async fn stop(&self) -> Result<()>;

    /// Connects to `peer_id` and returns a handle to the connection.
    async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnection>>;

    /// Disconnects from `peer_id`.
    async fn disconnect(&self, peer_id: &NodeId) -> Result<()>;

    /// Returns a handle to the connection with `peer_id`, or `None` if the
    /// transport has none.
    fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn MeshConnection>>;

    /// Number of peers reported by the transport.
    fn peer_count(&self) -> usize;

    /// Identifiers of the connected peers.
    fn connected_peers(&self) -> Vec<NodeId>;

    /// Whether a connection to `peer_id` exists.
    ///
    /// Default: `true` exactly when [`get_connection`](Self::get_connection)
    /// returns `Some`.
    fn is_connected(&self, peer_id: &NodeId) -> bool {
        let connection = self.get_connection(peer_id);
        connection.is_some()
    }

    /// Sends `data` to `peer_id`; on success the `Ok` value is a `usize`.
    // NOTE(review): the `usize` is presumably the number of bytes sent — confirm with implementors.
    ///
    /// # Errors
    ///
    /// The default body does not send anything: it always fails with
    /// `TransportError::ConnectionFailed("send not implemented")`.
    async fn send_to(&self, peer_id: &NodeId, data: &[u8]) -> Result<usize> {
        // Nothing can be delivered here; mark both arguments as deliberately unused.
        let _ = (peer_id, data);
        let reason = String::from("send not implemented");
        Err(TransportError::ConnectionFailed(reason))
    }

    /// Returns a receiver of [`PeerEvent`]s from this transport.
    fn subscribe_peer_events(&self) -> PeerEventReceiver;

    /// Health information for the connection with `peer_id`.
    ///
    /// Default: `None` when [`get_connection`](Self::get_connection) finds
    /// no connection; otherwise a freshly built `ConnectionHealth::default()`
    /// placeholder rather than measured values.
    fn get_peer_health(&self, peer_id: &NodeId) -> Option<ConnectionHealth> {
        // Keep the handle alive until the placeholder has been built.
        let _connection = self.get_connection(peer_id)?;
        Some(ConnectionHealth::default())
    }
}
/// Handle to a connection with one peer.
pub trait MeshConnection: Send + Sync {
    /// Identifier of the peer on the other end.
    fn peer_id(&self) -> &NodeId;

    /// Whether the connection is still alive.
    fn is_alive(&self) -> bool;

    /// Instant at which the connection was established.
    fn connected_at(&self) -> Instant;

    /// Why the connection ended, if it has.
    ///
    /// Default: `None` while [`is_alive`](Self::is_alive) is `true`, and
    /// `Some(DisconnectReason::Unknown)` otherwise.
    fn disconnect_reason(&self) -> Option<DisconnectReason> {
        if !self.is_alive() {
            return Some(DisconnectReason::Unknown);
        }
        None
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the core transport types and for the default method
    // bodies of `MeshConnection` and `MeshTransport`.

    use super::*;
    use std::collections::HashSet;

    // ---- NodeId ---------------------------------------------------------

    #[test]
    fn test_node_id_creation() {
        let id = NodeId::new("node-123".to_string());
        assert_eq!(id.as_str(), "node-123");
        assert_eq!(id.to_string(), "node-123");
    }

    // Exercises `From<String>`. The source value must be an owned `String`:
    // a string literal here would go through `From<&str>`, which has its own
    // test below, and leave this impl untested.
    #[test]
    fn test_node_id_from_string() {
        let id: NodeId = String::from("node-456").into();
        assert_eq!(id.as_str(), "node-456");
    }

    #[test]
    fn test_node_id_from_str() {
        let id: NodeId = NodeId::from("node-789");
        assert_eq!(id.as_str(), "node-789");
    }

    #[test]
    fn test_node_id_equality() {
        let id1 = NodeId::new("node-123".to_string());
        let id2 = NodeId::new("node-123".to_string());
        let id3 = NodeId::new("node-456".to_string());
        assert_eq!(id1, id2);
        assert_ne!(id1, id3);
    }

    #[test]
    fn test_node_id_hash() {
        let mut set = HashSet::new();
        set.insert(NodeId::new("a".into()));
        set.insert(NodeId::new("a".into()));
        set.insert(NodeId::new("b".into()));
        assert_eq!(set.len(), 2);
    }

    #[test]
    fn test_node_id_display() {
        let id = NodeId::new("display-me".into());
        assert_eq!(format!("{}", id), "display-me");
    }

    // ---- DisconnectReason -------------------------------------------------

    #[test]
    fn test_disconnect_reason_display() {
        assert_eq!(DisconnectReason::RemoteClosed.to_string(), "remote closed");
        assert_eq!(DisconnectReason::Timeout.to_string(), "timeout");
        assert_eq!(
            DisconnectReason::NetworkError("reset".into()).to_string(),
            "network error: reset"
        );
        assert_eq!(DisconnectReason::LocalClosed.to_string(), "local closed");
        assert_eq!(DisconnectReason::IdleTimeout.to_string(), "idle timeout");
        assert_eq!(
            DisconnectReason::ApplicationError("bug".into()).to_string(),
            "application error: bug"
        );
        assert_eq!(DisconnectReason::Unknown.to_string(), "unknown");
    }

    #[test]
    fn test_disconnect_reason_equality() {
        assert_eq!(DisconnectReason::Timeout, DisconnectReason::Timeout);
        assert_ne!(DisconnectReason::Timeout, DisconnectReason::Unknown);
        assert_eq!(
            DisconnectReason::NetworkError("x".into()),
            DisconnectReason::NetworkError("x".into()),
        );
    }

    // ---- ConnectionState / ConnectionHealth -------------------------------

    #[test]
    fn test_connection_state_display() {
        assert_eq!(ConnectionState::Healthy.to_string(), "healthy");
        assert_eq!(ConnectionState::Degraded.to_string(), "degraded");
        assert_eq!(ConnectionState::Suspect.to_string(), "suspect");
        assert_eq!(ConnectionState::Dead.to_string(), "dead");
    }

    #[test]
    fn test_connection_state_equality() {
        assert_eq!(ConnectionState::Healthy, ConnectionState::Healthy);
        assert_ne!(ConnectionState::Healthy, ConnectionState::Dead);
    }

    #[test]
    fn test_connection_health_default() {
        let h = ConnectionHealth::default();
        assert_eq!(h.rtt_ms, 0);
        assert_eq!(h.rtt_variance_ms, 0);
        assert_eq!(h.packet_loss_percent, 0);
        assert_eq!(h.state, ConnectionState::Healthy);
    }

    // ---- TransportError ---------------------------------------------------

    #[test]
    fn test_transport_error_display() {
        assert_eq!(
            TransportError::ConnectionFailed("timeout".into()).to_string(),
            "Connection failed: timeout"
        );
        assert_eq!(
            TransportError::PeerNotFound("node-123".into()).to_string(),
            "Peer not found: node-123"
        );
        assert_eq!(
            TransportError::AlreadyConnected("node-1".into()).to_string(),
            "Already connected: node-1"
        );
        assert_eq!(
            TransportError::NotStarted.to_string(),
            "Transport not started"
        );
    }

    #[test]
    fn test_transport_error_other() {
        let inner = std::io::Error::new(std::io::ErrorKind::Other, "boom");
        let err = TransportError::Other(Box::new(inner));
        assert!(err.to_string().contains("boom"));
    }

    #[test]
    fn test_transport_error_source() {
        use std::error::Error;
        let err = TransportError::NotStarted;
        assert!(err.source().is_none());
        let inner = std::io::Error::new(std::io::ErrorKind::Other, "boom");
        let err = TransportError::Other(Box::new(inner));
        assert!(err.source().is_some());
    }

    // ---- PeerEvent ----------------------------------------------------------
    //
    // Each test destructures with `match` and panics on any other variant, so
    // a mismatch fails the test instead of skipping the assertions.

    #[test]
    fn test_peer_event_connected() {
        let evt = PeerEvent::Connected {
            peer_id: NodeId::new("p1".into()),
            connected_at: Instant::now(),
        };
        match evt {
            PeerEvent::Connected { peer_id, .. } => assert_eq!(peer_id.as_str(), "p1"),
            other => panic!("expected Connected, got {:?}", other),
        }
    }

    #[test]
    fn test_peer_event_disconnected() {
        let evt = PeerEvent::Disconnected {
            peer_id: NodeId::new("p1".into()),
            reason: DisconnectReason::Timeout,
            connection_duration: std::time::Duration::from_secs(60),
        };
        match evt {
            PeerEvent::Disconnected {
                reason,
                connection_duration,
                ..
            } => {
                assert_eq!(reason, DisconnectReason::Timeout);
                assert_eq!(connection_duration.as_secs(), 60);
            }
            other => panic!("expected Disconnected, got {:?}", other),
        }
    }

    #[test]
    fn test_peer_event_degraded() {
        let evt = PeerEvent::Degraded {
            peer_id: NodeId::new("p1".into()),
            health: ConnectionHealth::default(),
        };
        match evt {
            PeerEvent::Degraded { health, .. } => {
                assert_eq!(health.state, ConnectionState::Healthy);
            }
            other => panic!("expected Degraded, got {:?}", other),
        }
    }

    #[test]
    fn test_peer_event_reconnecting() {
        let evt = PeerEvent::Reconnecting {
            peer_id: NodeId::new("p1".into()),
            attempt: 3,
            max_attempts: Some(5),
        };
        match evt {
            PeerEvent::Reconnecting {
                attempt,
                max_attempts,
                ..
            } => {
                assert_eq!(attempt, 3);
                assert_eq!(max_attempts, Some(5));
            }
            other => panic!("expected Reconnecting, got {:?}", other),
        }
    }

    #[test]
    fn test_peer_event_reconnect_failed() {
        let evt = PeerEvent::ReconnectFailed {
            peer_id: NodeId::new("p1".into()),
            attempt: 5,
            error: "timeout".into(),
            will_retry: false,
        };
        match evt {
            PeerEvent::ReconnectFailed {
                will_retry, error, ..
            } => {
                assert!(!will_retry);
                assert_eq!(error, "timeout");
            }
            other => panic!("expected ReconnectFailed, got {:?}", other),
        }
    }

    // ---- MeshConnection default methods ------------------------------------

    // Minimal connection whose liveness is fixed at construction.
    struct TestConnection {
        pid: NodeId,
        alive: bool,
    }

    impl MeshConnection for TestConnection {
        fn peer_id(&self) -> &NodeId {
            &self.pid
        }
        fn is_alive(&self) -> bool {
            self.alive
        }
        fn connected_at(&self) -> Instant {
            Instant::now()
        }
    }

    #[test]
    fn test_mesh_connection_disconnect_reason_alive() {
        let conn = TestConnection {
            pid: NodeId::new("p".into()),
            alive: true,
        };
        assert!(conn.disconnect_reason().is_none());
    }

    #[test]
    fn test_mesh_connection_disconnect_reason_dead() {
        let conn = TestConnection {
            pid: NodeId::new("p".into()),
            alive: false,
        };
        assert_eq!(conn.disconnect_reason(), Some(DisconnectReason::Unknown));
    }

    // ---- MeshTransport default methods --------------------------------------

    // Transport that implements only the required methods, so the trait's
    // default bodies are what get exercised.
    struct MinimalTransport;

    #[async_trait::async_trait]
    impl MeshTransport for MinimalTransport {
        async fn start(&self) -> Result<()> {
            Ok(())
        }
        async fn stop(&self) -> Result<()> {
            Ok(())
        }
        async fn connect(&self, _: &NodeId) -> Result<Box<dyn MeshConnection>> {
            Err(TransportError::NotStarted)
        }
        async fn disconnect(&self, _: &NodeId) -> Result<()> {
            Ok(())
        }
        fn get_connection(&self, _: &NodeId) -> Option<Box<dyn MeshConnection>> {
            None
        }
        fn peer_count(&self) -> usize {
            0
        }
        fn connected_peers(&self) -> Vec<NodeId> {
            vec![]
        }
        fn subscribe_peer_events(&self) -> PeerEventReceiver {
            let (_tx, rx) = tokio::sync::mpsc::channel(1);
            rx
        }
    }

    #[tokio::test]
    async fn test_send_to_default_returns_error() {
        let transport = MinimalTransport;
        let peer = NodeId::new("peer-1".into());
        let result = transport.send_to(&peer, b"hello").await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, TransportError::ConnectionFailed(msg) if msg.contains("send not implemented"))
        );
    }
}