1use std::net::SocketAddr;
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50
51use crate::bootstrap_cache::PeerCapabilities;
52use crate::crypto::pqc::types::{MlDsaPublicKey, MlDsaSecretKey};
53use tokio::sync::broadcast;
54use tracing::info;
55
56use crate::host_identity::HostIdentity;
57use crate::nat_traversal_api::PeerId;
58use crate::node_config::NodeConfig;
59use crate::node_event::NodeEvent;
60use crate::node_status::{NatType, NodeStatus};
61use crate::p2p_endpoint::{
62 ConnectionHealth, EndpointError, P2pEndpoint, P2pEvent, PeerConnection, PeerLifecycleEvent,
63};
64use crate::reachability::{DIRECT_REACHABILITY_TTL, socket_addr_scope};
65use crate::unified_config::P2pConfig;
66use crate::unified_config::load_or_generate_endpoint_keypair;
67
68#[derive(Debug, thiserror::Error)]
70pub enum NodeError {
71 #[error("Failed to create node: {0}")]
73 Creation(String),
74
75 #[error("Connection error: {0}")]
77 Connection(String),
78
79 #[error("Endpoint error: {0}")]
81 Endpoint(#[from] EndpointError),
82
83 #[error("Node is shutting down")]
85 ShuttingDown,
86}
87
88pub struct Node {
127 inner: Arc<P2pEndpoint>,
129
130 start_time: Instant,
132
133 event_tx: broadcast::Sender<NodeEvent>,
135}
136
137impl std::fmt::Debug for Node {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("Node")
140 .field("peer_id", &self.peer_id())
141 .field("local_addr", &self.local_addr())
142 .finish_non_exhaustive()
143 }
144}
145
146impl Node {
147 pub async fn new() -> Result<Self, NodeError> {
163 Self::with_config(NodeConfig::default()).await
164 }
165
166 pub async fn bind(addr: SocketAddr) -> Result<Self, NodeError> {
176 Self::with_config(NodeConfig::with_bind_addr(addr)).await
177 }
178
179 pub async fn with_peers(peers: Vec<SocketAddr>) -> Result<Self, NodeError> {
193 Self::with_config(NodeConfig::with_known_peers(peers)).await
194 }
195
196 pub async fn with_keypair(
209 public_key: MlDsaPublicKey,
210 secret_key: MlDsaSecretKey,
211 ) -> Result<Self, NodeError> {
212 Self::with_config(NodeConfig::with_keypair(public_key, secret_key)).await
213 }
214
215 pub async fn with_host_identity(
239 host: &HostIdentity,
240 network_id: &[u8],
241 storage_dir: impl AsRef<std::path::Path>,
242 ) -> Result<Self, NodeError> {
243 let (public_key, secret_key) =
244 load_or_generate_endpoint_keypair(host, network_id, storage_dir.as_ref()).map_err(
245 |e| NodeError::Creation(format!("Failed to load/generate keypair: {e}")),
246 )?;
247
248 Self::with_keypair(public_key, secret_key).await
249 }
250
251 pub async fn with_config(config: NodeConfig) -> Result<Self, NodeError> {
268 let mut p2p_config = P2pConfig::default();
270
271 p2p_config.transport_registry = config.build_transport_registry();
273
274 if let Some(bind_addr) = config.bind_addr {
275 p2p_config.bind_addr = Some(bind_addr.into());
276 }
277
278 p2p_config.known_peers = config.known_peers.into_iter().map(Into::into).collect();
279 p2p_config.keypair = config.keypair;
280
281 if let Some(capacity) = config.data_channel_capacity {
282 p2p_config.data_channel_capacity = capacity;
283 }
284 if let Some(streams) = config.max_concurrent_uni_streams {
285 p2p_config.max_concurrent_uni_streams = streams;
286 }
287
288 let (event_tx, _) = broadcast::channel(256);
290
291 let endpoint = P2pEndpoint::new(p2p_config)
293 .await
294 .map_err(NodeError::Endpoint)?;
295
296 info!("Node created with peer ID: {:?}", endpoint.peer_id());
297
298 let inner = Arc::new(endpoint);
299
300 Self::spawn_event_bridge(Arc::clone(&inner), event_tx.clone());
302
303 Ok(Self {
304 inner,
305 start_time: Instant::now(),
306 event_tx,
307 })
308 }
309
310 fn spawn_event_bridge(endpoint: Arc<P2pEndpoint>, event_tx: broadcast::Sender<NodeEvent>) {
312 let mut p2p_events = endpoint.subscribe();
313
314 tokio::spawn(async move {
315 loop {
316 match p2p_events.recv().await {
317 Ok(p2p_event) => {
318 if let Some(node_event) = Self::convert_event(p2p_event) {
319 let _ = event_tx.send(node_event);
321 }
322 }
323 Err(broadcast::error::RecvError::Closed) => {
324 break;
326 }
327 Err(broadcast::error::RecvError::Lagged(n)) => {
328 tracing::warn!("Event bridge lagged by {} events", n);
330 }
331 }
332 }
333 });
334 }
335
336 fn convert_event(p2p_event: P2pEvent) -> Option<NodeEvent> {
340 match p2p_event {
341 P2pEvent::PeerConnected {
342 peer_id,
343 addr,
344 side: _,
345 traversal_method,
346 } => Some(NodeEvent::PeerConnected {
347 peer_id,
348 addr,
349 method: traversal_method,
350 direct: traversal_method.is_direct(),
351 }),
352 P2pEvent::PeerDisconnected { peer_id, reason } => Some(NodeEvent::PeerDisconnected {
353 peer_id,
354 reason: reason.into(), }),
356 P2pEvent::ExternalAddressDiscovered { addr } => {
357 Some(NodeEvent::ExternalAddressDiscovered { addr })
358 }
359 P2pEvent::PortMappingEstablished { external_addr } => {
360 Some(NodeEvent::PortMappingEstablished { external_addr })
361 }
362 P2pEvent::PortMappingRenewed { external_addr } => {
363 Some(NodeEvent::PortMappingRenewed { external_addr })
364 }
365 P2pEvent::PortMappingAddressChanged {
366 previous_addr,
367 external_addr,
368 } => Some(NodeEvent::PortMappingAddressChanged {
369 previous_addr,
370 external_addr,
371 }),
372 P2pEvent::PortMappingFailed { error } => Some(NodeEvent::PortMappingFailed { error }),
373 P2pEvent::PortMappingRemoved { external_addr } => {
374 Some(NodeEvent::PortMappingRemoved { external_addr })
375 }
376 P2pEvent::DirectPathStatus { peer_id, status } => {
377 Some(NodeEvent::DirectPathStatus { peer_id, status })
378 }
379 P2pEvent::DataReceived { peer_id, bytes } => Some(NodeEvent::DataReceived {
380 peer_id,
381 stream_id: 0, bytes,
383 }),
384 P2pEvent::ConstrainedDataReceived {
385 remote_addr,
386 connection_id,
387 data,
388 } => {
389 let synthetic_peer_id = {
391 use std::collections::hash_map::DefaultHasher;
392 use std::hash::{Hash, Hasher};
393 let synthetic_addr = remote_addr.to_synthetic_socket_addr();
394 let mut hasher = DefaultHasher::new();
395 synthetic_addr.hash(&mut hasher);
396 let hash = hasher.finish();
397 let mut peer_id_bytes = [0u8; 32];
398 peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
399 PeerId(peer_id_bytes)
400 };
401 Some(NodeEvent::DataReceived {
402 peer_id: synthetic_peer_id,
403 stream_id: connection_id as u64,
404 bytes: data.len(),
405 })
406 }
407 P2pEvent::MdnsServiceAdvertised {
408 service,
409 namespace,
410 instance_fullname,
411 } => Some(NodeEvent::MdnsServiceAdvertised {
412 service,
413 namespace,
414 instance_fullname,
415 }),
416 P2pEvent::MdnsPeerDiscovered { peer } => Some(NodeEvent::MdnsPeerDiscovered { peer }),
417 P2pEvent::MdnsPeerUpdated { peer } => Some(NodeEvent::MdnsPeerUpdated { peer }),
418 P2pEvent::MdnsPeerRemoved { peer } => Some(NodeEvent::MdnsPeerRemoved { peer }),
419 P2pEvent::MdnsPeerEligible { peer } => Some(NodeEvent::MdnsPeerEligible { peer }),
420 P2pEvent::MdnsPeerIneligible { peer, reason } => {
421 Some(NodeEvent::MdnsPeerIneligible { peer, reason })
422 }
423 P2pEvent::MdnsPeerApprovalRequired { peer, reason } => {
424 Some(NodeEvent::MdnsPeerApprovalRequired { peer, reason })
425 }
426 P2pEvent::MdnsAutoConnectAttempted { peer, addresses } => {
427 Some(NodeEvent::MdnsAutoConnectAttempted { peer, addresses })
428 }
429 P2pEvent::MdnsAutoConnectSucceeded {
430 peer,
431 authenticated_peer_id,
432 remote_addr,
433 } => Some(NodeEvent::MdnsAutoConnectSucceeded {
434 peer,
435 authenticated_peer_id,
436 remote_addr,
437 }),
438 P2pEvent::MdnsAutoConnectFailed {
439 peer,
440 addresses,
441 error,
442 } => Some(NodeEvent::MdnsAutoConnectFailed {
443 peer,
444 addresses,
445 error,
446 }),
447 P2pEvent::NatTraversalProgress { .. }
449 | P2pEvent::BootstrapStatus { .. }
450 | P2pEvent::PeerAuthenticated { .. }
451 | P2pEvent::PeerAddressUpdated { .. }
452 | P2pEvent::RelayEstablished { .. } => None,
453 }
454 }
455
456 pub fn peer_id(&self) -> PeerId {
463 self.inner.peer_id()
464 }
465
466 pub fn local_addr(&self) -> Option<SocketAddr> {
470 self.inner.local_addr()
471 }
472
473 pub fn external_addr(&self) -> Option<SocketAddr> {
478 self.inner.external_addr()
479 }
480
481 pub fn direct_path_status(&self, peer_id: PeerId) -> Option<crate::DirectPathStatus> {
483 self.inner.direct_path_status(peer_id)
484 }
485
486 pub fn public_key_bytes(&self) -> &[u8] {
488 self.inner.public_key_bytes()
489 }
490
491 pub fn inner_endpoint(&self) -> &Arc<P2pEndpoint> {
493 &self.inner
494 }
495
496 pub fn transport_registry(&self) -> &crate::transport::TransportRegistry {
501 self.inner.transport_registry()
502 }
503
504 pub async fn connect_addr(&self, addr: SocketAddr) -> Result<PeerConnection, NodeError> {
511 self.inner
512 .connect_addr(addr)
513 .await
514 .map_err(NodeError::Endpoint)
515 }
516
517 pub async fn connect_peer(&self, peer_id: PeerId) -> Result<PeerConnection, NodeError> {
522 self.inner
523 .connect_peer(peer_id)
524 .await
525 .map_err(NodeError::Endpoint)
526 }
527
528 #[deprecated(note = "use connect_peer(peer_id) for the canonical peer-oriented API")]
533 pub async fn connect(&self, peer_id: PeerId) -> Result<PeerConnection, NodeError> {
534 self.connect_peer(peer_id).await
535 }
536
537 pub async fn connect_peer_with_addrs(
543 &self,
544 peer_id: PeerId,
545 addrs: Vec<SocketAddr>,
546 ) -> Result<PeerConnection, NodeError> {
547 self.inner
548 .connect_peer_with_addrs(peer_id, addrs)
549 .await
550 .map_err(NodeError::Endpoint)
551 }
552
553 pub async fn upsert_peer_hints(
558 &self,
559 peer_id: PeerId,
560 addrs: Vec<SocketAddr>,
561 capabilities: Option<PeerCapabilities>,
562 ) {
563 self.inner
564 .upsert_peer_hints(peer_id, addrs, capabilities)
565 .await;
566 }
567
568 pub async fn accept(&self) -> Option<PeerConnection> {
582 self.inner.accept().await
583 }
584
585 pub async fn add_peer(&self, addr: SocketAddr) {
590 self.inner.add_known_peer(addr).await;
591 }
592
593 pub async fn connect_known_peers(&self) -> Result<usize, NodeError> {
597 self.inner
598 .connect_known_peers()
599 .await
600 .map_err(NodeError::Endpoint)
601 }
602
603 pub async fn disconnect(&self, peer_id: &PeerId) -> Result<(), NodeError> {
605 self.inner
606 .disconnect(peer_id)
607 .await
608 .map_err(NodeError::Endpoint)
609 }
610
611 pub async fn connected_peers(&self) -> Vec<PeerConnection> {
613 self.inner.connected_peers().await
614 }
615
616 pub async fn is_connected(&self, peer_id: &PeerId) -> bool {
618 self.inner.is_connected(peer_id).await
619 }
620
621 pub async fn connection_health(&self, peer_id: &PeerId) -> ConnectionHealth {
623 self.inner.connection_health(peer_id).await
624 }
625
626 pub fn subscribe_peer_events(
628 &self,
629 peer_id: &PeerId,
630 ) -> broadcast::Receiver<PeerLifecycleEvent> {
631 self.inner.subscribe_peer_events(peer_id)
632 }
633
634 pub fn subscribe_all_peer_events(&self) -> broadcast::Receiver<(PeerId, PeerLifecycleEvent)> {
636 self.inner.subscribe_all_peer_events()
637 }
638
639 pub async fn send(&self, peer_id: &PeerId, data: &[u8]) -> Result<(), NodeError> {
643 self.inner
644 .send(peer_id, data)
645 .await
646 .map_err(NodeError::Endpoint)
647 }
648
649 pub async fn send_with_receive_ack(
651 &self,
652 peer_id: &PeerId,
653 data: &[u8],
654 timeout: Duration,
655 ) -> Result<(), NodeError> {
656 self.inner
657 .send_with_receive_ack(peer_id, data, timeout)
658 .await
659 .map_err(NodeError::Endpoint)
660 }
661
662 pub async fn probe_peer(
669 &self,
670 peer_id: &PeerId,
671 timeout: Duration,
672 ) -> Result<Duration, NodeError> {
673 self.inner
674 .probe_peer(peer_id, timeout)
675 .await
676 .map_err(NodeError::Endpoint)
677 }
678
679 pub async fn recv(&self) -> Result<(PeerId, Vec<u8>), NodeError> {
681 self.inner.recv().await.map_err(NodeError::Endpoint)
682 }
683
684 pub async fn status(&self) -> NodeStatus {
701 let stats = self.inner.stats().await;
702 let connected_peers = self.inner.connected_peers().await;
703
704 let nat_type = self.detect_nat_type(&stats);
708
709 let local_addr = self.local_addr();
712 let external_addr = self.external_addr();
713
714 let mut external_addrs = self.inner.all_external_addrs();
718 if let Some(addr) = external_addr {
720 if !external_addrs.contains(&addr) {
721 external_addrs.insert(0, addr);
722 }
723 }
724
725 let hole_punch_success_rate = if stats.nat_traversal_attempts > 0 {
727 stats.nat_traversal_successes as f64 / stats.nat_traversal_attempts as f64
728 } else {
729 0.0
730 };
731
732 let has_global_address = external_addrs
733 .iter()
734 .copied()
735 .chain(local_addr)
736 .any(|addr| {
737 socket_addr_scope(addr)
738 .is_some_and(|scope| scope == crate::ReachabilityScope::Global)
739 });
740 let port_mapping = self.inner.port_mapping_snapshot();
741 let mdns = self.inner.mdns_snapshot();
742
743 let fresh_scope = [
747 (
748 crate::ReachabilityScope::Global,
749 stats.last_direct_global_at,
750 ),
751 (
752 crate::ReachabilityScope::LocalNetwork,
753 stats.last_direct_local_at,
754 ),
755 (
756 crate::ReachabilityScope::Loopback,
757 stats.last_direct_loopback_at,
758 ),
759 ]
760 .into_iter()
761 .find_map(|(scope, seen)| {
762 seen.filter(|instant| instant.elapsed() <= DIRECT_REACHABILITY_TTL)
763 .map(|_| scope)
764 });
765 let can_receive_direct =
766 stats.active_direct_incoming_connections > 0 || fresh_scope.is_some();
767 let direct_reachability_scope = fresh_scope;
768
769 let runtime_assist = self.inner.runtime_assist_snapshot().await;
773 let relay_service_enabled = self.inner.relay_service_enabled();
774 let coordinator_service_enabled = self.inner.coordinator_service_enabled();
775 let bootstrap_service_enabled = self.inner.bootstrap_service_enabled();
776 let is_relaying = runtime_assist.active_relay_sessions > 0;
777 let relay_sessions = runtime_assist.active_relay_sessions;
778 let relay_bytes_forwarded = runtime_assist.relay_bytes_forwarded;
779 let is_coordinating = runtime_assist.successful_coordinations > 0;
780 let coordination_sessions =
781 usize::try_from(runtime_assist.successful_coordinations).unwrap_or(usize::MAX);
782
783 let mut total_rtt = Duration::ZERO;
785 let mut rtt_count = 0u32;
786 for peer in &connected_peers {
787 if let Some(metrics) = self.inner.connection_metrics(&peer.peer_id).await {
788 if let Some(rtt) = metrics.rtt {
789 total_rtt += rtt;
790 rtt_count += 1;
791 }
792 }
793 }
794 let avg_rtt = if rtt_count > 0 {
795 total_rtt / rtt_count
796 } else {
797 Duration::ZERO
798 };
799
800 NodeStatus {
801 peer_id: self.peer_id(),
802 local_addr: local_addr.unwrap_or_else(|| {
803 "0.0.0.0:0".parse().unwrap_or_else(|_| {
804 SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
805 })
806 }),
807 external_addrs,
808 nat_type,
809 can_receive_direct,
810 direct_reachability_scope,
811 has_global_address,
812 port_mapping_active: port_mapping.active,
813 port_mapping_addr: port_mapping.external_addr,
814 mdns_browsing: mdns.browsing,
815 mdns_advertising: mdns.advertising,
816 mdns_discovered_peers: mdns.discovered_peers.len(),
817 relay_service_enabled,
818 coordinator_service_enabled,
819 bootstrap_service_enabled,
820 connected_peers: connected_peers.len(),
821 active_connections: stats.active_connections,
822 pending_connections: 0, direct_connections: stats.direct_connections,
824 relayed_connections: stats.relayed_connections,
825 hole_punch_success_rate,
826 is_relaying,
827 relay_sessions,
828 relay_bytes_forwarded,
829 is_coordinating,
830 coordination_sessions,
831 avg_rtt,
832 uptime: self.start_time.elapsed(),
833 }
834 }
835
836 pub fn subscribe(&self) -> broadcast::Receiver<NodeEvent> {
857 self.event_tx.subscribe()
858 }
859
860 pub fn subscribe_raw(&self) -> broadcast::Receiver<P2pEvent> {
865 self.inner.subscribe()
866 }
867
868 pub async fn shutdown(self) {
874 self.inner.shutdown().await;
875 }
876
877 pub fn is_running(&self) -> bool {
879 self.inner.is_running()
880 }
881
882 fn detect_nat_type(&self, stats: &crate::p2p_endpoint::EndpointStats) -> NatType {
889 if stats.direct_connections > 0 && stats.relayed_connections == 0 {
892 return NatType::FullCone;
893 }
894
895 if stats.direct_connections > 0 && stats.relayed_connections > 0 {
896 return NatType::PortRestricted;
897 }
898
899 if stats.relayed_connections > stats.direct_connections {
900 return NatType::Symmetric;
901 }
902
903 NatType::Unknown
904 }
905}
906
907impl Clone for Node {
909 fn clone(&self) -> Self {
910 Self {
911 inner: Arc::clone(&self.inner),
912 start_time: self.start_time,
913 event_tx: self.event_tx.clone(),
914 }
915 }
916}
917
918#[cfg(test)]
919mod tests {
920 use super::*;
921 use crate::derive_peer_id_from_public_key;
922
923 #[tokio::test]
924 async fn test_node_new_default() {
925 let node = Node::new().await;
926 assert!(node.is_ok(), "Node::new() should succeed: {:?}", node.err());
927
928 let node = node.unwrap();
929 assert!(node.is_running());
930
931 let peer_id = node.peer_id();
933 assert_ne!(peer_id.0, [0u8; 32]);
934
935 node.shutdown().await;
936 }
937
938 #[tokio::test]
939 async fn test_node_bind() {
940 let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
941 let node = Node::bind(addr).await;
942 assert!(node.is_ok(), "Node::bind() should succeed");
943
944 let node = node.unwrap();
945 assert!(node.local_addr().is_some());
946
947 node.shutdown().await;
948 }
949
950 #[tokio::test]
951 async fn test_node_with_peers() {
952 let peers = vec!["127.0.0.1:9000".parse().unwrap()];
953 let node = Node::with_peers(peers).await;
954 assert!(node.is_ok(), "Node::with_peers() should succeed");
955
956 node.unwrap().shutdown().await;
957 }
958
959 #[tokio::test]
960 async fn test_node_with_config() {
961 let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
962 let config = NodeConfig::builder().bind_addr(addr).build();
963
964 let node = Node::with_config(config).await;
965 assert!(node.is_ok(), "Node::with_config() should succeed");
966
967 node.unwrap().shutdown().await;
968 }
969
970 #[tokio::test]
971 async fn test_node_status() {
972 let node = Node::new().await.unwrap();
973 let status = node.status().await;
974
975 assert_ne!(status.peer_id.0, [0u8; 32]);
977 assert_eq!(status.connected_peers, 0); assert!(!status.port_mapping_active);
979 assert_eq!(status.port_mapping_addr, None);
980 assert!(status.relay_service_enabled);
981 assert!(status.coordinator_service_enabled);
982 assert!(status.bootstrap_service_enabled);
983 assert!(!status.is_relaying);
984 assert!(!status.is_coordinating);
985
986 node.shutdown().await;
987 }
988
989 #[tokio::test]
990 async fn test_node_subscribe() {
991 let node = Node::new().await.unwrap();
992 let _events = node.subscribe();
993
994 node.shutdown().await;
996 }
997
998 #[tokio::test]
999 async fn test_node_is_clone() {
1000 let node1 = Node::new().await.unwrap();
1001 let node2 = node1.clone();
1002
1003 assert_eq!(node1.peer_id(), node2.peer_id());
1005
1006 node1.shutdown().await;
1007 }
1009
1010 #[tokio::test]
1011 async fn test_node_debug() {
1012 let node = Node::new().await.unwrap();
1013 let debug_str = format!("{:?}", node);
1014 assert!(debug_str.contains("Node"));
1015 assert!(debug_str.contains("peer_id"));
1016
1017 node.shutdown().await;
1018 }
1019
1020 #[tokio::test]
1021 async fn test_node_identity() {
1022 use crate::crypto::raw_public_keys::key_utils::derive_peer_id_from_key_bytes;
1023
1024 let node = Node::new().await.unwrap();
1025
1026 let peer_id = node.peer_id();
1028 let public_key = node.public_key_bytes();
1029
1030 let derived = derive_peer_id_from_key_bytes(public_key).unwrap();
1032 assert_eq!(peer_id, derived);
1033
1034 node.shutdown().await;
1035 }
1036
1037 #[tokio::test]
1038 async fn test_connected_peers_empty() {
1039 let node = Node::new().await.unwrap();
1040 let peers = node.connected_peers().await;
1041 assert!(peers.is_empty());
1042
1043 node.shutdown().await;
1044 }
1045
1046 #[cfg(all(feature = "platform-verifier", feature = "network-discovery"))]
1050 #[tokio::test]
1051 async fn test_connect_peer_with_addrs_uses_explicit_hint() {
1052 let listener = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
1053 let dialer = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
1054
1055 let listener_addr = listener.local_addr().expect("listener addr");
1056 let listener_addr = if listener_addr.ip().is_unspecified() {
1057 SocketAddr::new(
1058 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
1059 listener_addr.port(),
1060 )
1061 } else {
1062 listener_addr
1063 };
1064 let peer_conn = tokio::time::timeout(
1065 Duration::from_secs(30),
1066 dialer.connect_peer_with_addrs(listener.peer_id(), vec![listener_addr]),
1067 )
1068 .await
1069 .expect("connect should not time out")
1070 .expect("dialer should connect using explicit address hint");
1071 assert_eq!(peer_conn.peer_id, listener.peer_id());
1072
1073 let accepted = tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept())
1074 .await
1075 .expect("accept should complete")
1076 .expect("listener should accept");
1077 assert_eq!(accepted.peer_id, dialer.peer_id());
1078
1079 dialer.shutdown().await;
1080 listener.shutdown().await;
1081 }
1082
1083 #[cfg(all(feature = "platform-verifier", feature = "network-discovery"))]
1084 #[tokio::test]
1085 async fn test_connect_peer_uses_upserted_peer_hints() {
1086 let listener = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
1087 let dialer = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
1088
1089 let listener_addr = listener.local_addr().expect("listener addr");
1090 let listener_addr = if listener_addr.ip().is_unspecified() {
1091 SocketAddr::new(
1092 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
1093 listener_addr.port(),
1094 )
1095 } else {
1096 listener_addr
1097 };
1098
1099 dialer
1100 .upsert_peer_hints(listener.peer_id(), vec![listener_addr], None)
1101 .await;
1102
1103 let peer_conn = tokio::time::timeout(
1104 Duration::from_secs(30),
1105 dialer.connect_peer(listener.peer_id()),
1106 )
1107 .await
1108 .expect("connect should not time out")
1109 .expect("dialer should connect using upserted peer hints");
1110 assert_eq!(peer_conn.peer_id, listener.peer_id());
1111
1112 let accepted = tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept())
1113 .await
1114 .expect("accept should complete")
1115 .expect("listener should accept");
1116 assert_eq!(accepted.peer_id, dialer.peer_id());
1117
1118 dialer.shutdown().await;
1119 listener.shutdown().await;
1120 }
1121
1122 #[tokio::test]
1123 async fn test_node_error_types() {
1124 let err = NodeError::Creation("test".to_string());
1126 assert!(err.to_string().contains("test"));
1127
1128 let err = NodeError::Connection("connection failed".to_string());
1129 assert!(err.to_string().contains("connection"));
1130
1131 let err = NodeError::ShuttingDown;
1132 assert!(err.to_string().contains("shutting down"));
1133 }
1134
1135 #[tokio::test]
1136 async fn test_node_with_keypair_persistence() {
1137 use crate::crypto::raw_public_keys::key_utils::generate_ml_dsa_keypair;
1138
1139 let (public_key, secret_key) = generate_ml_dsa_keypair().unwrap();
1141 let expected_peer_id = derive_peer_id_from_public_key(&public_key);
1142 let expected_public_key_bytes = public_key.as_bytes().to_vec();
1143
1144 let node = Node::with_keypair(public_key, secret_key).await.unwrap();
1146
1147 assert_eq!(node.peer_id(), expected_peer_id);
1149 assert_eq!(node.public_key_bytes(), expected_public_key_bytes);
1150
1151 node.shutdown().await;
1152 }
1153
1154 #[tokio::test]
1155 async fn test_node_keypair_via_config() {
1156 use crate::crypto::raw_public_keys::key_utils::generate_ml_dsa_keypair;
1157
1158 let (public_key, secret_key) = generate_ml_dsa_keypair().unwrap();
1160 let expected_peer_id = derive_peer_id_from_public_key(&public_key);
1161 let expected_public_key_bytes = public_key.as_bytes().to_vec();
1162
1163 let config = NodeConfig::with_keypair(public_key, secret_key);
1165 let node = Node::with_config(config).await.unwrap();
1166
1167 assert_eq!(node.peer_id(), expected_peer_id);
1169 assert_eq!(node.public_key_bytes(), expected_public_key_bytes);
1170
1171 node.shutdown().await;
1172 }
1173
1174 #[tokio::test]
1175 async fn test_node_event_bridge_exists() {
1176 let node = Node::new().await.unwrap();
1177
1178 let mut events = node.subscribe();
1180
1181 assert!(events.try_recv().is_err()); node.shutdown().await;
1188 }
1189
1190 #[tokio::test]
1191 async fn test_node_with_host_identity() {
1192 use crate::host_identity::HostIdentity;
1193
1194 let temp_dir =
1196 std::env::temp_dir().join(format!("ant-quic-test-node-{}", std::process::id()));
1197 let _ = std::fs::create_dir_all(&temp_dir);
1198
1199 let host = HostIdentity::generate();
1201 let network_id = b"test-network";
1202
1203 let node1 = Node::with_host_identity(&host, network_id, &temp_dir)
1205 .await
1206 .unwrap();
1207 let peer_id_1 = node1.peer_id();
1208 let public_key_1 = node1.public_key_bytes().to_vec();
1209
1210 assert!(node1.is_running());
1212
1213 node1.shutdown().await;
1215
1216 let node2 = Node::with_host_identity(&host, network_id, &temp_dir)
1218 .await
1219 .unwrap();
1220 let peer_id_2 = node2.peer_id();
1221 let public_key_2 = node2.public_key_bytes().to_vec();
1222
1223 assert_eq!(peer_id_1, peer_id_2);
1225 assert_eq!(public_key_1, public_key_2);
1226
1227 node2.shutdown().await;
1228
1229 let _ = std::fs::remove_dir_all(&temp_dir);
1231 }
1232
1233 #[tokio::test]
1234 async fn test_node_host_identity_per_network_isolation() {
1235 use crate::host_identity::HostIdentity;
1236
1237 let temp_dir =
1239 std::env::temp_dir().join(format!("ant-quic-test-isolation-{}", std::process::id()));
1240 let _ = std::fs::create_dir_all(&temp_dir);
1241
1242 let host = HostIdentity::generate();
1244
1245 let node1 = Node::with_host_identity(&host, b"network-1", &temp_dir)
1247 .await
1248 .unwrap();
1249 let peer_id_1 = node1.peer_id();
1250
1251 let node2 = Node::with_host_identity(&host, b"network-2", &temp_dir)
1252 .await
1253 .unwrap();
1254 let peer_id_2 = node2.peer_id();
1255
1256 assert_ne!(peer_id_1, peer_id_2);
1258
1259 node1.shutdown().await;
1260 node2.shutdown().await;
1261
1262 let _ = std::fs::remove_dir_all(&temp_dir);
1264 }
1265}