Skip to main content

peat_mesh/transport/
mod.rs

1//! Transport abstraction for mesh topology connections
2//!
3//! This module provides backend-agnostic types and traits for establishing
4//! P2P connections in the mesh network. It enables `TopologyManager` and
5//! related components to work with any transport backend.
6//!
7//! ## Core Types
8//!
9//! - **NodeId**: Mesh network node identifier
10//! - **MeshTransport**: Connection establishment and management trait
11//! - **MeshConnection**: Active connection to a peer trait
12//! - **PeerEvent**: Connection lifecycle events
13
14use async_trait::async_trait;
15use std::error::Error as StdError;
16use std::fmt;
17use std::time::Instant;
18use tokio::sync::mpsc;
19
20// Submodules moved from peat-protocol (ADR-049 Phase 2)
21pub mod bypass;
22pub mod capabilities;
23pub mod health;
24pub mod manager;
25pub mod reconnection;
26
27#[cfg(feature = "lite-bridge")]
28pub mod lite;
29#[cfg(feature = "lite-bridge")]
30pub mod lite_ota;
31
32#[cfg(feature = "bluetooth")]
33pub mod btle;
34
35// Re-exports from submodules
36pub use bypass::{
37    BypassChannelConfig, BypassCollectionConfig, BypassError, BypassHeader, BypassMessage,
38    BypassMetrics, BypassMetricsSnapshot, BypassTarget, BypassTransport, MessageEncoding,
39    UdpBypassChannel, UdpConfig,
40};
41pub use capabilities::{
42    ConfigurableTransport, DistanceSource, MessagePriority, MessageRequirements, PaceLevel,
43    PeerDistance, RangeMode, RangeModeConfig, Transport, TransportCapabilities, TransportId,
44    TransportInstance, TransportMode, TransportPolicy, TransportType,
45};
46pub use health::{HealthMonitor, HeartbeatConfig};
47pub use manager::{
48    CollectionRouteConfig, CollectionRouteTable, CollectionTransportRoute, RouteDecision,
49    TransportManager, TransportManagerConfig,
50};
51
52#[cfg(feature = "lite-bridge")]
53pub use lite::{
54    CrdtType, LiteCapabilities, LiteCapabilitiesExt, LiteDocumentBridge, LiteMeshTransport,
55    LiteMessage, LitePeerState, LiteSyncMode, LiteTransportConfig, MessageType as LiteMessageType,
56    OrSetElement, QueryRequest, FULL_CRDT,
57};
58
59#[cfg(feature = "lite-bridge")]
60pub use lite_ota::{FirmwareImage, OtaSender, OtaStatusInfo};
61
62#[cfg(feature = "bluetooth")]
63pub use btle::PeatBleTransport;
64
65// =============================================================================
66// Node Identity
67// =============================================================================
68
69/// Node identifier in the mesh network
70///
71/// Uniquely identifies a node in the mesh. This is separate from
72/// backend-specific IDs (e.g., Iroh's EndpointId, Ditto's peer ID).
73#[derive(Debug, Clone, PartialEq, Eq, Hash)]
74pub struct NodeId(String);
75
76impl NodeId {
77    /// Create a new node ID from a string
78    pub fn new(id: String) -> Self {
79        Self(id)
80    }
81
82    /// Get the node ID as a string slice
83    pub fn as_str(&self) -> &str {
84        &self.0
85    }
86}
87
88impl fmt::Display for NodeId {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        write!(f, "{}", self.0)
91    }
92}
93
94impl From<String> for NodeId {
95    fn from(id: String) -> Self {
96        Self(id)
97    }
98}
99
100impl From<&str> for NodeId {
101    fn from(id: &str) -> Self {
102        Self(id.to_string())
103    }
104}
105
106// =============================================================================
107// Peer Events
108// =============================================================================
109
110/// Peer connection lifecycle events
111///
112/// Applications can subscribe to these events to react to peer state changes.
113#[derive(Debug, Clone)]
114pub enum PeerEvent {
115    /// New peer connected successfully
116    Connected {
117        /// The peer's node ID
118        peer_id: NodeId,
119        /// When the connection was established
120        connected_at: Instant,
121    },
122
123    /// Peer disconnected
124    Disconnected {
125        /// The peer's node ID
126        peer_id: NodeId,
127        /// Reason for disconnection (if known)
128        reason: DisconnectReason,
129        /// How long the connection was active
130        connection_duration: std::time::Duration,
131    },
132
133    /// Connection quality degraded
134    Degraded {
135        /// The peer's node ID
136        peer_id: NodeId,
137        /// Current health metrics
138        health: ConnectionHealth,
139    },
140
141    /// Attempting to reconnect to a peer
142    Reconnecting {
143        /// The peer's node ID
144        peer_id: NodeId,
145        /// Current attempt number (1-indexed)
146        attempt: u32,
147        /// Maximum attempts configured (None = infinite)
148        max_attempts: Option<u32>,
149    },
150
151    /// Reconnection attempt failed
152    ReconnectFailed {
153        /// The peer's node ID
154        peer_id: NodeId,
155        /// Current attempt number
156        attempt: u32,
157        /// Error message
158        error: String,
159        /// Whether more retries will be attempted
160        will_retry: bool,
161    },
162}
163
164/// Reason for peer disconnection
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub enum DisconnectReason {
167    /// Remote peer initiated close
168    RemoteClosed,
169    /// Connection timed out
170    Timeout,
171    /// Network error occurred
172    NetworkError(String),
173    /// Local side requested disconnect
174    LocalClosed,
175    /// Connection was idle too long
176    IdleTimeout,
177    /// Application-level error
178    ApplicationError(String),
179    /// Unknown reason
180    Unknown,
181}
182
183impl fmt::Display for DisconnectReason {
184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185        match self {
186            DisconnectReason::RemoteClosed => write!(f, "remote closed"),
187            DisconnectReason::Timeout => write!(f, "timeout"),
188            DisconnectReason::NetworkError(e) => write!(f, "network error: {}", e),
189            DisconnectReason::LocalClosed => write!(f, "local closed"),
190            DisconnectReason::IdleTimeout => write!(f, "idle timeout"),
191            DisconnectReason::ApplicationError(e) => write!(f, "application error: {}", e),
192            DisconnectReason::Unknown => write!(f, "unknown"),
193        }
194    }
195}
196
197/// Connection health metrics
198#[derive(Debug, Clone)]
199pub struct ConnectionHealth {
200    /// Round-trip time in milliseconds (smoothed average)
201    pub rtt_ms: u32,
202    /// RTT variance in milliseconds
203    pub rtt_variance_ms: u32,
204    /// Estimated packet loss percentage (0-100)
205    pub packet_loss_percent: u8,
206    /// Current connection state
207    pub state: ConnectionState,
208    /// Last successful communication
209    pub last_activity: Instant,
210}
211
212impl Default for ConnectionHealth {
213    fn default() -> Self {
214        Self {
215            rtt_ms: 0,
216            rtt_variance_ms: 0,
217            packet_loss_percent: 0,
218            state: ConnectionState::Healthy,
219            last_activity: Instant::now(),
220        }
221    }
222}
223
224/// Connection state for health monitoring
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226pub enum ConnectionState {
227    /// Connection is healthy
228    Healthy,
229    /// Connection is degraded (high latency/loss)
230    Degraded,
231    /// Connection is suspected dead (missed heartbeats)
232    Suspect,
233    /// Connection confirmed dead
234    Dead,
235}
236
237impl fmt::Display for ConnectionState {
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        match self {
240            ConnectionState::Healthy => write!(f, "healthy"),
241            ConnectionState::Degraded => write!(f, "degraded"),
242            ConnectionState::Suspect => write!(f, "suspect"),
243            ConnectionState::Dead => write!(f, "dead"),
244        }
245    }
246}
247
248// =============================================================================
249// Error Types
250// =============================================================================
251
252/// Error type for mesh transport operations
253#[derive(Debug)]
254pub enum TransportError {
255    /// Connection failed to establish
256    ConnectionFailed(String),
257    /// Peer not found or unreachable
258    PeerNotFound(String),
259    /// Connection already exists
260    AlreadyConnected(String),
261    /// Transport not started
262    NotStarted,
263    /// Generic transport error
264    Other(Box<dyn StdError + Send + Sync>),
265}
266
267impl fmt::Display for TransportError {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        match self {
270            TransportError::ConnectionFailed(msg) => write!(f, "Connection failed: {}", msg),
271            TransportError::PeerNotFound(msg) => write!(f, "Peer not found: {}", msg),
272            TransportError::AlreadyConnected(msg) => write!(f, "Already connected: {}", msg),
273            TransportError::NotStarted => write!(f, "Transport not started"),
274            TransportError::Other(err) => write!(f, "Transport error: {}", err),
275        }
276    }
277}
278
279impl StdError for TransportError {
280    fn source(&self) -> Option<&(dyn StdError + 'static)> {
281        match self {
282            TransportError::Other(err) => Some(err.as_ref()),
283            _ => None,
284        }
285    }
286}
287
288/// Result type alias for transport operations
289pub type Result<T> = std::result::Result<T, TransportError>;
290
291/// Channel capacity for peer events
292pub const PEER_EVENT_CHANNEL_CAPACITY: usize = 256;
293
294/// Type alias for peer event receiver
295pub type PeerEventReceiver = mpsc::Receiver<PeerEvent>;
296
297/// Type alias for peer event sender
298pub type PeerEventSender = mpsc::Sender<PeerEvent>;
299
300// =============================================================================
301// Transport Traits
302// =============================================================================
303
304/// Transport abstraction for mesh topology connections
305///
306/// This trait defines the connection management operations needed by
307/// `TopologyManager` to establish parent-child relationships in the mesh.
308#[async_trait]
309pub trait MeshTransport: Send + Sync {
310    /// Start the transport layer
311    async fn start(&self) -> Result<()>;
312
313    /// Stop the transport layer
314    async fn stop(&self) -> Result<()>;
315
316    /// Connect to a peer by node ID
317    async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnection>>;
318
319    /// Disconnect from a peer
320    async fn disconnect(&self, peer_id: &NodeId) -> Result<()>;
321
322    /// Get an existing connection to a peer
323    fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn MeshConnection>>;
324
325    /// Get the number of connected peers
326    fn peer_count(&self) -> usize;
327
328    /// Get list of connected peer IDs
329    fn connected_peers(&self) -> Vec<NodeId>;
330
331    /// Check if connected to a specific peer
332    fn is_connected(&self, peer_id: &NodeId) -> bool {
333        self.get_connection(peer_id).is_some()
334    }
335
336    /// Send data to a connected peer.
337    ///
338    /// Returns the number of bytes sent.
339    async fn send_to(&self, peer_id: &NodeId, data: &[u8]) -> Result<usize> {
340        let _ = (peer_id, data);
341        Err(TransportError::ConnectionFailed(
342            "send not implemented".into(),
343        ))
344    }
345
346    /// Subscribe to peer connection events
347    fn subscribe_peer_events(&self) -> PeerEventReceiver;
348
349    /// Get connection health for a specific peer
350    fn get_peer_health(&self, peer_id: &NodeId) -> Option<ConnectionHealth> {
351        self.get_connection(peer_id)
352            .map(|_| ConnectionHealth::default())
353    }
354}
355
356/// Active connection to a mesh peer
357///
358/// This trait abstracts over backend-specific connection types.
359pub trait MeshConnection: Send + Sync {
360    /// Get the remote peer's node ID
361    fn peer_id(&self) -> &NodeId;
362
363    /// Check if connection is still alive
364    fn is_alive(&self) -> bool;
365
366    /// Get the time when this connection was established
367    fn connected_at(&self) -> Instant;
368
369    /// Get the disconnect reason if the connection is closed
370    fn disconnect_reason(&self) -> Option<DisconnectReason> {
371        if self.is_alive() {
372            None
373        } else {
374            Some(DisconnectReason::Unknown)
375        }
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382    use std::collections::HashSet;
383
384    #[test]
385    fn test_node_id_creation() {
386        let id = NodeId::new("node-123".to_string());
387        assert_eq!(id.as_str(), "node-123");
388        assert_eq!(id.to_string(), "node-123");
389    }
390
391    #[test]
392    fn test_node_id_from_string() {
393        let id: NodeId = "node-456".into();
394        assert_eq!(id.as_str(), "node-456");
395    }
396
397    #[test]
398    fn test_node_id_from_str() {
399        let id: NodeId = NodeId::from("node-789");
400        assert_eq!(id.as_str(), "node-789");
401    }
402
403    #[test]
404    fn test_node_id_equality() {
405        let id1 = NodeId::new("node-123".to_string());
406        let id2 = NodeId::new("node-123".to_string());
407        let id3 = NodeId::new("node-456".to_string());
408
409        assert_eq!(id1, id2);
410        assert_ne!(id1, id3);
411    }
412
413    #[test]
414    fn test_node_id_hash() {
415        let mut set = HashSet::new();
416        set.insert(NodeId::new("a".into()));
417        set.insert(NodeId::new("a".into()));
418        set.insert(NodeId::new("b".into()));
419        assert_eq!(set.len(), 2);
420    }
421
422    #[test]
423    fn test_node_id_display() {
424        let id = NodeId::new("display-me".into());
425        assert_eq!(format!("{}", id), "display-me");
426    }
427
428    // --- DisconnectReason ---
429
430    #[test]
431    fn test_disconnect_reason_display() {
432        assert_eq!(DisconnectReason::RemoteClosed.to_string(), "remote closed");
433        assert_eq!(DisconnectReason::Timeout.to_string(), "timeout");
434        assert_eq!(
435            DisconnectReason::NetworkError("reset".into()).to_string(),
436            "network error: reset"
437        );
438        assert_eq!(DisconnectReason::LocalClosed.to_string(), "local closed");
439        assert_eq!(DisconnectReason::IdleTimeout.to_string(), "idle timeout");
440        assert_eq!(
441            DisconnectReason::ApplicationError("bug".into()).to_string(),
442            "application error: bug"
443        );
444        assert_eq!(DisconnectReason::Unknown.to_string(), "unknown");
445    }
446
447    #[test]
448    fn test_disconnect_reason_equality() {
449        assert_eq!(DisconnectReason::Timeout, DisconnectReason::Timeout);
450        assert_ne!(DisconnectReason::Timeout, DisconnectReason::Unknown);
451        assert_eq!(
452            DisconnectReason::NetworkError("x".into()),
453            DisconnectReason::NetworkError("x".into()),
454        );
455    }
456
457    // --- ConnectionState ---
458
459    #[test]
460    fn test_connection_state_display() {
461        assert_eq!(ConnectionState::Healthy.to_string(), "healthy");
462        assert_eq!(ConnectionState::Degraded.to_string(), "degraded");
463        assert_eq!(ConnectionState::Suspect.to_string(), "suspect");
464        assert_eq!(ConnectionState::Dead.to_string(), "dead");
465    }
466
467    #[test]
468    fn test_connection_state_equality() {
469        assert_eq!(ConnectionState::Healthy, ConnectionState::Healthy);
470        assert_ne!(ConnectionState::Healthy, ConnectionState::Dead);
471    }
472
473    // --- ConnectionHealth ---
474
475    #[test]
476    fn test_connection_health_default() {
477        let h = ConnectionHealth::default();
478        assert_eq!(h.rtt_ms, 0);
479        assert_eq!(h.rtt_variance_ms, 0);
480        assert_eq!(h.packet_loss_percent, 0);
481        assert_eq!(h.state, ConnectionState::Healthy);
482    }
483
484    // --- TransportError ---
485
486    #[test]
487    fn test_transport_error_display() {
488        assert_eq!(
489            TransportError::ConnectionFailed("timeout".into()).to_string(),
490            "Connection failed: timeout"
491        );
492        assert_eq!(
493            TransportError::PeerNotFound("node-123".into()).to_string(),
494            "Peer not found: node-123"
495        );
496        assert_eq!(
497            TransportError::AlreadyConnected("node-1".into()).to_string(),
498            "Already connected: node-1"
499        );
500        assert_eq!(
501            TransportError::NotStarted.to_string(),
502            "Transport not started"
503        );
504    }
505
506    #[test]
507    fn test_transport_error_other() {
508        let inner = std::io::Error::new(std::io::ErrorKind::Other, "boom");
509        let err = TransportError::Other(Box::new(inner));
510        assert!(err.to_string().contains("boom"));
511    }
512
513    #[test]
514    fn test_transport_error_source() {
515        use std::error::Error;
516
517        let err = TransportError::NotStarted;
518        assert!(err.source().is_none());
519
520        let inner = std::io::Error::new(std::io::ErrorKind::Other, "boom");
521        let err = TransportError::Other(Box::new(inner));
522        assert!(err.source().is_some());
523    }
524
525    // --- PeerEvent construction ---
526
527    #[test]
528    fn test_peer_event_connected() {
529        let evt = PeerEvent::Connected {
530            peer_id: NodeId::new("p1".into()),
531            connected_at: Instant::now(),
532        };
533        if let PeerEvent::Connected { peer_id, .. } = evt {
534            assert_eq!(peer_id.as_str(), "p1");
535        }
536    }
537
538    #[test]
539    fn test_peer_event_disconnected() {
540        let evt = PeerEvent::Disconnected {
541            peer_id: NodeId::new("p1".into()),
542            reason: DisconnectReason::Timeout,
543            connection_duration: std::time::Duration::from_secs(60),
544        };
545        if let PeerEvent::Disconnected {
546            reason,
547            connection_duration,
548            ..
549        } = evt
550        {
551            assert_eq!(reason, DisconnectReason::Timeout);
552            assert_eq!(connection_duration.as_secs(), 60);
553        }
554    }
555
556    #[test]
557    fn test_peer_event_degraded() {
558        let evt = PeerEvent::Degraded {
559            peer_id: NodeId::new("p1".into()),
560            health: ConnectionHealth::default(),
561        };
562        if let PeerEvent::Degraded { health, .. } = evt {
563            assert_eq!(health.state, ConnectionState::Healthy);
564        }
565    }
566
567    #[test]
568    fn test_peer_event_reconnecting() {
569        let evt = PeerEvent::Reconnecting {
570            peer_id: NodeId::new("p1".into()),
571            attempt: 3,
572            max_attempts: Some(5),
573        };
574        if let PeerEvent::Reconnecting {
575            attempt,
576            max_attempts,
577            ..
578        } = evt
579        {
580            assert_eq!(attempt, 3);
581            assert_eq!(max_attempts, Some(5));
582        }
583    }
584
585    #[test]
586    fn test_peer_event_reconnect_failed() {
587        let evt = PeerEvent::ReconnectFailed {
588            peer_id: NodeId::new("p1".into()),
589            attempt: 5,
590            error: "timeout".into(),
591            will_retry: false,
592        };
593        if let PeerEvent::ReconnectFailed {
594            will_retry, error, ..
595        } = evt
596        {
597            assert!(!will_retry);
598            assert_eq!(error, "timeout");
599        }
600    }
601
602    // --- Trait default implementations ---
603
604    struct TestConnection {
605        pid: NodeId,
606        alive: bool,
607    }
608
609    impl MeshConnection for TestConnection {
610        fn peer_id(&self) -> &NodeId {
611            &self.pid
612        }
613        fn is_alive(&self) -> bool {
614            self.alive
615        }
616        fn connected_at(&self) -> Instant {
617            Instant::now()
618        }
619    }
620
621    #[test]
622    fn test_mesh_connection_disconnect_reason_alive() {
623        let conn = TestConnection {
624            pid: NodeId::new("p".into()),
625            alive: true,
626        };
627        assert!(conn.disconnect_reason().is_none());
628    }
629
630    #[test]
631    fn test_mesh_connection_disconnect_reason_dead() {
632        let conn = TestConnection {
633            pid: NodeId::new("p".into()),
634            alive: false,
635        };
636        assert_eq!(conn.disconnect_reason(), Some(DisconnectReason::Unknown));
637    }
638
639    // --- MeshTransport::send_to default ---
640
641    struct MinimalTransport;
642
643    #[async_trait::async_trait]
644    impl MeshTransport for MinimalTransport {
645        async fn start(&self) -> Result<()> {
646            Ok(())
647        }
648        async fn stop(&self) -> Result<()> {
649            Ok(())
650        }
651        async fn connect(&self, _: &NodeId) -> Result<Box<dyn MeshConnection>> {
652            Err(TransportError::NotStarted)
653        }
654        async fn disconnect(&self, _: &NodeId) -> Result<()> {
655            Ok(())
656        }
657        fn get_connection(&self, _: &NodeId) -> Option<Box<dyn MeshConnection>> {
658            None
659        }
660        fn peer_count(&self) -> usize {
661            0
662        }
663        fn connected_peers(&self) -> Vec<NodeId> {
664            vec![]
665        }
666        fn subscribe_peer_events(&self) -> PeerEventReceiver {
667            let (_tx, rx) = tokio::sync::mpsc::channel(1);
668            rx
669        }
670    }
671
672    #[tokio::test]
673    async fn test_send_to_default_returns_error() {
674        let transport = MinimalTransport;
675        let peer = NodeId::new("peer-1".into());
676        let result = transport.send_to(&peer, b"hello").await;
677        assert!(result.is_err());
678        let err = result.unwrap_err();
679        assert!(
680            matches!(err, TransportError::ConnectionFailed(msg) if msg.contains("send not implemented"))
681        );
682    }
683}