1use either::Either;
2use libp2p::{
3 core::{
4 multiaddr::{Multiaddr, Protocol},
5 transport::{Boxed, MemoryTransport, Transport as LibP2PTransport},
6 upgrade::{self},
7 },
8 dcutr,
9 gossipsub::{
10 self, Config as GossipsubConfig, ConfigBuilder as GossipsubConfigBuilder, IdentTopic,
11 MessageAuthenticity, ValidationMode,
12 },
13 identify::{self},
14 identity::{self, Keypair},
15 kad::{self, store::MemoryStore, QueryResult},
16 mdns::{self},
17 noise,
18 ping::{self},
19 relay,
20 request_response::{self, ProtocolSupport},
21 swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent},
22 tcp, websocket, yamux, PeerId as LibP2PPeerId, StreamProtocol,
23};
24use void;
25
/// Aggregated event type produced by [`NetworkBehaviourImpl`].
///
/// Each variant wraps the event type of exactly one sub-behaviour. The `From`
/// implementations in this file perform the wrapping.
#[derive(Debug)]
pub enum NetworkBehaviourEvent {
    /// Event emitted by the Kademlia behaviour.
    Kademlia(kad::Event),
    /// Event emitted by the gossipsub behaviour.
    Gossipsub(gossipsub::Event),
    /// Event emitted by the mDNS behaviour.
    Mdns(mdns::Event),
    /// Event emitted by the ping behaviour.
    Ping(ping::Event),
    /// Event emitted by the identify behaviour.
    Identify(identify::Event),
    /// Event emitted by the relay behaviour.
    Relay(relay::Event),
    /// Event emitted by the DCUtR behaviour.
    Dcutr(dcutr::Event),
    /// Event emitted by the request/response behaviour.
    RequestResponse(request_response::Event<QuDagRequest, QuDagResponse>),
}
38
39impl From<kad::Event> for NetworkBehaviourEvent {
41 fn from(event: kad::Event) -> Self {
42 NetworkBehaviourEvent::Kademlia(event)
43 }
44}
45
46impl From<gossipsub::Event> for NetworkBehaviourEvent {
47 fn from(event: gossipsub::Event) -> Self {
48 NetworkBehaviourEvent::Gossipsub(event)
49 }
50}
51
52impl From<mdns::Event> for NetworkBehaviourEvent {
53 fn from(event: mdns::Event) -> Self {
54 NetworkBehaviourEvent::Mdns(event)
55 }
56}
57
58impl From<Either<mdns::Event, void::Void>> for NetworkBehaviourEvent {
60 fn from(event: Either<mdns::Event, void::Void>) -> Self {
61 match event {
62 Either::Left(mdns_event) => NetworkBehaviourEvent::Mdns(mdns_event),
63 Either::Right(void) => match void {},
64 }
65 }
66}
67
68impl From<ping::Event> for NetworkBehaviourEvent {
69 fn from(event: ping::Event) -> Self {
70 NetworkBehaviourEvent::Ping(event)
71 }
72}
73
74impl From<identify::Event> for NetworkBehaviourEvent {
75 fn from(event: identify::Event) -> Self {
76 NetworkBehaviourEvent::Identify(event)
77 }
78}
79
80impl From<relay::Event> for NetworkBehaviourEvent {
81 fn from(event: relay::Event) -> Self {
82 NetworkBehaviourEvent::Relay(event)
83 }
84}
85
86impl From<dcutr::Event> for NetworkBehaviourEvent {
87 fn from(event: dcutr::Event) -> Self {
88 NetworkBehaviourEvent::Dcutr(event)
89 }
90}
91
92impl From<request_response::Event<QuDagRequest, QuDagResponse>> for NetworkBehaviourEvent {
93 fn from(event: request_response::Event<QuDagRequest, QuDagResponse>) -> Self {
94 NetworkBehaviourEvent::RequestResponse(event)
95 }
96}
97
98use chacha20poly1305::{
99 aead::{Aead, KeyInit},
100 ChaCha20Poly1305, Key, Nonce,
101};
102use futures::{channel::oneshot, prelude::*};
103use rand::{thread_rng, RngCore};
104use serde::{Deserialize, Serialize};
105use std::{
106 collections::{HashMap, HashSet},
107 error::Error,
108 sync::Arc,
109 time::Duration,
110};
111use tokio::sync::{mpsc, Mutex};
112use tracing::{debug, info, warn};
113
114use crate::routing::Router;
115use crate::types::{MessagePriority, NetworkMessage};
118
/// Configuration consumed by [`P2PNode::new`] and [`P2PNode::start`].
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Multiaddr strings the node listens on; each is parsed in `start`.
    pub listen_addrs: Vec<String>,
    /// Multiaddr strings of bootstrap peers; `start` parses each one and, when
    /// it carries a peer id, adds it to the Kademlia routing table.
    pub bootstrap_peers: Vec<String>,
    /// How long an outbound request waits for its response.
    pub timeout: Duration,
    /// Maximum number of connections.
    /// NOTE(review): not read anywhere in the visible part of this file.
    pub max_connections: usize,
    /// 32-byte ChaCha20-Poly1305 key used to obfuscate published gossipsub payloads.
    pub obfuscation_key: [u8; 32],
    /// Whether the mDNS behaviour is enabled.
    pub enable_mdns: bool,
    /// Relay switch.
    /// NOTE(review): the relay behaviour is constructed regardless of this flag
    /// in the visible code.
    pub enable_relay: bool,
    /// QUIC switch.
    /// NOTE(review): not read anywhere in the visible part of this file.
    pub enable_quic: bool,
    /// Whether a WebSocket transport is added next to TCP and the in-memory transport.
    pub enable_websocket: bool,
    /// Optional gossipsub configuration; a built-in configuration is used when `None`.
    pub gossipsub_config: Option<GossipsubConfig>,
    /// Kademlia replication factor; must be greater than zero.
    pub kad_replication_factor: usize,
}
145
146impl Default for NetworkConfig {
147 fn default() -> Self {
148 let mut key = [0u8; 32];
149 thread_rng().fill_bytes(&mut key);
150
151 Self {
152 listen_addrs: vec![
153 "/ip4/0.0.0.0/tcp/0".to_string(),
154 "/ip6/::/tcp/0".to_string(),
155 ],
156 bootstrap_peers: vec![],
157 timeout: Duration::from_secs(20),
158 max_connections: 50,
159 obfuscation_key: key,
160 enable_mdns: true,
161 enable_relay: true,
162 enable_quic: false,
163 enable_websocket: true,
164 gossipsub_config: None,
165 kad_replication_factor: 20,
166 }
167 }
168}
169
/// Request message exchanged over the `/qudag/req/1.0.0` request/response protocol.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuDagRequest {
    /// Application-level identifier; the responder echoes it in its reply.
    pub request_id: String,
    /// Opaque request body.
    pub payload: Vec<u8>,
}
176
/// Response message exchanged over the `/qudag/req/1.0.0` request/response protocol.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuDagResponse {
    /// Identifier copied from the request this response answers.
    pub request_id: String,
    /// Opaque response body.
    pub payload: Vec<u8>,
}
182
/// Composite libp2p behaviour used by [`P2PNode`].
///
/// The derive macro combines the sub-behaviours below and reports their events
/// as [`NetworkBehaviourEvent`].
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "NetworkBehaviourEvent")]
pub struct NetworkBehaviourImpl {
    /// Kademlia DHT backed by an in-memory record store.
    pub kademlia: kad::Behaviour<MemoryStore>,
    /// Gossipsub publish/subscribe.
    pub gossipsub: gossipsub::Behaviour,
    /// mDNS discovery; disabled when the toggle holds `None`.
    pub mdns: Toggle<mdns::tokio::Behaviour>,
    /// Ping.
    pub ping: ping::Behaviour,
    /// Identify.
    pub identify: identify::Behaviour,
    /// Relay.
    pub relay: relay::Behaviour,
    /// DCUtR (direct connection upgrade through relay).
    pub dcutr: dcutr::Behaviour,
    /// CBOR-encoded request/response carrying [`QuDagRequest`] / [`QuDagResponse`].
    pub request_response: request_response::cbor::Behaviour<QuDagRequest, QuDagResponse>,
}
204
/// Commands sent from a [`P2PHandle`] to the [`P2PNode`] event loop.
///
/// Every variant carries a one-shot `response` sender on which the node
/// reports the outcome.
#[derive(Debug)]
pub enum P2PCommand {
    /// Subscribe to a gossipsub topic.
    Subscribe {
        /// Topic name.
        topic: String,
        /// Receives the outcome of the subscription.
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Unsubscribe from a gossipsub topic.
    Unsubscribe {
        /// Topic name.
        topic: String,
        /// Receives the outcome of the unsubscription.
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Publish data on a gossipsub topic.
    Publish {
        /// Topic name.
        topic: String,
        /// Plaintext payload; the node obfuscates it before publishing.
        data: Vec<u8>,
        /// Receives the outcome of the publish.
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Send a request to a peer and wait for its response.
    SendRequest {
        /// Destination peer.
        peer_id: LibP2PPeerId,
        /// Request to send.
        request: QuDagRequest,
        /// Receives the peer's response or the failure.
        response: oneshot::Sender<Result<QuDagResponse, Box<dyn Error + Send + Sync>>>,
    },
    /// Dial a multiaddress.
    Dial {
        /// Address to dial.
        addr: Multiaddr,
        /// Receives the outcome of starting the dial.
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Query the set of currently connected peers.
    GetConnectedPeers {
        /// Receives the peer list.
        response: oneshot::Sender<Vec<LibP2PPeerId>>,
    },
    /// Query the node's own peer id.
    GetLocalPeerId {
        /// Receives the peer id.
        response: oneshot::Sender<LibP2PPeerId>,
    },
    /// Query the addresses the node is listening on.
    GetListeners {
        /// Receives the listener addresses.
        response: oneshot::Sender<Vec<Multiaddr>>,
    },
}
248
/// Events the [`P2PNode`] reports to the application through [`P2PHandle::next_event`].
#[derive(Debug)]
pub enum P2PEvent {
    /// A peer was discovered (via mDNS or a Kademlia closest-peers query).
    PeerDiscovered(LibP2PPeerId),
    /// A connection to the peer was established.
    PeerConnected(LibP2PPeerId),
    /// The last connection to the peer was closed.
    PeerDisconnected(LibP2PPeerId),
    /// A gossipsub message arrived.
    MessageReceived {
        /// Peer the message was propagated from.
        peer_id: LibP2PPeerId,
        /// Topic the message was published on.
        topic: String,
        /// De-obfuscated payload, or the raw payload when de-obfuscation failed.
        data: Vec<u8>,
    },
    /// An inbound request arrived.
    RequestReceived {
        /// Requesting peer.
        peer_id: LibP2PPeerId,
        /// The request.
        request: QuDagRequest,
        /// Reply channel.
        /// NOTE(review): the visible emitter drops the receiving half right
        /// after sending this event, so a reply sent here is discarded.
        channel: oneshot::Sender<QuDagResponse>,
    },
    /// A response to an outbound request arrived.
    /// NOTE(review): never constructed in the visible part of this file.
    ResponseReceived {
        /// Responding peer.
        peer_id: LibP2PPeerId,
        /// The response.
        response: QuDagResponse,
    },
    /// The Kademlia routing table changed.
    RoutingTableUpdated,
}
278
/// The P2P node: owns the libp2p swarm and runs the event loop.
///
/// Created together with a [`P2PHandle`] by [`P2PNode::new`].
pub struct P2PNode {
    /// Peer id derived from the node's generated keypair.
    local_peer_id: LibP2PPeerId,
    /// libp2p swarm driving all sub-behaviours.
    swarm: libp2p::Swarm<NetworkBehaviourImpl>,
    /// Router notified about peers as connections open and close.
    router: Router,
    /// Cipher used to obfuscate and de-obfuscate gossipsub payloads.
    cipher: ChaCha20Poly1305,
    /// Sending side of the application event channel.
    event_tx: mpsc::UnboundedSender<P2PEvent>,
    /// Receiving side of the command channel fed by [`P2PHandle`].
    command_rx: mpsc::UnboundedReceiver<P2PCommand>,
    /// Peers with at least one established connection.
    connected_peers: HashSet<LibP2PPeerId>,
    /// Reply channels of outbound requests still waiting for a response.
    pending_requests: HashMap<String, oneshot::Sender<QuDagResponse>>,
    /// Placeholder: `Some(())` when `QUDAG_METRICS` was set at construction.
    #[allow(dead_code)]
    metrics: Option<()>,
    /// Configuration the node was created with.
    config: NetworkConfig,
}
305
/// Cloneable handle for talking to a [`P2PNode`] from other tasks.
#[derive(Clone)]
pub struct P2PHandle {
    /// Sends commands to the node's event loop.
    command_tx: mpsc::UnboundedSender<P2PCommand>,
    /// Event receiver shared by all clones of the handle; each event is
    /// delivered to exactly one caller of `next_event`.
    event_rx: Arc<Mutex<mpsc::UnboundedReceiver<P2PEvent>>>,
}
314
315impl P2PHandle {
316 pub async fn subscribe(&self, topic: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
318 let (tx, rx) = oneshot::channel();
319 self.command_tx
320 .send(P2PCommand::Subscribe {
321 topic: topic.to_string(),
322 response: tx,
323 })
324 .map_err(|_| "P2P node offline")?;
325 rx.await.map_err(|_| "Command failed")?
326 }
327
328 pub async fn unsubscribe(&self, topic: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
330 let (tx, rx) = oneshot::channel();
331 self.command_tx
332 .send(P2PCommand::Unsubscribe {
333 topic: topic.to_string(),
334 response: tx,
335 })
336 .map_err(|_| "P2P node offline")?;
337 rx.await.map_err(|_| "Command failed")?
338 }
339
340 pub async fn publish(
342 &self,
343 topic: &str,
344 data: Vec<u8>,
345 ) -> Result<(), Box<dyn Error + Send + Sync>> {
346 let (tx, rx) = oneshot::channel();
347 self.command_tx
348 .send(P2PCommand::Publish {
349 topic: topic.to_string(),
350 data,
351 response: tx,
352 })
353 .map_err(|_| "P2P node offline")?;
354 rx.await.map_err(|_| "Command failed")?
355 }
356
357 pub async fn send_request(
359 &self,
360 peer_id: LibP2PPeerId,
361 request: QuDagRequest,
362 ) -> Result<QuDagResponse, Box<dyn Error + Send + Sync>> {
363 let (tx, rx) = oneshot::channel();
364 self.command_tx
365 .send(P2PCommand::SendRequest {
366 peer_id,
367 request,
368 response: tx,
369 })
370 .map_err(|_| "P2P node offline")?;
371 rx.await.map_err(|_| "Command failed")?
372 }
373
374 pub async fn dial(&self, addr: Multiaddr) -> Result<(), Box<dyn Error + Send + Sync>> {
376 let (tx, rx) = oneshot::channel();
377 self.command_tx
378 .send(P2PCommand::Dial { addr, response: tx })
379 .map_err(|_| "P2P node offline")?;
380 rx.await.map_err(|_| "Command failed")?
381 }
382
383 pub async fn connected_peers(&self) -> Vec<LibP2PPeerId> {
385 let (tx, rx) = oneshot::channel();
386 if self
387 .command_tx
388 .send(P2PCommand::GetConnectedPeers { response: tx })
389 .is_ok()
390 {
391 rx.await.unwrap_or_default()
392 } else {
393 Vec::new()
394 }
395 }
396
397 pub async fn local_peer_id(&self) -> LibP2PPeerId {
399 let (tx, rx) = oneshot::channel();
400 if self
401 .command_tx
402 .send(P2PCommand::GetLocalPeerId { response: tx })
403 .is_ok()
404 {
405 rx.await.unwrap_or_else(|_| LibP2PPeerId::random())
406 } else {
407 LibP2PPeerId::random()
408 }
409 }
410
411 pub async fn listeners(&self) -> Vec<Multiaddr> {
413 let (tx, rx) = oneshot::channel();
414 if self
415 .command_tx
416 .send(P2PCommand::GetListeners { response: tx })
417 .is_ok()
418 {
419 rx.await.unwrap_or_default()
420 } else {
421 Vec::new()
422 }
423 }
424
425 pub async fn next_event(&self) -> Option<P2PEvent> {
427 let mut event_rx = self.event_rx.lock().await;
428 event_rx.recv().await
429 }
430}
431
432impl P2PNode {
433 pub async fn new(config: NetworkConfig) -> Result<(Self, P2PHandle), Box<dyn Error>> {
436 let local_key = identity::Keypair::generate_ed25519();
438 let local_peer_id = LibP2PPeerId::from(local_key.public());
439
440 info!("Local peer ID: {}", local_peer_id);
441
442 let transport = build_transport(&local_key, &config)?;
444
445 let store = MemoryStore::new(local_peer_id);
447 let mut kad_config = kad::Config::default();
448 kad_config.set_replication_factor(
449 std::num::NonZeroUsize::new(config.kad_replication_factor)
450 .expect("Replication factor must be > 0"),
451 );
452 let kademlia = kad::Behaviour::with_config(local_peer_id, store, kad_config);
453
454 let gossipsub_config = config.gossipsub_config.clone().unwrap_or_else(|| {
456 GossipsubConfigBuilder::default()
457 .heartbeat_interval(Duration::from_secs(10))
458 .validation_mode(ValidationMode::Strict)
459 .build()
460 .expect("Valid gossipsub config")
461 });
462
463 let gossipsub = gossipsub::Behaviour::new(
464 MessageAuthenticity::Signed(local_key.clone()),
465 gossipsub_config,
466 )?;
467
468 let mdns = if config.enable_mdns {
470 Toggle::from(Some(mdns::tokio::Behaviour::new(
471 mdns::Config::default(),
472 local_peer_id,
473 )?))
474 } else {
475 Toggle::from(None)
476 };
477
478 let ping = ping::Behaviour::new(ping::Config::new());
480 let identify = identify::Behaviour::new(identify::Config::new(
481 "/qudag/1.0.0".to_string(),
482 local_key.public(),
483 ));
484
485 let relay = relay::Behaviour::new(local_peer_id, Default::default());
486 let dcutr = dcutr::Behaviour::new(local_peer_id);
487
488 let protocols = std::iter::once((
490 StreamProtocol::new("/qudag/req/1.0.0"),
491 ProtocolSupport::Full,
492 ));
493 let request_response =
494 request_response::cbor::Behaviour::new(protocols, request_response::Config::default());
495
496 let behaviour = NetworkBehaviourImpl {
498 kademlia,
499 gossipsub,
500 mdns,
501 ping,
502 identify,
503 relay,
504 dcutr,
505 request_response,
506 };
507
508 let swarm = libp2p::Swarm::new(
510 transport,
511 behaviour,
512 local_peer_id,
513 libp2p::swarm::Config::with_tokio_executor(),
514 );
515
516 let (event_tx, event_rx) = mpsc::unbounded_channel();
518 let (command_tx, command_rx) = mpsc::unbounded_channel();
519 let (router_tx, _) = mpsc::channel(1024);
520 let router = Router::new(router_tx);
521
522 let cipher = ChaCha20Poly1305::new(Key::from_slice(&config.obfuscation_key));
524
525 let metrics = if std::env::var("QUDAG_METRICS").is_ok() {
527 Some(()) } else {
529 None
530 };
531
532 let handle = P2PHandle {
534 command_tx,
535 event_rx: Arc::new(Mutex::new(event_rx)),
536 };
537
538 let node = Self {
548 local_peer_id,
549 swarm,
550 router,
551 cipher,
552 event_tx,
553 command_rx,
554 connected_peers: HashSet::new(),
555 pending_requests: HashMap::new(),
556 metrics,
557 config,
558 };
560
561 Ok((node, handle))
562 }
563
564 pub async fn start(&mut self) -> Result<(), Box<dyn Error>> {
566 for addr_str in &self.config.listen_addrs {
568 let addr: Multiaddr = addr_str.parse()?;
569 self.swarm.listen_on(addr)?;
570 }
571
572 for peer_addr_str in &self.config.bootstrap_peers {
574 let peer_addr: Multiaddr = peer_addr_str.parse()?;
575 if let Some(peer_id) = extract_peer_id(&peer_addr) {
576 self.swarm
577 .behaviour_mut()
578 .kademlia
579 .add_address(&peer_id, peer_addr);
580 }
581 }
582
583 if let Err(e) = self.swarm.behaviour_mut().kademlia.bootstrap() {
585 warn!("Kademlia bootstrap failed: {}", e);
586 }
587
588 info!("P2P node started");
589 Ok(())
590 }
591
592 pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
594 loop {
595 tokio::select! {
596 swarm_event = self.swarm.next() => {
597 if let Some(event) = swarm_event {
598 self.handle_swarm_event(event).await?;
599 }
600 }
601 command = self.command_rx.recv() => {
602 if let Some(cmd) = command {
603 self.handle_command(cmd).await;
604 } else {
605 break;
607 }
608 }
609 }
610 }
611 Ok(())
612 }
613
614 async fn handle_swarm_event(
616 &mut self,
617 event: SwarmEvent<NetworkBehaviourEvent>,
618 ) -> Result<(), Box<dyn Error>> {
619 match event {
620 SwarmEvent::NewListenAddr { address, .. } => {
621 info!("Listening on {}", address);
622 }
623 SwarmEvent::ConnectionEstablished {
624 peer_id,
625 endpoint,
626 num_established,
627 ..
628 } => {
629 info!(
630 "Connection established with {} at {} ({} total connections)",
631 peer_id,
632 endpoint.get_remote_address(),
633 num_established
634 );
635 self.connected_peers.insert(peer_id);
636 self.event_tx.send(P2PEvent::PeerConnected(peer_id))?;
637
638 if let Ok(socket_addr) = endpoint.get_remote_address().to_string().parse() {
640 self.router
641 .add_discovered_peer(
642 peer_id,
643 crate::discovery::DiscoveredPeer::new(
644 peer_id,
645 socket_addr,
646 crate::discovery::DiscoveryMethod::Kademlia,
647 ),
648 )
649 .await;
650 }
651 }
652 SwarmEvent::ConnectionClosed {
653 peer_id,
654 num_established,
655 ..
656 } => {
657 info!(
658 "Connection closed with {} ({} remaining connections)",
659 peer_id, num_established
660 );
661 if num_established == 0 {
662 self.connected_peers.remove(&peer_id);
663 self.event_tx.send(P2PEvent::PeerDisconnected(peer_id))?;
664
665 self.router.remove_discovered_peer(peer_id).await;
667 }
668 }
669 SwarmEvent::Behaviour(behaviour_event) => {
670 self.handle_behaviour_event(behaviour_event).await?;
671 }
672 _ => {}
673 }
674 Ok(())
675 }
676
677 async fn handle_behaviour_event(
679 &mut self,
680 event: NetworkBehaviourEvent,
681 ) -> Result<(), Box<dyn Error>> {
682 match event {
683 NetworkBehaviourEvent::Kademlia(kad_event) => {
684 self.handle_kademlia_event(kad_event).await?;
685 }
686 NetworkBehaviourEvent::Gossipsub(gossipsub_event) => {
687 self.handle_gossipsub_event(gossipsub_event).await?;
688 }
689 NetworkBehaviourEvent::Mdns(mdns_event) => {
690 self.handle_mdns_event(mdns_event).await?;
691 }
692 NetworkBehaviourEvent::Ping(ping_event) => {
693 self.handle_ping_event(ping_event).await?;
694 }
695 NetworkBehaviourEvent::Identify(identify_event) => {
696 self.handle_identify_event(identify_event).await?;
697 }
698 NetworkBehaviourEvent::RequestResponse(req_res_event) => {
699 self.handle_request_response_event(req_res_event).await?;
700 }
701 NetworkBehaviourEvent::Relay(relay_event) => {
702 self.handle_relay_event(relay_event).await?;
703 }
704 NetworkBehaviourEvent::Dcutr(dcutr_event) => {
705 self.handle_dcutr_event(dcutr_event).await?;
706 }
707 }
708 Ok(())
709 }
710
711 async fn handle_kademlia_event(&mut self, event: kad::Event) -> Result<(), Box<dyn Error>> {
713 match event {
714 kad::Event::RoutingUpdated {
715 peer, addresses, ..
716 } => {
717 debug!("Kademlia routing updated for peer {}", peer);
718 for addr in addresses.iter() {
719 self.swarm
720 .behaviour_mut()
721 .kademlia
722 .add_address(&peer, addr.clone());
723 }
724 self.event_tx.send(P2PEvent::RoutingTableUpdated)?;
725 }
726 kad::Event::UnroutablePeer { peer } => {
727 warn!("Peer {} is unroutable", peer);
728 }
729 kad::Event::InboundRequest { request } => {
730 debug!("Kademlia inbound request: {:?}", request);
731 }
732 kad::Event::OutboundQueryProgressed { result, .. } => match result {
733 QueryResult::GetClosestPeers(result) => match result {
734 Ok(ok) => {
735 for peer in ok.peers {
736 debug!("Found closest peer: {}", peer);
737 self.event_tx.send(P2PEvent::PeerDiscovered(peer))?;
738 }
739 }
740 Err(e) => warn!("Get closest peers error: {:?}", e),
741 },
742 _ => {}
743 },
744 _ => {}
745 }
746 Ok(())
747 }
748
749 async fn handle_gossipsub_event(
751 &mut self,
752 event: gossipsub::Event,
753 ) -> Result<(), Box<dyn Error>> {
754 match event {
755 gossipsub::Event::Message {
756 propagation_source,
757 message,
758 ..
759 } => {
760 let topic = message.topic.to_string();
761 let data = message.data;
762
763 let decrypted_data = match self.deobfuscate_traffic(&data) {
765 Ok(d) => d,
766 Err(_) => data, };
768
769 self.event_tx.send(P2PEvent::MessageReceived {
770 peer_id: propagation_source,
771 topic,
772 data: decrypted_data,
773 })?;
774 }
775 gossipsub::Event::Subscribed { peer_id, topic } => {
776 debug!("Peer {} subscribed to topic {}", peer_id, topic);
777 }
778 gossipsub::Event::Unsubscribed { peer_id, topic } => {
779 debug!("Peer {} unsubscribed from topic {}", peer_id, topic);
780 }
781 _ => {}
782 }
783 Ok(())
784 }
785
786 async fn handle_mdns_event(&mut self, event: mdns::Event) -> Result<(), Box<dyn Error>> {
788 match event {
789 mdns::Event::Discovered(peers) => {
790 for (peer_id, addr) in peers {
791 debug!("MDNS discovered peer {} at {}", peer_id, addr);
792 self.swarm
793 .behaviour_mut()
794 .kademlia
795 .add_address(&peer_id, addr);
796 self.event_tx.send(P2PEvent::PeerDiscovered(peer_id))?;
797 }
798 }
799 mdns::Event::Expired(peers) => {
800 for (peer_id, _) in peers {
801 debug!("MDNS peer expired: {}", peer_id);
802 }
803 }
804 }
805 Ok(())
806 }
807
808 async fn handle_ping_event(&mut self, event: ping::Event) -> Result<(), Box<dyn Error>> {
810 match event.result {
811 Ok(duration) => {
812 debug!("Ping to {} successful: {:?}", event.peer, duration);
813 }
814 Err(e) => {
815 debug!("Ping to {} failed: {}", event.peer, e);
816 }
817 }
818 Ok(())
819 }
820
821 async fn handle_identify_event(
823 &mut self,
824 event: identify::Event,
825 ) -> Result<(), Box<dyn Error>> {
826 match event {
827 identify::Event::Received { peer_id, info } => {
828 debug!(
829 "Identified peer {}: protocols={:?}, agent={}",
830 peer_id, info.protocols, info.agent_version
831 );
832
833 for addr in info.listen_addrs {
835 self.swarm
836 .behaviour_mut()
837 .kademlia
838 .add_address(&peer_id, addr);
839 }
840 }
841 identify::Event::Sent { .. } => {}
842 identify::Event::Pushed { .. } => {}
843 identify::Event::Error { peer_id, error } => {
844 warn!("Identify error with {}: {}", peer_id, error);
845 }
846 }
847 Ok(())
848 }
849
/// Logs relay events; no state is changed and the function never fails.
///
/// Accepted reservations are logged at `info`; denials, timeouts and circuit
/// failures at `warn`; clean circuit closes and unlisted variants at `debug`.
async fn handle_relay_event(&mut self, event: relay::Event) -> Result<(), Box<dyn Error>> {
    match event {
        relay::Event::ReservationReqAccepted {
            src_peer_id,
            renewed,
            ..
        } => {
            info!(
                "Relay reservation accepted from peer {}: renewed={}",
                src_peer_id, renewed
            );
        }
        relay::Event::ReservationReqDenied { src_peer_id, .. } => {
            warn!("Relay reservation denied by peer {}", src_peer_id);
        }
        relay::Event::ReservationTimedOut { src_peer_id, .. } => {
            warn!("Relay reservation timed out for peer {}", src_peer_id);
        }
        // This variant is deprecated in the relay crate; the lint is silenced
        // so it can still be matched explicitly.
        #[allow(deprecated)]
        relay::Event::CircuitReqAcceptFailed {
            src_peer_id,
            dst_peer_id,
            error,
        } => {
            warn!(
                "Circuit request accept failed from {} to {}: {:?}",
                src_peer_id, dst_peer_id, error
            );
        }
        relay::Event::CircuitReqDenied {
            src_peer_id,
            dst_peer_id,
            ..
        } => {
            warn!(
                "Circuit request denied from {} to {}",
                src_peer_id, dst_peer_id
            );
        }
        relay::Event::CircuitClosed {
            src_peer_id,
            dst_peer_id,
            error,
        } => {
            // A close that carries an error is worth a warning; a clean close is not.
            if let Some(error) = error {
                warn!(
                    "Circuit closed between {} and {}: {:?}",
                    src_peer_id, dst_peer_id, error
                );
            } else {
                debug!("Circuit closed between {} and {}", src_peer_id, dst_peer_id);
            }
        }
        // Every other relay event is only logged.
        _ => {
            debug!("Unhandled relay event: {:?}", event);
        }
    }
    Ok(())
}
911
912 async fn handle_dcutr_event(&mut self, event: dcutr::Event) -> Result<(), Box<dyn Error>> {
914 match event {
915 dcutr::Event {
916 remote_peer_id,
917 result,
918 } => match result {
919 Ok(connection_id) => {
920 info!(
921 "Direct connection upgrade succeeded with peer {} (connection: {:?})",
922 remote_peer_id, connection_id
923 );
924 }
925 Err(error) => {
926 warn!(
927 "Direct connection upgrade failed with {}: {:?}",
928 remote_peer_id, error
929 );
930 }
931 },
932 }
933 Ok(())
934 }
935
936 async fn handle_request_response_event(
938 &mut self,
939 event: request_response::Event<QuDagRequest, QuDagResponse>,
940 ) -> Result<(), Box<dyn Error>> {
941 match event {
942 request_response::Event::Message { peer, message } => match message {
943 request_response::Message::Request {
944 request, channel, ..
945 } => {
946 let response = QuDagResponse {
948 request_id: request.request_id.clone(),
949 payload: vec![], };
951
952 self.swarm
954 .behaviour_mut()
955 .request_response
956 .send_response(channel, response)
957 .map_err(|_| "Failed to send response")?;
958
959 let (tx, _rx) = oneshot::channel();
961 self.event_tx.send(P2PEvent::RequestReceived {
962 peer_id: peer,
963 request,
964 channel: tx,
965 })?;
966 }
967 request_response::Message::Response {
968 request_id,
969 response,
970 } => {
971 if let Some(tx) = self.pending_requests.remove(&request_id.to_string()) {
972 let _ = tx.send(response);
973 }
974 }
975 },
976 request_response::Event::OutboundFailure {
977 peer,
978 request_id,
979 error,
980 } => {
981 warn!(
982 "Request to {} failed (id: {}): {:?}",
983 peer, request_id, error
984 );
985 self.pending_requests.remove(&request_id.to_string());
986 }
987 request_response::Event::InboundFailure {
988 peer,
989 request_id,
990 error,
991 } => {
992 warn!(
993 "Inbound request from {} failed (id: {}): {:?}",
994 peer, request_id, error
995 );
996 }
997 _ => {}
998 }
999 Ok(())
1000 }
1001
1002 async fn handle_command(&mut self, command: P2PCommand) {
1004 match command {
1005 P2PCommand::Subscribe { topic, response } => {
1006 let result = self.subscribe_internal(&topic).await;
1007 let _ = response.send(result);
1008 }
1009 P2PCommand::Unsubscribe { topic, response } => {
1010 let result = self.unsubscribe_internal(&topic).await;
1011 let _ = response.send(result);
1012 }
1013 P2PCommand::Publish {
1014 topic,
1015 data,
1016 response,
1017 } => {
1018 let result = self.publish_internal(&topic, data).await;
1019 let _ = response.send(result);
1020 }
1021 P2PCommand::SendRequest {
1022 peer_id,
1023 request,
1024 response,
1025 } => {
1026 let result = self.send_request_internal(peer_id, request).await;
1027 let _ = response.send(result);
1028 }
1029 P2PCommand::Dial { addr, response } => {
1030 let result = self.dial_internal(addr).await;
1031 let _ = response.send(result);
1032 }
1033 P2PCommand::GetConnectedPeers { response } => {
1034 let peers = self.connected_peers.iter().copied().collect();
1035 let _ = response.send(peers);
1036 }
1037 P2PCommand::GetLocalPeerId { response } => {
1038 let _ = response.send(self.local_peer_id);
1039 }
1040 P2PCommand::GetListeners { response } => {
1041 let listeners = self.swarm.listeners().cloned().collect();
1042 let _ = response.send(listeners);
1043 }
1044 }
1045 }
1046
1047 async fn subscribe_internal(
1049 &mut self,
1050 topic: &str,
1051 ) -> Result<(), Box<dyn Error + Send + Sync>> {
1052 let topic = IdentTopic::new(topic);
1053 self.swarm
1054 .behaviour_mut()
1055 .gossipsub
1056 .subscribe(&topic)
1057 .map_err(|e| format!("Subscribe error: {}", e))?;
1058 info!("Subscribed to topic: {}", topic);
1059 Ok(())
1060 }
1061
1062 async fn unsubscribe_internal(
1064 &mut self,
1065 topic: &str,
1066 ) -> Result<(), Box<dyn Error + Send + Sync>> {
1067 let topic = IdentTopic::new(topic);
1068 self.swarm
1069 .behaviour_mut()
1070 .gossipsub
1071 .unsubscribe(&topic)
1072 .map_err(|e| format!("Unsubscribe error: {}", e))?;
1073 info!("Unsubscribed from topic: {}", topic);
1074 Ok(())
1075 }
1076
1077 async fn publish_internal(
1079 &mut self,
1080 topic: &str,
1081 data: Vec<u8>,
1082 ) -> Result<(), Box<dyn Error + Send + Sync>> {
1083 let topic = IdentTopic::new(topic);
1084
1085 let message_data = self
1087 .obfuscate_traffic(&data)
1088 .map_err(|e| format!("Obfuscation error: {}", e))?;
1089
1090 self.swarm
1091 .behaviour_mut()
1092 .gossipsub
1093 .publish(topic.clone(), message_data)
1094 .map_err(|e| format!("Publish error: {}", e))?;
1095
1096 debug!("Published message to topic: {}", topic);
1097 Ok(())
1098 }
1099
1100 async fn send_request_internal(
1102 &mut self,
1103 peer_id: LibP2PPeerId,
1104 request: QuDagRequest,
1105 ) -> Result<QuDagResponse, Box<dyn Error + Send + Sync>> {
1106 let request_id = request.request_id.clone();
1107
1108 let network_message = NetworkMessage {
1110 id: request.request_id.clone(),
1111 source: vec![0], destination: vec![0], payload: request.payload.clone(),
1114 priority: MessagePriority::Normal,
1115 ttl: Duration::from_secs(60),
1116 };
1117
1118 let request = QuDagRequest {
1124 request_id: request_id.clone(),
1125 payload: bincode::serialize(&network_message)
1126 .map_err(|e| format!("Serialization error: {}", e))?,
1127 };
1128
1129 let (tx, rx) = oneshot::channel();
1131 self.pending_requests.insert(request_id.clone(), tx);
1132
1133 self.swarm
1134 .behaviour_mut()
1135 .request_response
1136 .send_request(&peer_id, request);
1137
1138 match tokio::time::timeout(self.config.timeout, rx).await {
1140 Ok(Ok(response)) => Ok(response),
1141 Ok(Err(_)) => Err("Response channel closed".into()),
1142 Err(_) => {
1143 self.pending_requests.remove(&request_id);
1144 Err("Request timeout".into())
1145 }
1146 }
1147 }
1148
1149 async fn dial_internal(
1151 &mut self,
1152 peer_addr: Multiaddr,
1153 ) -> Result<(), Box<dyn Error + Send + Sync>> {
1154 self.swarm
1155 .dial(peer_addr)
1156 .map_err(|e| format!("Dial error: {}", e))?;
1157 Ok(())
1158 }
1159
1160 fn obfuscate_traffic(&self, data: &[u8]) -> Result<Vec<u8>, Box<dyn Error>> {
1162 let mut nonce = [0u8; 12];
1163 thread_rng().fill_bytes(&mut nonce);
1164 let nonce = Nonce::from_slice(&nonce);
1165
1166 let mut encrypted = self
1167 .cipher
1168 .encrypt(nonce, data)
1169 .map_err(|e| format!("Encryption error: {}", e))?;
1170
1171 let mut result = nonce.to_vec();
1173 result.append(&mut encrypted);
1174 Ok(result)
1175 }
1176
1177 fn deobfuscate_traffic(&self, data: &[u8]) -> Result<Vec<u8>, Box<dyn Error>> {
1179 if data.len() < 12 {
1180 return Err("Data too short".into());
1181 }
1182
1183 let nonce = Nonce::from_slice(&data[..12]);
1184 let encrypted = &data[12..];
1185
1186 self.cipher
1187 .decrypt(nonce, encrypted)
1188 .map_err(|e| format!("Decryption error: {}", e).into())
1189 }
1190}
1191
1192fn build_transport(
1194 local_key: &Keypair,
1195 config: &NetworkConfig,
1196) -> Result<Boxed<(LibP2PPeerId, StreamMuxerBox)>, Box<dyn Error>> {
1197 let noise = noise::Config::new(local_key)?;
1198
1199 let yamux_config = yamux::Config::default();
1200
1201 let tcp = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true));
1203
1204 let memory = MemoryTransport::default();
1206
1207 let base_transport = tcp.or_transport(memory);
1209
1210 let transport: Boxed<(LibP2PPeerId, StreamMuxerBox)> = if config.enable_websocket {
1212 let ws = websocket::WsConfig::new(tcp::tokio::Transport::new(
1213 tcp::Config::default().nodelay(true),
1214 ));
1215 base_transport
1216 .or_transport(ws)
1217 .upgrade(upgrade::Version::V1)
1218 .authenticate(noise)
1219 .multiplex(yamux_config)
1220 .timeout(Duration::from_secs(20))
1221 .boxed()
1222 } else {
1223 base_transport
1224 .upgrade(upgrade::Version::V1)
1225 .authenticate(noise)
1226 .multiplex(yamux_config)
1227 .timeout(Duration::from_secs(20))
1228 .boxed()
1229 };
1230
1231 Ok(transport)
1232}
1233
1234fn extract_peer_id(addr: &Multiaddr) -> Option<LibP2PPeerId> {
1236 addr.iter().find_map(|p| match p {
1237 Protocol::P2p(peer_id) => Some(peer_id),
1238 _ => None,
1239 })
1240}
1241
/// Shorthand for libp2p's boxed stream muxer, used in `build_transport`'s return type.
type StreamMuxerBox = libp2p::core::muxing::StreamMuxerBox;
1244
/// 12-byte identifier; currently unused in the visible part of this file.
#[allow(dead_code)]
type TransactionId = [u8; 12];
/// Raw message bytes; currently unused in the visible part of this file.
#[allow(dead_code)]
type Message = Vec<u8>;
1250
#[cfg(test)]
mod tests {
    use super::*;

