1pub mod tcp;
8pub mod tor;
9pub mod udp;
10
11#[cfg(feature = "sim-transport")]
12pub mod sim;
13
14#[cfg(any(target_os = "linux", target_os = "macos"))]
15pub mod ethernet;
16
17#[cfg(target_os = "linux")]
18pub mod ble;
19
20#[cfg(target_os = "linux")]
21use ble::DefaultBleTransport;
22#[cfg(any(target_os = "linux", target_os = "macos"))]
23use ethernet::EthernetTransport;
24use secp256k1::XOnlyPublicKey;
25#[cfg(feature = "sim-transport")]
26use sim::SimTransport;
27use std::fmt;
28use std::net::SocketAddr;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30use tcp::TcpTransport;
31use thiserror::Error;
32use tor::TorTransport;
33use tor::control::TorMonitoringInfo;
34use udp::UdpTransport;
35
36#[derive(Clone, Debug)]
42pub struct ReceivedPacket {
43 pub transport_id: TransportId,
45 pub remote_addr: TransportAddr,
47 pub data: Vec<u8>,
49 pub timestamp_ms: u64,
51 #[doc(hidden)]
53 pub trace_enqueued_at: Option<Instant>,
54}
55
56impl ReceivedPacket {
57 pub fn new(transport_id: TransportId, remote_addr: TransportAddr, data: Vec<u8>) -> Self {
59 let timestamp_ms = SystemTime::now()
60 .duration_since(UNIX_EPOCH)
61 .map(|d| d.as_millis() as u64)
62 .unwrap_or(0);
63 Self {
64 transport_id,
65 remote_addr,
66 data,
67 timestamp_ms,
68 trace_enqueued_at: crate::perf_profile::stamp(),
69 }
70 }
71
72 pub fn with_timestamp(
74 transport_id: TransportId,
75 remote_addr: TransportAddr,
76 data: Vec<u8>,
77 timestamp_ms: u64,
78 ) -> Self {
79 Self {
80 transport_id,
81 remote_addr,
82 data,
83 timestamp_ms,
84 trace_enqueued_at: crate::perf_profile::stamp(),
85 }
86 }
87}
88
/// Sending half of the received-packet channel (a tokio unbounded sender).
pub type PacketTx = tokio::sync::mpsc::UnboundedSender<ReceivedPacket>;

/// Receiving half of the received-packet channel (a tokio unbounded receiver).
pub type PacketRx = tokio::sync::mpsc::UnboundedReceiver<ReceivedPacket>;

/// Creates a connected [`PacketTx`] / [`PacketRx`] pair.
///
/// The `_buffer` argument is ignored: the channel is always created with
/// `tokio::sync::mpsc::unbounded_channel()`.
pub fn packet_channel(_buffer: usize) -> (PacketTx, PacketRx) {
    tokio::sync::mpsc::unbounded_channel()
}
120
/// Numeric identifier for a transport, wrapping a `u32`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TransportId(u32);

impl TransportId {
    /// Wraps a raw `u32` as a transport identifier.
    pub fn new(id: u32) -> Self {
        TransportId(id)
    }

    /// Returns the wrapped `u32`.
    pub fn as_u32(&self) -> u32 {
        let TransportId(raw) = *self;
        raw
    }
}

impl fmt::Display for TransportId {
    /// Renders the identifier as `transport:<n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let raw = self.as_u32();
        write!(f, "transport:{}", raw)
    }
}
146
/// Numeric identifier for a link, wrapping a `u64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LinkId(u64);

impl LinkId {
    /// Wraps a raw `u64` as a link identifier.
    pub fn new(id: u64) -> Self {
        LinkId(id)
    }

    /// Returns the wrapped `u64`.
    pub fn as_u64(&self) -> u64 {
        let LinkId(raw) = *self;
        raw
    }
}

impl fmt::Display for LinkId {
    /// Renders the identifier as `link:<n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let raw = self.as_u64();
        write!(f, "link:{}", raw)
    }
}
168
/// Errors produced by transports and the helpers in this module.
///
/// The `Display` text of every variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, Error)]
pub enum TransportError {
    /// An operation was attempted on a transport that has not been started.
    #[error("transport not started")]
    NotStarted,

    /// A start was requested on a transport that is already started.
    #[error("transport already started")]
    AlreadyStarted,

    /// Starting the transport failed; the payload is a description.
    #[error("transport failed to start: {0}")]
    StartFailed(String),

    /// Shutting the transport down failed; the payload is a description.
    #[error("transport shutdown failed: {0}")]
    ShutdownFailed(String),

    /// A link-level failure; the payload is a description.
    #[error("link failed: {0}")]
    LinkFailed(String),

    /// Sending failed; the payload is a description.
    #[error("send failed: {0}")]
    SendFailed(String),

    /// Receiving failed; the payload is a description.
    #[error("receive failed: {0}")]
    RecvFailed(String),

    /// The supplied transport address was not usable; the payload explains why.
    #[error("invalid transport address: {0}")]
    InvalidAddress(String),

    /// A packet of `packet_size` bytes exceeded the `mtu`.
    #[error("mtu exceeded: packet {packet_size} > mtu {mtu}")]
    MtuExceeded { packet_size: usize, mtu: u16 },

    /// The operation timed out.
    #[error("transport timeout")]
    Timeout,

    /// The remote side refused the connection.
    #[error("connection refused")]
    ConnectionRefused,

    /// The requested transport or operation is not supported; the payload
    /// is a description.
    #[error("transport not supported: {0}")]
    NotSupported(String),

    /// Wrapper for `std::io::Error`; `#[from]` enables `?` conversion.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}
215
/// Static description of a kind of transport: its name and two capability flags.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TransportType {
    /// Short lowercase name, also used as the `Display` output.
    pub name: &'static str,
    /// `true` when the transport is connection-oriented.
    pub connection_oriented: bool,
    /// `true` when the transport is flagged as reliable.
    pub reliable: bool,
}

impl TransportType {
    /// `"udp"`: connectionless, not reliable.
    pub const UDP: Self = Self {
        name: "udp",
        connection_oriented: false,
        reliable: false,
    };

    /// `"tcp"`: connection-oriented, reliable.
    pub const TCP: Self = Self {
        name: "tcp",
        connection_oriented: true,
        reliable: true,
    };

    /// `"ethernet"`: connectionless, not reliable.
    pub const ETHERNET: Self = Self {
        name: "ethernet",
        connection_oriented: false,
        reliable: false,
    };

    /// `"wifi"`: connectionless, not reliable.
    pub const WIFI: Self = Self {
        name: "wifi",
        connection_oriented: false,
        reliable: false,
    };

    /// `"tor"`: connection-oriented, reliable.
    pub const TOR: Self = Self {
        name: "tor",
        connection_oriented: true,
        reliable: true,
    };

    /// `"serial"`: connectionless, reliable.
    pub const SERIAL: Self = Self {
        name: "serial",
        connection_oriented: false,
        reliable: true,
    };

    /// `"ble"`: connection-oriented, reliable.
    pub const BLE: Self = Self {
        name: "ble",
        connection_oriented: true,
        reliable: true,
    };

    /// `"sim"`: connectionless, not reliable. Only present with the
    /// `sim-transport` feature.
    #[cfg(feature = "sim-transport")]
    pub const SIM: Self = Self {
        name: "sim",
        connection_oriented: false,
        reliable: false,
    };

    /// Returns `true` when the transport is *not* connection-oriented.
    pub fn is_connectionless(&self) -> bool {
        let Self {
            connection_oriented,
            ..
        } = self;
        !*connection_oriented
    }
}

impl fmt::Display for TransportType {
    /// Writes the transport's `name` verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name)
    }
}
300
/// Lifecycle state of a transport.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportState {
    /// Displayed as `configured`; a start is allowed from here.
    Configured,
    /// Displayed as `starting`.
    Starting,
    /// Displayed as `up`; the only operational state.
    Up,
    /// Displayed as `down`; a start is allowed from here.
    Down,
    /// Displayed as `failed`; reported as terminal, yet a start is still allowed.
    Failed,
}

impl TransportState {
    /// `true` only for [`TransportState::Up`].
    pub fn is_operational(&self) -> bool {
        *self == TransportState::Up
    }

    /// `true` for `Configured`, `Down` and `Failed`; `false` for `Starting` and `Up`.
    pub fn can_start(&self) -> bool {
        match self {
            TransportState::Configured | TransportState::Down | TransportState::Failed => true,
            TransportState::Starting | TransportState::Up => false,
        }
    }

    /// `true` only for [`TransportState::Failed`].
    pub fn is_terminal(&self) -> bool {
        *self == TransportState::Failed
    }
}

impl fmt::Display for TransportState {
    /// Writes the lowercase state name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            TransportState::Configured => "configured",
            TransportState::Starting => "starting",
            TransportState::Up => "up",
            TransportState::Down => "down",
            TransportState::Failed => "failed",
        })
    }
}
352
/// Lifecycle state of a link.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LinkState {
    /// Displayed as `connecting`.
    Connecting,
    /// Displayed as `connected`; the only operational state.
    Connected,
    /// Displayed as `disconnected`; terminal.
    Disconnected,
    /// Displayed as `failed`; terminal.
    Failed,
}

impl LinkState {
    /// `true` only for [`LinkState::Connected`].
    pub fn is_operational(&self) -> bool {
        *self == LinkState::Connected
    }

    /// `true` for `Disconnected` and `Failed`.
    pub fn is_terminal(&self) -> bool {
        match self {
            LinkState::Disconnected | LinkState::Failed => true,
            LinkState::Connecting | LinkState::Connected => false,
        }
    }
}

impl fmt::Display for LinkState {
    /// Writes the lowercase state name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            LinkState::Connecting => "connecting",
            LinkState::Connected => "connected",
            LinkState::Disconnected => "disconnected",
            LinkState::Failed => "failed",
        })
    }
}
393
/// Direction in which a link was established.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LinkDirection {
    /// Displayed as `outbound`.
    Outbound,
    /// Displayed as `inbound`.
    Inbound,
}

impl fmt::Display for LinkDirection {
    /// Writes `outbound` or `inbound`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            LinkDirection::Outbound => "outbound",
            LinkDirection::Inbound => "inbound",
        })
    }
}
412
/// An opaque transport address stored as raw bytes.
///
/// The bytes may or may not be valid UTF-8; the formatting impls show text
/// when they are and fall back to a byte-oriented rendering when they are not.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct TransportAddr(Vec<u8>);

impl TransportAddr {
    /// Takes ownership of `bytes` as the address.
    pub fn new(bytes: Vec<u8>) -> Self {
        TransportAddr(bytes)
    }

    /// Copies `bytes` into a new address.
    pub fn from_bytes(bytes: &[u8]) -> Self {
        Self::new(bytes.to_vec())
    }

    /// Copies the UTF-8 bytes of `s` into a new address.
    pub fn from_string(s: &str) -> Self {
        Self::from_bytes(s.as_bytes())
    }

    /// Stores the `Display` text of `addr` (for example `127.0.0.1:80`) as
    /// the address bytes.
    pub fn from_socket_addr(addr: std::net::SocketAddr) -> Self {
        use std::io::Write;
        // Format straight into the byte buffer, pre-sized to 56 bytes.
        let mut rendered: Vec<u8> = Vec::with_capacity(56);
        rendered
            .write_fmt(format_args!("{addr}"))
            .expect("Vec<u8>::write_fmt is infallible");
        Self::new(rendered)
    }

    /// Borrows the raw address bytes.
    pub fn as_bytes(&self) -> &[u8] {
        self.0.as_slice()
    }

    /// Returns the address as text, or `None` when the bytes are not valid UTF-8.
    pub fn as_str(&self) -> Option<&str> {
        match std::str::from_utf8(self.as_bytes()) {
            Ok(text) => Some(text),
            Err(_) => None,
        }
    }

    /// Number of bytes in the address.
    pub fn len(&self) -> usize {
        self.as_bytes().len()
    }

    /// `true` when the address holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.as_bytes().is_empty()
    }
}

impl fmt::Debug for TransportAddr {
    /// Shows `TransportAddr("<text>")` for UTF-8 addresses and
    /// `TransportAddr([..bytes..])` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(text) = self.as_str() {
            write!(f, "TransportAddr(\"{}\")", text)
        } else {
            write!(f, "TransportAddr({:?})", self.0)
        }
    }
}

impl fmt::Display for TransportAddr {
    /// Shows UTF-8 addresses verbatim and everything else as lowercase hex.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.as_str() {
            Some(text) => f.write_str(text),
            None => self
                .0
                .iter()
                .try_for_each(|byte| write!(f, "{:02x}", byte)),
        }
    }
}

impl From<&str> for TransportAddr {
    fn from(s: &str) -> Self {
        TransportAddr::from_string(s)
    }
}

impl From<String> for TransportAddr {
    fn from(s: String) -> Self {
        TransportAddr::new(s.into_bytes())
    }
}
521
/// Traffic counters and quality estimates for a single link.
#[derive(Clone, Debug, Default)]
pub struct LinkStats {
    /// Number of packets recorded via [`LinkStats::record_sent`].
    pub packets_sent: u64,
    /// Number of packets recorded via [`LinkStats::record_recv`].
    pub packets_recv: u64,
    /// Total bytes recorded via [`LinkStats::record_sent`].
    pub bytes_sent: u64,
    /// Total bytes recorded via [`LinkStats::record_recv`].
    pub bytes_recv: u64,
    /// Timestamp passed to the most recent [`LinkStats::record_recv`] call;
    /// `0` is treated as "nothing received yet".
    pub last_recv_ms: u64,
    /// Smoothed round-trip time; `None` until the first sample arrives.
    rtt_estimate: Option<Duration>,
    /// Loss-rate estimate. Not modified by any method in this impl.
    /// NOTE(review): presumably a fraction in `0.0..=1.0` — confirm with the writers.
    pub loss_rate: f32,
    /// Throughput estimate. Not modified by any method in this impl; units
    /// are not visible here.
    pub throughput_estimate: u64,
}