    // A `P2PHandle` call only completes while the node's event loop is being
    // polled. These tests therefore drive `node.run()` concurrently with the
    // handle calls via `tokio::select!`; awaiting the handle alone would hang.

    /// The handle reports the peer id of the node it was created with.
    #[tokio::test]
    async fn test_node_creation() {
        let config = NetworkConfig::default();
        let (mut node, handle) = P2PNode::new(config).await.unwrap();
        let expected = node.local_peer_id;

        tokio::select! {
            result = node.run() => {
                panic!("node event loop exited early: {:?}", result);
            }
            peer_id = handle.local_peer_id() => {
                assert_eq!(peer_id, expected);
            }
        }
    }

    /// Obfuscation round-trips and rejects input shorter than the nonce.
    #[tokio::test]
    async fn test_traffic_obfuscation() {
        let config = NetworkConfig::default();
        let (node, _handle) = P2PNode::new(config).await.unwrap();

        let test_data = b"test message";
        let obfuscated = node.obfuscate_traffic(test_data).unwrap();
        let deobfuscated = node.deobfuscate_traffic(&obfuscated).unwrap();

        assert_eq!(test_data.to_vec(), deobfuscated);
        assert!(node.deobfuscate_traffic(&[0u8; 4]).is_err());
    }

    /// A started node reports at least one listen address.
    #[tokio::test]
    async fn test_node_start() {
        let config = NetworkConfig {
            listen_addrs: vec!["/ip4/127.0.0.1/tcp/0".to_string()],
            enable_mdns: false,
            ..NetworkConfig::default()
        };
        let (mut node, handle) = P2PNode::new(config).await.unwrap();
        node.start().await.unwrap();

        tokio::select! {
            result = node.run() => {
                panic!("node event loop exited early: {:?}", result);
            }
            listeners = async {
                // Give the event loop time to register the listener.
                tokio::time::sleep(Duration::from_millis(100)).await;
                handle.listeners().await
            } => {
                assert!(!listeners.is_empty());
            }
        }
    }

    /// Subscribing works on a lone node; publishing either succeeds or is
    /// rejected by gossipsub.
    #[tokio::test]
    async fn test_pubsub() {
        let config = NetworkConfig::default();
        let (mut node, handle) = P2PNode::new(config).await.unwrap();

        tokio::select! {
            result = node.run() => {
                panic!("node event loop exited early: {:?}", result);
            }
            _ = async {
                let topic = "test-topic";
                handle.subscribe(topic).await.unwrap();

                // With no peers on the topic gossipsub may refuse the publish,
                // so a gossipsub-level rejection is accepted here.
                let test_data = vec![1, 2, 3, 4, 5];
                if let Err(e) = handle.publish(topic, test_data).await {
                    assert!(
                        e.to_string().starts_with("Publish error:"),
                        "unexpected publish failure: {}",
                        e
                    );
                }
            } => {}
        }
    }
}