impl LinkStats {
    /// Creates zeroed statistics (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one sent packet of `bytes` bytes.
    ///
    /// Counters saturate at `u64::MAX` instead of overflowing, so a
    /// statistics update can never panic (debug) or wrap to a small value
    /// (release).
    pub fn record_sent(&mut self, bytes: usize) {
        self.packets_sent = self.packets_sent.saturating_add(1);
        // `usize` is at most 64 bits wide, so this widening is lossless.
        self.bytes_sent = self.bytes_sent.saturating_add(bytes as u64);
    }

    /// Records one received packet of `bytes` bytes and stores `timestamp_ms`
    /// as the last-receive time.
    ///
    /// Counters saturate at `u64::MAX` instead of overflowing.
    pub fn record_recv(&mut self, bytes: usize, timestamp_ms: u64) {
        self.packets_recv = self.packets_recv.saturating_add(1);
        // `usize` is at most 64 bits wide, so this widening is lossless.
        self.bytes_recv = self.bytes_recv.saturating_add(bytes as u64);
        self.last_recv_ms = timestamp_ms;
    }

    /// Returns the current smoothed RTT, if any sample has been recorded.
    pub fn rtt_estimate(&self) -> Option<Duration> {
        self.rtt_estimate
    }

    /// Feeds a new RTT sample into the estimate.
    ///
    /// The first sample is stored as-is. Later samples are blended in with an
    /// exponentially weighted moving average:
    /// `new = 0.2 * sample + 0.8 * previous`, computed in nanoseconds.
    pub fn update_rtt(&mut self, rtt: Duration) {
        // Weight given to the newest sample.
        const ALPHA: f64 = 0.2;

        self.rtt_estimate = Some(match self.rtt_estimate {
            Some(previous) => {
                let blended_nanos = (ALPHA * rtt.as_nanos() as f64
                    + (1.0 - ALPHA) * previous.as_nanos() as f64)
                    as u64;
                Duration::from_nanos(blended_nanos)
            }
            None => rtt,
        });
    }

    /// Milliseconds elapsed between the last receive and `current_time_ms`.
    ///
    /// Returns `u64::MAX` when `last_recv_ms` is `0` (nothing received yet),
    /// and `0` when `current_time_ms` is earlier than the last receive.
    pub fn time_since_recv(&self, current_time_ms: u64) -> u64 {
        if self.last_recv_ms == 0 {
            return u64::MAX;
        }
        current_time_ms.saturating_sub(self.last_recv_ms)
    }

    /// Resets every field to its default value.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
602
603#[derive(Clone, Debug)]
609pub struct Link {
610 link_id: LinkId,
612 transport_id: TransportId,
614 remote_addr: TransportAddr,
616 direction: LinkDirection,
618 state: LinkState,
620 base_rtt: Duration,
622 stats: LinkStats,
624 created_at: u64,
626}
627
628impl Link {
629 pub fn new(
631 link_id: LinkId,
632 transport_id: TransportId,
633 remote_addr: TransportAddr,
634 direction: LinkDirection,
635 base_rtt: Duration,
636 ) -> Self {
637 Self {
638 link_id,
639 transport_id,
640 remote_addr,
641 direction,
642 state: LinkState::Connecting,
643 base_rtt,
644 stats: LinkStats::new(),
645 created_at: 0,
646 }
647 }
648
649 pub fn new_with_timestamp(
651 link_id: LinkId,
652 transport_id: TransportId,
653 remote_addr: TransportAddr,
654 direction: LinkDirection,
655 base_rtt: Duration,
656 created_at: u64,
657 ) -> Self {
658 let mut link = Self::new(link_id, transport_id, remote_addr, direction, base_rtt);
659 link.created_at = created_at;
660 link
661 }
662
663 pub fn connectionless(
668 link_id: LinkId,
669 transport_id: TransportId,
670 remote_addr: TransportAddr,
671 direction: LinkDirection,
672 base_rtt: Duration,
673 ) -> Self {
674 let mut link = Self::new(link_id, transport_id, remote_addr, direction, base_rtt);
675 link.state = LinkState::Connected;
676 link
677 }
678
679 pub fn link_id(&self) -> LinkId {
681 self.link_id
682 }
683
684 pub fn transport_id(&self) -> TransportId {
686 self.transport_id
687 }
688
689 pub fn remote_addr(&self) -> &TransportAddr {
691 &self.remote_addr
692 }
693
694 pub fn direction(&self) -> LinkDirection {
696 self.direction
697 }
698
699 pub fn state(&self) -> LinkState {
701 self.state
702 }
703
704 pub fn base_rtt(&self) -> Duration {
706 self.base_rtt
707 }
708
709 pub fn stats(&self) -> &LinkStats {
711 &self.stats
712 }
713
714 pub fn stats_mut(&mut self) -> &mut LinkStats {
716 &mut self.stats
717 }
718
719 pub fn created_at(&self) -> u64 {
721 self.created_at
722 }
723
724 pub fn set_created_at(&mut self, timestamp: u64) {
726 self.created_at = timestamp;
727 }
728
729 pub fn set_connected(&mut self) {
731 self.state = LinkState::Connected;
732 }
733
734 pub fn set_disconnected(&mut self) {
736 self.state = LinkState::Disconnected;
737 }
738
739 pub fn set_failed(&mut self) {
741 self.state = LinkState::Failed;
742 }
743
744 pub fn is_operational(&self) -> bool {
746 self.state.is_operational()
747 }
748
749 pub fn is_terminal(&self) -> bool {
751 self.state.is_terminal()
752 }
753
754 pub fn effective_rtt(&self) -> Duration {
756 self.stats.rtt_estimate().unwrap_or(self.base_rtt)
757 }
758
759 pub fn age(&self, current_time_ms: u64) -> u64 {
761 if self.created_at == 0 {
762 return 0;
763 }
764 current_time_ms.saturating_sub(self.created_at)
765 }
766}
767
768#[derive(Clone, Debug)]
774pub struct DiscoveredPeer {
775 pub transport_id: TransportId,
777 pub addr: TransportAddr,
779 pub pubkey_hint: Option<XOnlyPublicKey>,
781}
782
783impl DiscoveredPeer {
784 pub fn new(transport_id: TransportId, addr: TransportAddr) -> Self {
786 Self {
787 transport_id,
788 addr,
789 pubkey_hint: None,
790 }
791 }
792
793 pub fn with_hint(
795 transport_id: TransportId,
796 addr: TransportAddr,
797 pubkey: XOnlyPublicKey,
798 ) -> Self {
799 Self {
800 transport_id,
801 addr,
802 pubkey_hint: Some(pubkey),
803 }
804 }
805}
806
/// Synchronous interface shared by transport implementations.
///
/// Methods with a body are defaults that implementors may override.
pub trait Transport {
    /// Identifier of this transport.
    fn transport_id(&self) -> TransportId;

    /// Static description (name and capability flags) of this transport.
    fn transport_type(&self) -> &TransportType;

    /// Current lifecycle state.
    fn state(&self) -> TransportState;

    /// MTU of the transport.
    fn mtu(&self) -> u16;

    /// MTU for the link to `addr`.
    ///
    /// The default ignores `addr` and returns [`Transport::mtu`].
    fn link_mtu(&self, addr: &TransportAddr) -> u16 {
        let _ = addr;
        self.mtu()
    }

    /// Starts the transport.
    fn start(&mut self) -> Result<(), TransportError>;

    /// Stops the transport.
    fn stop(&mut self) -> Result<(), TransportError>;

    /// Sends `data` to `addr`.
    fn send(&self, addr: &TransportAddr, data: &[u8]) -> Result<(), TransportError>;

    /// Returns the peers currently known to the transport's discovery mechanism.
    fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError>;

    /// Auto-connect flag; the default is `false`.
    /// NOTE(review): presumably controls connecting to discovered peers — confirm with callers.
    fn auto_connect(&self) -> bool {
        false
    }

    /// Whether inbound connections are accepted; the default is `true`.
    fn accept_connections(&self) -> bool {
        true
    }

    /// Closes the connection to `_addr`; the default does nothing.
    fn close_connection(&self, _addr: &TransportAddr) {
    }
}
871
/// State of a connection to a particular remote address, as returned by
/// `TransportHandle::connection_state`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection state.
    /// NOTE(review): presumably "no attempt recorded for this address" — confirm.
    None,
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection attempt failed; the payload is a description.
    Failed(String),
}
891
/// Congestion indicators reported by a transport.
///
/// `Default` yields a value with every indicator unset.
#[derive(Clone, Debug, Default)]
pub struct TransportCongestion {
    /// Receive-drop count, or `None` when the transport does not report one.
    /// NOTE(review): presumably a cumulative counter — confirm against the UDP transport.
    pub recv_drops: Option<u64>,
}
906
/// Owns one concrete transport and dispatches operations to it.
///
/// Some variants only exist on certain targets or with certain features;
/// see the `cfg` attribute on each.
pub enum TransportHandle {
    /// UDP transport.
    Udp(UdpTransport),
    /// Simulated transport (only with the `sim-transport` feature).
    #[cfg(feature = "sim-transport")]
    Sim(SimTransport),
    /// Ethernet transport (Linux and macOS only).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    Ethernet(EthernetTransport),
    /// TCP transport.
    Tcp(TcpTransport),
    /// Tor transport.
    Tor(TorTransport),
    /// BLE transport (Linux only).
    #[cfg(target_os = "linux")]
    Ble(DefaultBleTransport),
}
932
933impl TransportHandle {
934 pub async fn start(&mut self) -> Result<(), TransportError> {
936 match self {
937 TransportHandle::Udp(t) => t.start_async().await,
938 #[cfg(feature = "sim-transport")]
939 TransportHandle::Sim(t) => t.start_async().await,
940 #[cfg(any(target_os = "linux", target_os = "macos"))]
941 TransportHandle::Ethernet(t) => t.start_async().await,
942 TransportHandle::Tcp(t) => t.start_async().await,
943 TransportHandle::Tor(t) => t.start_async().await,
944 #[cfg(target_os = "linux")]
945 TransportHandle::Ble(t) => t.start_async().await,
946 }
947 }
948
949 pub async fn stop(&mut self) -> Result<(), TransportError> {
951 match self {
952 TransportHandle::Udp(t) => t.stop_async().await,
953 #[cfg(feature = "sim-transport")]
954 TransportHandle::Sim(t) => t.stop_async().await,
955 #[cfg(any(target_os = "linux", target_os = "macos"))]
956 TransportHandle::Ethernet(t) => t.stop_async().await,
957 TransportHandle::Tcp(t) => t.stop_async().await,
958 TransportHandle::Tor(t) => t.stop_async().await,
959 #[cfg(target_os = "linux")]
960 TransportHandle::Ble(t) => t.stop_async().await,
961 }
962 }
963
964 pub async fn send(&self, addr: &TransportAddr, data: &[u8]) -> Result<usize, TransportError> {
966 match self {
967 TransportHandle::Udp(t) => t.send_async(addr, data).await,
968 #[cfg(feature = "sim-transport")]
969 TransportHandle::Sim(t) => t.send_async(addr, data).await,
970 #[cfg(any(target_os = "linux", target_os = "macos"))]
971 TransportHandle::Ethernet(t) => t.send_async(addr, data).await,
972 TransportHandle::Tcp(t) => t.send_async(addr, data).await,
973 TransportHandle::Tor(t) => t.send_async(addr, data).await,
974 #[cfg(target_os = "linux")]
975 TransportHandle::Ble(t) => t.send_async(addr, data).await,
976 }
977 }
978
979 pub async fn flush_pending_send(&self) {
985 if let TransportHandle::Udp(t) = self {
986 t.flush_pending_send().await;
987 }
988 }
989
990 pub fn transport_id(&self) -> TransportId {
992 match self {
993 TransportHandle::Udp(t) => t.transport_id(),
994 #[cfg(feature = "sim-transport")]
995 TransportHandle::Sim(t) => t.transport_id(),
996 #[cfg(any(target_os = "linux", target_os = "macos"))]
997 TransportHandle::Ethernet(t) => t.transport_id(),
998 TransportHandle::Tcp(t) => t.transport_id(),
999 TransportHandle::Tor(t) => t.transport_id(),
1000 #[cfg(target_os = "linux")]
1001 TransportHandle::Ble(t) => t.transport_id(),
1002 }
1003 }
1004
1005 pub fn name(&self) -> Option<&str> {
1007 match self {
1008 TransportHandle::Udp(t) => t.name(),
1009 #[cfg(feature = "sim-transport")]
1010 TransportHandle::Sim(t) => t.name(),
1011 #[cfg(any(target_os = "linux", target_os = "macos"))]
1012 TransportHandle::Ethernet(t) => t.name(),
1013 TransportHandle::Tcp(t) => t.name(),
1014 TransportHandle::Tor(t) => t.name(),
1015 #[cfg(target_os = "linux")]
1016 TransportHandle::Ble(t) => t.name(),
1017 }
1018 }
1019
1020 pub fn transport_type(&self) -> &TransportType {
1022 match self {
1023 TransportHandle::Udp(t) => t.transport_type(),
1024 #[cfg(feature = "sim-transport")]
1025 TransportHandle::Sim(t) => t.transport_type(),
1026 #[cfg(any(target_os = "linux", target_os = "macos"))]
1027 TransportHandle::Ethernet(t) => t.transport_type(),
1028 TransportHandle::Tcp(t) => t.transport_type(),
1029 TransportHandle::Tor(t) => t.transport_type(),
1030 #[cfg(target_os = "linux")]
1031 TransportHandle::Ble(t) => t.transport_type(),
1032 }
1033 }
1034
1035 pub fn state(&self) -> TransportState {
1037 match self {
1038 TransportHandle::Udp(t) => t.state(),
1039 #[cfg(feature = "sim-transport")]
1040 TransportHandle::Sim(t) => t.state(),
1041 #[cfg(any(target_os = "linux", target_os = "macos"))]
1042 TransportHandle::Ethernet(t) => t.state(),
1043 TransportHandle::Tcp(t) => t.state(),
1044 TransportHandle::Tor(t) => t.state(),
1045 #[cfg(target_os = "linux")]
1046 TransportHandle::Ble(t) => t.state(),
1047 }
1048 }
1049
1050 pub fn mtu(&self) -> u16 {
1052 match self {
1053 TransportHandle::Udp(t) => t.mtu(),
1054 #[cfg(feature = "sim-transport")]
1055 TransportHandle::Sim(t) => t.mtu(),
1056 #[cfg(any(target_os = "linux", target_os = "macos"))]
1057 TransportHandle::Ethernet(t) => t.mtu(),
1058 TransportHandle::Tcp(t) => t.mtu(),
1059 TransportHandle::Tor(t) => t.mtu(),
1060 #[cfg(target_os = "linux")]
1061 TransportHandle::Ble(t) => t.mtu(),
1062 }
1063 }
1064
1065 pub fn link_mtu(&self, addr: &TransportAddr) -> u16 {
1070 match self {
1071 TransportHandle::Udp(t) => t.link_mtu(addr),
1072 #[cfg(feature = "sim-transport")]
1073 TransportHandle::Sim(t) => t.link_mtu(addr),
1074 #[cfg(any(target_os = "linux", target_os = "macos"))]
1075 TransportHandle::Ethernet(t) => t.link_mtu(addr),
1076 TransportHandle::Tcp(t) => t.link_mtu(addr),
1077 TransportHandle::Tor(t) => t.link_mtu(addr),
1078 #[cfg(target_os = "linux")]
1079 TransportHandle::Ble(t) => t.link_mtu(addr),
1080 }
1081 }
1082
1083 pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
1085 match self {
1086 TransportHandle::Udp(t) => t.local_addr(),
1087 #[cfg(feature = "sim-transport")]
1088 TransportHandle::Sim(_) => None,
1089 #[cfg(any(target_os = "linux", target_os = "macos"))]
1090 TransportHandle::Ethernet(_) => None,
1091 TransportHandle::Tcp(t) => t.local_addr(),
1092 TransportHandle::Tor(_) => None,
1093 #[cfg(target_os = "linux")]
1094 TransportHandle::Ble(_) => None,
1095 }
1096 }
1097
1098 pub fn interface_name(&self) -> Option<&str> {
1100 match self {
1101 TransportHandle::Udp(_) => None,
1102 #[cfg(feature = "sim-transport")]
1103 TransportHandle::Sim(_) => None,
1104 #[cfg(any(target_os = "linux", target_os = "macos"))]
1105 TransportHandle::Ethernet(t) => Some(t.interface_name()),
1106 TransportHandle::Tcp(_) => None,
1107 TransportHandle::Tor(_) => None,
1108 #[cfg(target_os = "linux")]
1109 TransportHandle::Ble(_) => None,
1110 }
1111 }
1112
1113 pub fn onion_address(&self) -> Option<&str> {
1115 match self {
1116 TransportHandle::Tor(t) => t.onion_address(),
1117 _ => None,
1118 }
1119 }
1120
1121 pub fn tor_monitoring(&self) -> Option<TorMonitoringInfo> {
1123 match self {
1124 TransportHandle::Tor(t) => t.cached_monitoring(),
1125 _ => None,
1126 }
1127 }
1128
1129 pub fn tor_mode(&self) -> Option<&str> {
1131 match self {
1132 TransportHandle::Tor(t) => Some(t.mode()),
1133 _ => None,
1134 }
1135 }
1136
1137 pub fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
1139 match self {
1140 TransportHandle::Udp(t) => t.discover(),
1141 #[cfg(feature = "sim-transport")]
1142 TransportHandle::Sim(t) => t.discover(),
1143 #[cfg(any(target_os = "linux", target_os = "macos"))]
1144 TransportHandle::Ethernet(t) => t.discover(),
1145 TransportHandle::Tcp(t) => t.discover(),
1146 TransportHandle::Tor(t) => t.discover(),
1147 #[cfg(target_os = "linux")]
1148 TransportHandle::Ble(t) => t.discover(),
1149 }
1150 }
1151
1152 pub fn auto_connect(&self) -> bool {
1154 match self {
1155 TransportHandle::Udp(t) => t.auto_connect(),
1156 #[cfg(feature = "sim-transport")]
1157 TransportHandle::Sim(t) => t.auto_connect(),
1158 #[cfg(any(target_os = "linux", target_os = "macos"))]
1159 TransportHandle::Ethernet(t) => t.auto_connect(),
1160 TransportHandle::Tcp(t) => t.auto_connect(),
1161 TransportHandle::Tor(t) => t.auto_connect(),
1162 #[cfg(target_os = "linux")]
1163 TransportHandle::Ble(t) => t.auto_connect(),
1164 }
1165 }
1166
1167 pub fn accept_connections(&self) -> bool {
1169 match self {
1170 TransportHandle::Udp(t) => t.accept_connections(),
1171 #[cfg(feature = "sim-transport")]
1172 TransportHandle::Sim(t) => t.accept_connections(),
1173 #[cfg(any(target_os = "linux", target_os = "macos"))]
1174 TransportHandle::Ethernet(t) => t.accept_connections(),
1175 TransportHandle::Tcp(t) => t.accept_connections(),
1176 TransportHandle::Tor(t) => t.accept_connections(),
1177 #[cfg(target_os = "linux")]
1178 TransportHandle::Ble(t) => t.accept_connections(),
1179 }
1180 }
1181
1182 pub async fn connect(&self, addr: &TransportAddr) -> Result<(), TransportError> {
1190 match self {
1191 TransportHandle::Udp(_) => Ok(()), #[cfg(feature = "sim-transport")]
1193 TransportHandle::Sim(_) => Ok(()), #[cfg(any(target_os = "linux", target_os = "macos"))]
1195 TransportHandle::Ethernet(_) => Ok(()), TransportHandle::Tcp(t) => t.connect_async(addr).await,
1197 TransportHandle::Tor(t) => t.connect_async(addr).await,
1198 #[cfg(target_os = "linux")]
1199 TransportHandle::Ble(t) => t.connect_async(addr).await,
1200 }
1201 }
1202
1203 pub fn connection_state(&self, addr: &TransportAddr) -> ConnectionState {
1209 match self {
1210 TransportHandle::Udp(_) => ConnectionState::Connected,
1211 #[cfg(feature = "sim-transport")]
1212 TransportHandle::Sim(_) => ConnectionState::Connected,
1213 #[cfg(any(target_os = "linux", target_os = "macos"))]
1214 TransportHandle::Ethernet(_) => ConnectionState::Connected,
1215 TransportHandle::Tcp(t) => t.connection_state_sync(addr),
1216 TransportHandle::Tor(t) => t.connection_state_sync(addr),
1217 #[cfg(target_os = "linux")]
1218 TransportHandle::Ble(t) => t.connection_state_sync(addr),
1219 }
1220 }
1221
1222 pub async fn close_connection(&self, addr: &TransportAddr) {
1227 match self {
1228 TransportHandle::Udp(t) => t.close_connection(addr),
1229 #[cfg(feature = "sim-transport")]
1230 TransportHandle::Sim(t) => t.close_connection(addr),
1231 #[cfg(any(target_os = "linux", target_os = "macos"))]
1232 TransportHandle::Ethernet(t) => t.close_connection(addr),
1233 TransportHandle::Tcp(t) => t.close_connection_async(addr).await,
1234 TransportHandle::Tor(t) => t.close_connection_async(addr).await,
1235 #[cfg(target_os = "linux")]
1236 TransportHandle::Ble(t) => t.close_connection_async(addr).await,
1237 }
1238 }
1239
1240 pub fn is_operational(&self) -> bool {
1242 self.state().is_operational()
1243 }
1244
1245 pub fn congestion(&self) -> TransportCongestion {
1251 match self {
1252 TransportHandle::Udp(t) => t.congestion(),
1253 #[cfg(feature = "sim-transport")]
1254 TransportHandle::Sim(_) => TransportCongestion::default(),
1255 #[cfg(any(target_os = "linux", target_os = "macos"))]
1256 TransportHandle::Ethernet(_) => TransportCongestion::default(),
1257 TransportHandle::Tcp(_) => TransportCongestion::default(),
1258 TransportHandle::Tor(_) => TransportCongestion::default(),
1259 #[cfg(target_os = "linux")]
1260 TransportHandle::Ble(_) => TransportCongestion::default(),
1261 }
1262 }
1263
1264 pub fn transport_stats(&self) -> serde_json::Value {
1268 match self {
1269 TransportHandle::Udp(t) => {
1270 serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
1271 }
1272 #[cfg(feature = "sim-transport")]
1273 TransportHandle::Sim(t) => serde_json::to_value(t.stats()).unwrap_or_default(),
1274 #[cfg(any(target_os = "linux", target_os = "macos"))]
1275 TransportHandle::Ethernet(t) => {
1276 let snap = t.stats().snapshot();
1277 serde_json::json!({
1278 "frames_sent": snap.frames_sent,
1279 "frames_recv": snap.frames_recv,
1280 "bytes_sent": snap.bytes_sent,
1281 "bytes_recv": snap.bytes_recv,
1282 "send_errors": snap.send_errors,
1283 "recv_errors": snap.recv_errors,
1284 "beacons_sent": snap.beacons_sent,
1285 "beacons_recv": snap.beacons_recv,
1286 "frames_too_short": snap.frames_too_short,
1287 "frames_too_long": snap.frames_too_long,
1288 })
1289 }
1290 TransportHandle::Tcp(t) => {
1291 serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
1292 }
1293 TransportHandle::Tor(t) => {
1294 serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
1295 }
1296 #[cfg(target_os = "linux")]
1297 TransportHandle::Ble(t) => {
1298 serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
1299 }
1300 }
1301 }
1302}
1303
1304pub(crate) async fn resolve_socket_addr(
1315 addr: &TransportAddr,
1316) -> Result<SocketAddr, TransportError> {
1317 let s = addr
1318 .as_str()
1319 .ok_or_else(|| TransportError::InvalidAddress("not valid UTF-8".into()))?;
1320
1321 if let Ok(sock_addr) = s.parse::<SocketAddr>() {
1323 return Ok(sock_addr);
1324 }
1325
1326 tokio::net::lookup_host(s)
1328 .await
1329 .map_err(|e| {
1330 TransportError::InvalidAddress(format!("DNS resolution failed for {}: {}", s, e))
1331 })?
1332 .next()
1333 .ok_or_else(|| {
1334 TransportError::InvalidAddress(format!(
1335 "DNS resolution returned no addresses for {}",
1336 s
1337 ))
1338 })
1339}
1340
1341#[cfg(test)]
1346mod tests {
1347 use super::*;
1348
1349 #[test]
1350 fn test_transport_id() {
1351 let id = TransportId::new(42);
1352 assert_eq!(id.as_u32(), 42);
1353 assert_eq!(format!("{}", id), "transport:42");
1354 }
1355
1356 #[test]
1357 fn test_link_id() {
1358 let id = LinkId::new(12345);
1359 assert_eq!(id.as_u64(), 12345);
1360 assert_eq!(format!("{}", id), "link:12345");
1361 }
1362
1363 #[test]
1364 fn test_transport_state_transitions() {
1365 assert!(TransportState::Configured.can_start());
1366 assert!(TransportState::Down.can_start());
1367 assert!(TransportState::Failed.can_start());
1368 assert!(!TransportState::Starting.can_start());
1369 assert!(!TransportState::Up.can_start());
1370
1371 assert!(TransportState::Up.is_operational());
1372 assert!(!TransportState::Starting.is_operational());
1373 assert!(!TransportState::Failed.is_operational());
1374 }
1375
1376 #[test]
1377 fn test_link_state() {
1378 assert!(LinkState::Connected.is_operational());
1379 assert!(!LinkState::Connecting.is_operational());
1380 assert!(!LinkState::Disconnected.is_operational());
1381 assert!(!LinkState::Failed.is_operational());
1382
1383 assert!(LinkState::Disconnected.is_terminal());
1384 assert!(LinkState::Failed.is_terminal());
1385 assert!(!LinkState::Connected.is_terminal());
1386 }
1387
1388 #[test]
1389 #[allow(clippy::assertions_on_constants)]
1390 fn test_transport_type_constants() {
1391 assert!(!TransportType::UDP.connection_oriented);
1393 assert!(!TransportType::UDP.reliable);
1394 assert!(TransportType::UDP.is_connectionless());
1395
1396 assert!(TransportType::TOR.connection_oriented);
1397 assert!(TransportType::TOR.reliable);
1398 assert!(!TransportType::TOR.is_connectionless());
1399
1400 assert_eq!(TransportType::UDP.name, "udp");
1401 assert_eq!(TransportType::ETHERNET.name, "ethernet");
1402 }
1403
1404 #[test]
1405 fn test_transport_addr_string() {
1406 let addr = TransportAddr::from_string("192.168.1.1:2121");
1407 assert_eq!(format!("{}", addr), "192.168.1.1:2121");
1408 assert_eq!(addr.as_str(), Some("192.168.1.1:2121"));
1409 }
1410
1411 #[test]
1412 fn test_transport_addr_binary() {
1413 let binary = TransportAddr::new(vec![0xff, 0x80, 0x2b, 0x3c, 0x4d, 0x5e]);
1415 assert_eq!(format!("{}", binary), "ff802b3c4d5e");
1416 assert!(binary.as_str().is_none());
1417 assert_eq!(binary.len(), 6);
1418 }
1419
1420 #[test]
1421 fn test_transport_addr_from_string() {
1422 let addr: TransportAddr = "test:1234".into();
1423 assert_eq!(addr.as_str(), Some("test:1234"));
1424
1425 let addr2: TransportAddr = String::from("hello").into();
1426 assert_eq!(addr2.as_str(), Some("hello"));
1427 }
1428
1429 #[test]
1430 fn test_link_stats_basic() {
1431 let mut stats = LinkStats::new();
1432
1433 stats.record_sent(100);
1434 stats.record_recv(200, 1000);
1435
1436 assert_eq!(stats.packets_sent, 1);
1437 assert_eq!(stats.bytes_sent, 100);
1438 assert_eq!(stats.packets_recv, 1);
1439 assert_eq!(stats.bytes_recv, 200);
1440 assert_eq!(stats.last_recv_ms, 1000);
1441 }
1442
1443 #[test]
1444 fn test_link_stats_rtt() {
1445 let mut stats = LinkStats::new();
1446
1447 assert!(stats.rtt_estimate().is_none());
1448
1449 stats.update_rtt(Duration::from_millis(100));
1450 assert_eq!(stats.rtt_estimate(), Some(Duration::from_millis(100)));
1451
1452 stats.update_rtt(Duration::from_millis(200));
1454 let rtt = stats.rtt_estimate().unwrap();
1456 assert!(rtt.as_millis() >= 110 && rtt.as_millis() <= 130);
1457 }
1458
1459 #[test]
1460 fn test_link_stats_time_since_recv() {
1461 let mut stats = LinkStats::new();
1462
1463 assert_eq!(stats.time_since_recv(1000), u64::MAX);
1465
1466 stats.record_recv(100, 500);
1467 assert_eq!(stats.time_since_recv(1000), 500);
1468 assert_eq!(stats.time_since_recv(500), 0);
1469 }
1470
1471 #[test]
1472 fn test_link_creation() {
1473 let link = Link::new(
1474 LinkId::new(1),
1475 TransportId::new(1),
1476 TransportAddr::from_string("test"),
1477 LinkDirection::Outbound,
1478 Duration::from_millis(50),
1479 );
1480
1481 assert_eq!(link.state(), LinkState::Connecting);
1482 assert!(!link.is_operational());
1483 assert_eq!(link.direction(), LinkDirection::Outbound);
1484 }
1485
1486 #[test]
1487 fn test_link_connectionless() {
1488 let link = Link::connectionless(
1489 LinkId::new(1),
1490 TransportId::new(1),
1491 TransportAddr::from_string("test"),
1492 LinkDirection::Inbound,
1493 Duration::from_millis(5),
1494 );
1495
1496 assert_eq!(link.state(), LinkState::Connected);
1497 assert!(link.is_operational());
1498 }
1499
/// Drives a link through `set_connected` and `set_disconnected`, checking the
/// operational / terminal flags after each step.
#[test]
fn test_link_state_changes() {
    let remote = TransportAddr::from_string("test");
    let mut link = Link::new(
        LinkId::new(1),
        TransportId::new(1),
        remote,
        LinkDirection::Outbound,
        Duration::from_millis(50),
    );

    // Freshly created: not usable yet.
    assert!(!link.is_operational());

    // Connected: usable and not finished.
    link.set_connected();
    assert_eq!((link.is_operational(), link.is_terminal()), (true, false));

    // Disconnected: no longer usable and in a terminal state.
    link.set_disconnected();
    assert_eq!((link.is_operational(), link.is_terminal()), (false, true));
}
1520
/// `effective_rtt` returns the duration given at construction until an RTT
/// sample is recorded, after which it returns the measured value.
#[test]
fn test_link_effective_rtt() {
    let base_rtt = Duration::from_millis(50);
    let mut link = Link::connectionless(
        LinkId::new(1),
        TransportId::new(1),
        TransportAddr::from_string("test"),
        LinkDirection::Inbound,
        base_rtt,
    );

    // No measurement yet: the constructor-supplied value is used.
    assert_eq!(link.effective_rtt(), base_rtt);

    // First measurement replaces the constructor-supplied value.
    let measured_rtt = Duration::from_millis(100);
    link.stats_mut().update_rtt(measured_rtt);
    assert_eq!(link.effective_rtt(), measured_rtt);
}
1538
/// `age` is 0 before a creation timestamp is set, and afterwards is the
/// difference between the supplied time and that timestamp.
#[test]
fn test_link_age() {
    let remote = TransportAddr::from_string("test");
    let mut link = Link::new(
        LinkId::new(1),
        TransportId::new(1),
        remote,
        LinkDirection::Outbound,
        Duration::from_millis(50),
    );

    // No creation timestamp recorded yet.
    assert_eq!(link.age(1000), 0);

    link.set_created_at(500);
    for (now_ms, expected_age) in [(1000, 500), (500, 0)] {
        assert_eq!(link.age(now_ms), expected_age);
    }
}
1556
/// `DiscoveredPeer::new` stores the transport id and leaves the public-key
/// hint unset.
#[test]
fn test_discovered_peer() {
    let transport_id = TransportId::new(1);
    let remote = TransportAddr::from_string("192.168.1.1:2121");

    let discovered = DiscoveredPeer::new(transport_id, remote);

    assert_eq!(discovered.transport_id, transport_id);
    assert!(discovered.pubkey_hint.is_none());
}
1567
/// `LinkDirection` renders as a lowercase word through `Display`.
#[test]
fn test_link_direction_display() {
    let expectations = [
        (LinkDirection::Outbound, "outbound"),
        (LinkDirection::Inbound, "inbound"),
    ];

    for (direction, rendered) in expectations {
        assert_eq!(direction.to_string(), rendered);
    }
}
1573
/// `TransportState` renders as a lowercase word through `Display`.
#[test]
fn test_transport_state_display() {
    let expectations = [
        (TransportState::Up, "up"),
        (TransportState::Failed, "failed"),
    ];

    for (state, rendered) in expectations {
        assert_eq!(state.to_string(), rendered);
    }
}
1579
/// `ReceivedPacket::new` keeps the transport id and payload and stamps the
/// packet with a non-zero wall-clock timestamp.
#[test]
fn test_received_packet() {
    let transport_id = TransportId::new(1);
    let remote = TransportAddr::from_string("192.168.1.1:2121");

    let received = ReceivedPacket::new(transport_id, remote, vec![1, 2, 3, 4]);

    assert_eq!(received.transport_id, transport_id);
    assert_eq!(received.data, [1u8, 2, 3, 4]);
    // The constructor falls back to 0 only when the system clock reads
    // before the Unix epoch, which is not expected on a test machine.
    assert!(received.timestamp_ms > 0);
}
1592
/// `ReceivedPacket::with_timestamp` stores the caller-supplied timestamp
/// unchanged.
#[test]
fn test_received_packet_with_timestamp() {
    let stamp_ms = 12345;
    let remote = TransportAddr::from_string("test");

    let received = ReceivedPacket::with_timestamp(TransportId::new(1), remote, vec![5, 6], stamp_ms);

    assert_eq!(received.timestamp_ms, stamp_ms);
}
1604
1605 #[tokio::test]
1606 async fn test_packet_channel() {
1607 let (tx, mut rx) = packet_channel(10);
1608
1609 let packet = ReceivedPacket::new(
1610 TransportId::new(1),
1611 TransportAddr::from_string("test"),
1612 vec![1, 2, 3],
1613 );
1614
1615 tx.send(packet.clone()).unwrap();
1616
1617 let received = rx.recv().await.unwrap();
1618 assert_eq!(received.data, vec![1, 2, 3]);
1619 }
1620
1621 struct MockTransport {
1627 id: TransportId,
1628 mtu_value: u16,
1629 }
1630
1631 impl MockTransport {
1632 fn new(mtu: u16) -> Self {
1633 Self {
1634 id: TransportId::new(99),
1635 mtu_value: mtu,
1636 }
1637 }
1638 }
1639
1640 impl Transport for MockTransport {
1641 fn transport_id(&self) -> TransportId {
1642 self.id
1643 }
1644 fn transport_type(&self) -> &TransportType {
1645 &TransportType::UDP
1646 }
1647 fn state(&self) -> TransportState {
1648 TransportState::Up
1649 }
1650 fn mtu(&self) -> u16 {
1651 self.mtu_value
1652 }
1653 fn start(&mut self) -> Result<(), TransportError> {
1654 Ok(())
1655 }
1656 fn stop(&mut self) -> Result<(), TransportError> {
1657 Ok(())
1658 }
1659 fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
1660 Ok(())
1661 }
1662 fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
1663 Ok(vec![])
1664 }
1665 }
1666
1667 struct PerLinkMtuTransport {
1669 id: TransportId,
1670 default_mtu: u16,
1671 overrides: Vec<(TransportAddr, u16)>,
1673 }
1674
1675 impl PerLinkMtuTransport {
1676 fn new(default_mtu: u16, overrides: Vec<(TransportAddr, u16)>) -> Self {
1677 Self {
1678 id: TransportId::new(100),
1679 default_mtu,
1680 overrides,
1681 }
1682 }
1683 }
1684
1685 impl Transport for PerLinkMtuTransport {
1686 fn transport_id(&self) -> TransportId {
1687 self.id
1688 }
1689 fn transport_type(&self) -> &TransportType {
1690 &TransportType::UDP
1691 }
1692 fn state(&self) -> TransportState {
1693 TransportState::Up
1694 }
1695 fn mtu(&self) -> u16 {
1696 self.default_mtu
1697 }
1698 fn link_mtu(&self, addr: &TransportAddr) -> u16 {
1699 for (a, mtu) in &self.overrides {
1700 if a == addr {
1701 return *mtu;
1702 }
1703 }
1704 self.mtu()
1705 }
1706 fn start(&mut self) -> Result<(), TransportError> {
1707 Ok(())
1708 }
1709 fn stop(&mut self) -> Result<(), TransportError> {
1710 Ok(())
1711 }
1712 fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
1713 Ok(())
1714 }
1715 fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
1716 Ok(vec![])
1717 }
1718 }
1719
/// A transport that does not override `link_mtu` reports its transport-wide
/// MTU for every address.
#[test]
fn test_link_mtu_default_falls_back_to_mtu() {
    let configured_mtu: u16 = 1280;
    let transport = MockTransport::new(configured_mtu);

    let first_addr = TransportAddr::from_string("192.168.1.1:2121");
    assert_eq!(transport.link_mtu(&first_addr), configured_mtu);
    assert_eq!(transport.link_mtu(&first_addr), transport.mtu());

    // A different address must yield the same answer.
    let second_addr = TransportAddr::from_string("10.0.0.1:5000");
    assert_eq!(transport.link_mtu(&second_addr), configured_mtu);
}
1733
/// A transport that overrides `link_mtu` returns the per-address value where
/// one is registered and the default MTU everywhere else.
#[test]
fn test_link_mtu_per_link_override() {
    let addr_a = TransportAddr::from_string("192.168.1.1:2121");
    let addr_b = TransportAddr::from_string("10.0.0.1:5000");
    let addr_unknown = TransportAddr::from_string("172.16.0.1:6000");

    let overrides = vec![(addr_a.clone(), 512), (addr_b.clone(), 247)];
    let transport = PerLinkMtuTransport::new(1280, overrides);

    // Two overridden addresses, then one that falls through to the default.
    for (addr, expected_mtu) in [(&addr_a, 512), (&addr_b, 247), (&addr_unknown, 1280)] {
        assert_eq!(transport.link_mtu(addr), expected_mtu);
    }

    // The transport-wide MTU itself is unaffected by the overrides.
    assert_eq!(transport.mtu(), 1280);
}
1751
/// `TransportHandle::link_mtu` for a UDP transport yields the MTU from the
/// UDP configuration, which is also what `TransportHandle::mtu` reports.
#[test]
fn test_transport_handle_link_mtu_delegation() {
    use crate::{config::UdpConfig, transport::udp::UdpTransport};

    let udp_config = UdpConfig::default();
    // Read the MTU before the config is moved into the transport.
    let configured_mtu = udp_config.mtu();

    // Keep the receiver bound so the channel stays open for the whole test.
    let (packet_tx, _packet_rx) = packet_channel(1);
    let handle = TransportHandle::Udp(UdpTransport::new(
        TransportId::new(1),
        None,
        udp_config,
        packet_tx,
    ));

    let remote = TransportAddr::from_string("192.168.1.1:2121");

    assert_eq!(handle.link_mtu(&remote), configured_mtu);
    assert_eq!(handle.link_mtu(&remote), handle.mtu());
}
1770}