qudag_network/
p2p.rs

use either::Either;
use libp2p::{
    core::{
        multiaddr::{Multiaddr, Protocol},
        transport::{Boxed, MemoryTransport, Transport as LibP2PTransport},
        upgrade::{self},
    },
    dcutr,
    gossipsub::{
        self, Config as GossipsubConfig, ConfigBuilder as GossipsubConfigBuilder, IdentTopic,
        MessageAuthenticity, ValidationMode,
    },
    identify::{self},
    identity::{self, Keypair},
    kad::{self, store::MemoryStore, QueryResult},
    mdns::{self},
    noise,
    ping::{self},
    relay,
    request_response::{self, ProtocolSupport},
    swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent},
    tcp, websocket, yamux, PeerId as LibP2PPeerId, StreamProtocol,
};
use void;

/// Combined network behaviour event
#[derive(Debug)]
pub enum NetworkBehaviourEvent {
    Kademlia(kad::Event),
    Gossipsub(gossipsub::Event),
    Mdns(mdns::Event),
    Ping(ping::Event),
    Identify(identify::Event),
    Relay(relay::Event),
    Dcutr(dcutr::Event),
    RequestResponse(request_response::Event<QuDagRequest, QuDagResponse>),
}

// Implement From traits for all event types
impl From<kad::Event> for NetworkBehaviourEvent {
    fn from(event: kad::Event) -> Self {
        NetworkBehaviourEvent::Kademlia(event)
    }
}

impl From<gossipsub::Event> for NetworkBehaviourEvent {
    fn from(event: gossipsub::Event) -> Self {
        NetworkBehaviourEvent::Gossipsub(event)
    }
}

impl From<mdns::Event> for NetworkBehaviourEvent {
    fn from(event: mdns::Event) -> Self {
        NetworkBehaviourEvent::Mdns(event)
    }
}

// Handle Toggle<T> event conversion for MDNS
impl From<Either<mdns::Event, void::Void>> for NetworkBehaviourEvent {
    fn from(event: Either<mdns::Event, void::Void>) -> Self {
        match event {
            Either::Left(mdns_event) => NetworkBehaviourEvent::Mdns(mdns_event),
            Either::Right(void) => match void {},
        }
    }
}

impl From<ping::Event> for NetworkBehaviourEvent {
    fn from(event: ping::Event) -> Self {
        NetworkBehaviourEvent::Ping(event)
    }
}

impl From<identify::Event> for NetworkBehaviourEvent {
    fn from(event: identify::Event) -> Self {
        NetworkBehaviourEvent::Identify(event)
    }
}

impl From<relay::Event> for NetworkBehaviourEvent {
    fn from(event: relay::Event) -> Self {
        NetworkBehaviourEvent::Relay(event)
    }
}

impl From<dcutr::Event> for NetworkBehaviourEvent {
    fn from(event: dcutr::Event) -> Self {
        NetworkBehaviourEvent::Dcutr(event)
    }
}

impl From<request_response::Event<QuDagRequest, QuDagResponse>> for NetworkBehaviourEvent {
    fn from(event: request_response::Event<QuDagRequest, QuDagResponse>) -> Self {
        NetworkBehaviourEvent::RequestResponse(event)
    }
}

use chacha20poly1305::{
    aead::{Aead, KeyInit},
    ChaCha20Poly1305, Key, Nonce,
};
use futures::{channel::oneshot, prelude::*};
use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize};
use std::{
    collections::{HashMap, HashSet},
    error::Error,
    sync::Arc,
    time::Duration,
};
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, info, warn};

use crate::routing::Router;
// Optimization features disabled for initial release
// use crate::optimized::message_chunking::{MessageChunker, ChunkerConfig, ChunkedMessage};
use crate::types::{MessagePriority, NetworkMessage};

/// Configuration for the P2P network node
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Local listening addresses
    pub listen_addrs: Vec<String>,
    /// Bootstrap peer addresses
    pub bootstrap_peers: Vec<String>,
    /// Connection timeout
    pub timeout: Duration,
    /// Maximum number of concurrent connections
    pub max_connections: usize,
    /// Traffic obfuscation key
    pub obfuscation_key: [u8; 32],
    /// Enable MDNS for local peer discovery
    pub enable_mdns: bool,
    /// Enable relay for NAT traversal
    pub enable_relay: bool,
    /// Enable QUIC transport
    pub enable_quic: bool,
    /// Enable WebSocket transport
    pub enable_websocket: bool,
    /// Gossipsub configuration
    pub gossipsub_config: Option<GossipsubConfig>,
    /// Kademlia replication factor
    pub kad_replication_factor: usize,
}

impl Default for NetworkConfig {
    fn default() -> Self {
        let mut key = [0u8; 32];
        thread_rng().fill_bytes(&mut key);

        Self {
            listen_addrs: vec![
                "/ip4/0.0.0.0/tcp/0".to_string(),
                "/ip6/::/tcp/0".to_string(),
            ],
            bootstrap_peers: vec![],
            timeout: Duration::from_secs(20),
            max_connections: 50,
            obfuscation_key: key,
            enable_mdns: true,
            enable_relay: true,
            enable_quic: false,
            enable_websocket: true,
            gossipsub_config: None,
            kad_replication_factor: 20,
        }
    }
}
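
// Example (illustrative): overriding selected defaults with struct-update syntax.
// The addresses shown are placeholders; field names match the struct above.
//
//     let config = NetworkConfig {
//         listen_addrs: vec!["/ip4/0.0.0.0/tcp/4001".to_string()],
//         bootstrap_peers: vec![
//             // Bootstrap entries should include the peer ID, see `P2PNode::start`.
//             "/ip4/203.0.113.7/tcp/4001/p2p/<peer-id>".to_string(),
//         ],
//         enable_mdns: false,
//         ..NetworkConfig::default()
//     };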

/// Request-response protocol for custom messages
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuDagRequest {
    pub request_id: String,
    pub payload: Vec<u8>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuDagResponse {
    pub request_id: String,
    pub payload: Vec<u8>,
}
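
// Example (illustrative): the request/response payloads are opaque bytes and the
// `request_id` scheme is left to the caller; a random hex ID is one simple option.
//
//     let request = QuDagRequest {
//         request_id: format!("{:016x}", thread_rng().next_u64()),
//         payload: b"ping".to_vec(),
//     };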

/// Combined network behaviour for the P2P node
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "NetworkBehaviourEvent")]
pub struct NetworkBehaviourImpl {
    /// Kademlia DHT for peer discovery and content routing
    pub kademlia: kad::Behaviour<MemoryStore>,
    /// Gossipsub for pub/sub messaging
    pub gossipsub: gossipsub::Behaviour,
    /// MDNS for local peer discovery
    pub mdns: Toggle<mdns::tokio::Behaviour>,
    /// Ping for keep-alive and latency measurement
    pub ping: ping::Behaviour,
    /// Identify protocol for peer identification
    pub identify: identify::Behaviour,
    /// Relay for NAT traversal
    pub relay: relay::Behaviour,
    /// Direct connection upgrade through relay
    pub dcutr: dcutr::Behaviour,
    /// Request-response protocol for custom messages
    pub request_response: request_response::cbor::Behaviour<QuDagRequest, QuDagResponse>,
}

/// Commands that can be sent to the P2P node
#[derive(Debug)]
pub enum P2PCommand {
    /// Subscribe to a gossipsub topic
    Subscribe {
        topic: String,
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Unsubscribe from a gossipsub topic
    Unsubscribe {
        topic: String,
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Publish a message to a topic
    Publish {
        topic: String,
        data: Vec<u8>,
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Send a request to a peer
    SendRequest {
        peer_id: LibP2PPeerId,
        request: QuDagRequest,
        response: oneshot::Sender<Result<QuDagResponse, Box<dyn Error + Send + Sync>>>,
    },
    /// Dial a peer
    Dial {
        addr: Multiaddr,
        response: oneshot::Sender<Result<(), Box<dyn Error + Send + Sync>>>,
    },
    /// Get connected peers
    GetConnectedPeers {
        response: oneshot::Sender<Vec<LibP2PPeerId>>,
    },
    /// Get local peer ID
    GetLocalPeerId {
        response: oneshot::Sender<LibP2PPeerId>,
    },
    /// Get listeners
    GetListeners {
        response: oneshot::Sender<Vec<Multiaddr>>,
    },
}

/// Events emitted by the P2P network
#[derive(Debug)]
pub enum P2PEvent {
    /// New peer discovered
    PeerDiscovered(LibP2PPeerId),
    /// Peer connection established
    PeerConnected(LibP2PPeerId),
    /// Peer disconnected
    PeerDisconnected(LibP2PPeerId),
    /// Message received via gossipsub
    MessageReceived {
        peer_id: LibP2PPeerId,
        topic: String,
        data: Vec<u8>,
    },
    /// Request received
    RequestReceived {
        peer_id: LibP2PPeerId,
        request: QuDagRequest,
        channel: oneshot::Sender<QuDagResponse>,
    },
    /// Response received
    ResponseReceived {
        peer_id: LibP2PPeerId,
        response: QuDagResponse,
    },
    /// Routing table updated
    RoutingTableUpdated,
}
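
// Example (illustrative): consuming events from a `P2PHandle` in an application task.
//
//     while let Some(event) = handle.next_event().await {
//         match event {
//             P2PEvent::PeerConnected(peer) => println!("connected: {}", peer),
//             P2PEvent::MessageReceived { topic, data, .. } => {
//                 println!("{} bytes on {}", data.len(), topic);
//             }
//             _ => {}
//         }
//     }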

/// Main P2P network node implementation
pub struct P2PNode {
    /// Local peer ID
    local_peer_id: LibP2PPeerId,
    /// Swarm instance
    swarm: libp2p::Swarm<NetworkBehaviourImpl>,
    /// Router for message routing
    router: Router,
    /// Traffic obfuscation cipher
    cipher: ChaCha20Poly1305,
    /// Event channel sender
    event_tx: mpsc::UnboundedSender<P2PEvent>,
    /// Command channel receiver
    command_rx: mpsc::UnboundedReceiver<P2PCommand>,
    /// Connected peers
    connected_peers: HashSet<LibP2PPeerId>,
    /// Pending outbound requests, keyed by the libp2p outbound request id (as a
    /// string) and resolved from the request-response event handlers
    pending_requests:
        HashMap<String, oneshot::Sender<Result<QuDagResponse, Box<dyn Error + Send + Sync>>>>,
    /// Metrics recorder
    #[allow(dead_code)]
    metrics: Option<()>, // TODO: Use proper metrics type
    /// Network configuration
    config: NetworkConfig,
    // Message chunker for large messages (disabled for initial release)
    // message_chunker: MessageChunker,
}

/// Handle for sending commands to the P2P node
#[derive(Clone)]
pub struct P2PHandle {
    /// Command channel sender
    command_tx: mpsc::UnboundedSender<P2PCommand>,
    /// Event channel receiver (cloned for each handle)
    event_rx: Arc<Mutex<mpsc::UnboundedReceiver<P2PEvent>>>,
}

impl P2PHandle {
    /// Subscribe to a gossipsub topic
    pub async fn subscribe(&self, topic: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(P2PCommand::Subscribe {
                topic: topic.to_string(),
                response: tx,
            })
            .map_err(|_| "P2P node offline")?;
        rx.await.map_err(|_| "Command failed")?
    }

    /// Unsubscribe from a gossipsub topic
    pub async fn unsubscribe(&self, topic: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(P2PCommand::Unsubscribe {
                topic: topic.to_string(),
                response: tx,
            })
            .map_err(|_| "P2P node offline")?;
        rx.await.map_err(|_| "Command failed")?
    }

    /// Publish a message to a topic
    pub async fn publish(
        &self,
        topic: &str,
        data: Vec<u8>,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(P2PCommand::Publish {
                topic: topic.to_string(),
                data,
                response: tx,
            })
            .map_err(|_| "P2P node offline")?;
        rx.await.map_err(|_| "Command failed")?
    }

    /// Send a request to a peer
    pub async fn send_request(
        &self,
        peer_id: LibP2PPeerId,
        request: QuDagRequest,
    ) -> Result<QuDagResponse, Box<dyn Error + Send + Sync>> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(P2PCommand::SendRequest {
                peer_id,
                request,
                response: tx,
            })
            .map_err(|_| "P2P node offline")?;
        rx.await.map_err(|_| "Command failed")?
    }

    /// Dial a peer
    pub async fn dial(&self, addr: Multiaddr) -> Result<(), Box<dyn Error + Send + Sync>> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(P2PCommand::Dial { addr, response: tx })
            .map_err(|_| "P2P node offline")?;
        rx.await.map_err(|_| "Command failed")?
    }

    /// Get connected peers
    pub async fn connected_peers(&self) -> Vec<LibP2PPeerId> {
        let (tx, rx) = oneshot::channel();
        if self
            .command_tx
            .send(P2PCommand::GetConnectedPeers { response: tx })
            .is_ok()
        {
            rx.await.unwrap_or_default()
        } else {
            Vec::new()
        }
    }

    /// Get local peer ID
    pub async fn local_peer_id(&self) -> LibP2PPeerId {
        let (tx, rx) = oneshot::channel();
        if self
            .command_tx
            .send(P2PCommand::GetLocalPeerId { response: tx })
            .is_ok()
        {
            rx.await.unwrap_or_else(|_| LibP2PPeerId::random())
        } else {
            LibP2PPeerId::random()
        }
    }

    /// Get listeners
    pub async fn listeners(&self) -> Vec<Multiaddr> {
        let (tx, rx) = oneshot::channel();
        if self
            .command_tx
            .send(P2PCommand::GetListeners { response: tx })
            .is_ok()
        {
            rx.await.unwrap_or_default()
        } else {
            Vec::new()
        }
    }

    /// Get the next network event
    pub async fn next_event(&self) -> Option<P2PEvent> {
        let mut event_rx = self.event_rx.lock().await;
        event_rx.recv().await
    }
}
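
// Example (illustrative): typical wiring of a node and its handle. The topic name is
// a placeholder, and spawning the event loop assumes `P2PNode` is `Send`; the tests
// below show the alternative of polling `run()` in place with `tokio::select!`.
//
//     let (mut node, handle) = P2PNode::new(NetworkConfig::default()).await?;
//     node.start().await?;
//     tokio::spawn(async move { node.run().await });
//
//     handle.subscribe("qudag/blocks").await?;
//     handle.publish("qudag/blocks", b"hello".to_vec()).await?;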

impl P2PNode {
    /// Creates a new P2P network node with the given configuration
    /// Returns the node and a handle for sending commands
    pub async fn new(config: NetworkConfig) -> Result<(Self, P2PHandle), Box<dyn Error>> {
        // Generate node identity
        let local_key = identity::Keypair::generate_ed25519();
        let local_peer_id = LibP2PPeerId::from(local_key.public());

        info!("Local peer ID: {}", local_peer_id);

        // Build the transport
        let transport = build_transport(&local_key, &config)?;

        // Set up Kademlia DHT
        let store = MemoryStore::new(local_peer_id);
        let mut kad_config = kad::Config::default();
        kad_config.set_replication_factor(
            std::num::NonZeroUsize::new(config.kad_replication_factor)
                .expect("Replication factor must be > 0"),
        );
        let kademlia = kad::Behaviour::with_config(local_peer_id, store, kad_config);

        // Set up Gossipsub
        let gossipsub_config = config.gossipsub_config.clone().unwrap_or_else(|| {
            GossipsubConfigBuilder::default()
                .heartbeat_interval(Duration::from_secs(10))
                .validation_mode(ValidationMode::Strict)
                .build()
                .expect("Valid gossipsub config")
        });

        let gossipsub = gossipsub::Behaviour::new(
            MessageAuthenticity::Signed(local_key.clone()),
            gossipsub_config,
        )?;

        // Set up MDNS
        let mdns = if config.enable_mdns {
            Toggle::from(Some(mdns::tokio::Behaviour::new(
                mdns::Config::default(),
                local_peer_id,
            )?))
        } else {
            Toggle::from(None)
        };

        // Set up other protocols
        let ping = ping::Behaviour::new(ping::Config::new());
        let identify = identify::Behaviour::new(identify::Config::new(
            "/qudag/1.0.0".to_string(),
            local_key.public(),
        ));

        let relay = relay::Behaviour::new(local_peer_id, Default::default());
        let dcutr = dcutr::Behaviour::new(local_peer_id);

        // Set up request-response protocol
        let protocols = std::iter::once((
            StreamProtocol::new("/qudag/req/1.0.0"),
            ProtocolSupport::Full,
        ));
        let request_response =
            request_response::cbor::Behaviour::new(protocols, request_response::Config::default());

        // Create the network behaviour
        let behaviour = NetworkBehaviourImpl {
            kademlia,
            gossipsub,
            mdns,
            ping,
            identify,
            relay,
            dcutr,
            request_response,
        };

        // Build the swarm
        let swarm = libp2p::Swarm::new(
            transport,
            behaviour,
            local_peer_id,
            libp2p::swarm::Config::with_tokio_executor(),
        );

        // Set up channels and state
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        // NOTE: the receiving half of this channel is dropped here, so anything the
        // router sends on it is currently discarded.
        let (router_tx, _) = mpsc::channel(1024);
        let router = Router::new(router_tx);

        // Initialize traffic obfuscation
        let cipher = ChaCha20Poly1305::new(Key::from_slice(&config.obfuscation_key));

        // Initialize metrics if enabled
        let metrics = if std::env::var("QUDAG_METRICS").is_ok() {
            Some(()) // TODO: Initialize proper metrics
        } else {
            None
        };

        // Create the handle
        let handle = P2PHandle {
            command_tx,
            event_rx: Arc::new(Mutex::new(event_rx)),
        };

        // Initialize message chunker (disabled for initial release)
        // let chunker_config = ChunkerConfig {
        //     max_chunk_size: 65536, // 64KB chunks
        //     enable_compression: true,
        //     compression_threshold: 1024, // Compress messages larger than 1KB
        //     ..Default::default()
        // };
        // let message_chunker = MessageChunker::new(chunker_config);

        let node = Self {
            local_peer_id,
            swarm,
            router,
            cipher,
            event_tx,
            command_rx,
            connected_peers: HashSet::new(),
            pending_requests: HashMap::new(),
            metrics,
            config,
            // message_chunker,
        };

        Ok((node, handle))
    }

    /// Starts the network node and begins listening on configured addresses
    pub async fn start(&mut self) -> Result<(), Box<dyn Error>> {
        // Listen on all configured addresses
        for addr_str in &self.config.listen_addrs {
            let addr: Multiaddr = addr_str.parse()?;
            self.swarm.listen_on(addr)?;
        }

        // Add bootstrap peers to Kademlia
        for peer_addr_str in &self.config.bootstrap_peers {
            let peer_addr: Multiaddr = peer_addr_str.parse()?;
            if let Some(peer_id) = extract_peer_id(&peer_addr) {
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .add_address(&peer_id, peer_addr);
            }
        }

        // Bootstrap Kademlia
        if let Err(e) = self.swarm.behaviour_mut().kademlia.bootstrap() {
            warn!("Kademlia bootstrap failed: {}", e);
        }

        info!("P2P node started");
        Ok(())
    }
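
    // Note: a bootstrap entry is only added to Kademlia when its multiaddr carries a
    // trailing `/p2p/<peer-id>` component (see `extract_peer_id` below); addresses
    // without it are silently skipped. Illustrative form, with `<peer-id>` a placeholder:
    //
    //     /ip4/203.0.113.7/tcp/4001/p2p/<peer-id>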

    /// Main event loop for the P2P node
    pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
        loop {
            tokio::select! {
                swarm_event = self.swarm.next() => {
                    if let Some(event) = swarm_event {
                        self.handle_swarm_event(event).await?;
                    }
                }
                command = self.command_rx.recv() => {
                    if let Some(cmd) = command {
                        self.handle_command(cmd).await;
                    } else {
                        // Command channel closed, exit loop
                        break;
                    }
                }
            }
        }
        Ok(())
    }

    /// Handle swarm events
    async fn handle_swarm_event(
        &mut self,
        event: SwarmEvent<NetworkBehaviourEvent>,
    ) -> Result<(), Box<dyn Error>> {
        match event {
            SwarmEvent::NewListenAddr { address, .. } => {
                info!("Listening on {}", address);
            }
            SwarmEvent::ConnectionEstablished {
                peer_id,
                endpoint,
                num_established,
                ..
            } => {
                info!(
                    "Connection established with {} at {} ({} total connections)",
                    peer_id,
                    endpoint.get_remote_address(),
                    num_established
                );
                self.connected_peers.insert(peer_id);
                self.event_tx.send(P2PEvent::PeerConnected(peer_id))?;

                // Update router (best-effort: this parse only succeeds when the remote
                // multiaddr's string form happens to be a valid `SocketAddr`, which is
                // not the case for typical `/ip4/.../tcp/...` addresses)
                if let Ok(socket_addr) = endpoint.get_remote_address().to_string().parse() {
                    self.router
                        .add_discovered_peer(
                            peer_id,
                            crate::discovery::DiscoveredPeer::new(
                                peer_id,
                                socket_addr,
                                crate::discovery::DiscoveryMethod::Kademlia,
                            ),
                        )
                        .await;
                }
            }
            SwarmEvent::ConnectionClosed {
                peer_id,
                num_established,
                ..
            } => {
                info!(
                    "Connection closed with {} ({} remaining connections)",
                    peer_id, num_established
                );
                if num_established == 0 {
                    self.connected_peers.remove(&peer_id);
                    self.event_tx.send(P2PEvent::PeerDisconnected(peer_id))?;

                    // Update router
                    self.router.remove_discovered_peer(peer_id).await;
                }
            }
            SwarmEvent::Behaviour(behaviour_event) => {
                self.handle_behaviour_event(behaviour_event).await?;
            }
            _ => {}
        }
        Ok(())
    }

    /// Handle behaviour events
    async fn handle_behaviour_event(
        &mut self,
        event: NetworkBehaviourEvent,
    ) -> Result<(), Box<dyn Error>> {
        match event {
            NetworkBehaviourEvent::Kademlia(kad_event) => {
                self.handle_kademlia_event(kad_event).await?;
            }
            NetworkBehaviourEvent::Gossipsub(gossipsub_event) => {
                self.handle_gossipsub_event(gossipsub_event).await?;
            }
            NetworkBehaviourEvent::Mdns(mdns_event) => {
                self.handle_mdns_event(mdns_event).await?;
            }
            NetworkBehaviourEvent::Ping(ping_event) => {
                self.handle_ping_event(ping_event).await?;
            }
            NetworkBehaviourEvent::Identify(identify_event) => {
                self.handle_identify_event(identify_event).await?;
            }
            NetworkBehaviourEvent::RequestResponse(req_res_event) => {
                self.handle_request_response_event(req_res_event).await?;
            }
            NetworkBehaviourEvent::Relay(relay_event) => {
                self.handle_relay_event(relay_event).await?;
            }
            NetworkBehaviourEvent::Dcutr(dcutr_event) => {
                self.handle_dcutr_event(dcutr_event).await?;
            }
        }
        Ok(())
    }

    /// Handle Kademlia events
    async fn handle_kademlia_event(&mut self, event: kad::Event) -> Result<(), Box<dyn Error>> {
        match event {
            kad::Event::RoutingUpdated {
                peer, addresses, ..
            } => {
                debug!("Kademlia routing updated for peer {}", peer);
                for addr in addresses.iter() {
                    self.swarm
                        .behaviour_mut()
                        .kademlia
                        .add_address(&peer, addr.clone());
                }
                self.event_tx.send(P2PEvent::RoutingTableUpdated)?;
            }
            kad::Event::UnroutablePeer { peer } => {
                warn!("Peer {} is unroutable", peer);
            }
            kad::Event::InboundRequest { request } => {
                debug!("Kademlia inbound request: {:?}", request);
            }
            kad::Event::OutboundQueryProgressed { result, .. } => match result {
                QueryResult::GetClosestPeers(result) => match result {
                    Ok(ok) => {
                        for peer in ok.peers {
                            debug!("Found closest peer: {}", peer);
                            self.event_tx.send(P2PEvent::PeerDiscovered(peer))?;
                        }
                    }
                    Err(e) => warn!("Get closest peers error: {:?}", e),
                },
                _ => {}
            },
            _ => {}
        }
        Ok(())
    }

    /// Handle Gossipsub events
    async fn handle_gossipsub_event(
        &mut self,
        event: gossipsub::Event,
    ) -> Result<(), Box<dyn Error>> {
        match event {
            gossipsub::Event::Message {
                propagation_source,
                message,
                ..
            } => {
                let topic = message.topic.to_string();
                let data = message.data;

                // Deobfuscate if needed
                let decrypted_data = match self.deobfuscate_traffic(&data) {
                    Ok(d) => d,
                    Err(_) => data, // Assume not obfuscated
                };

                self.event_tx.send(P2PEvent::MessageReceived {
                    peer_id: propagation_source,
                    topic,
                    data: decrypted_data,
                })?;
            }
            gossipsub::Event::Subscribed { peer_id, topic } => {
                debug!("Peer {} subscribed to topic {}", peer_id, topic);
            }
            gossipsub::Event::Unsubscribed { peer_id, topic } => {
                debug!("Peer {} unsubscribed from topic {}", peer_id, topic);
            }
            _ => {}
        }
        Ok(())
    }

    /// Handle MDNS events
    async fn handle_mdns_event(&mut self, event: mdns::Event) -> Result<(), Box<dyn Error>> {
        match event {
            mdns::Event::Discovered(peers) => {
                for (peer_id, addr) in peers {
                    debug!("MDNS discovered peer {} at {}", peer_id, addr);
                    self.swarm
                        .behaviour_mut()
                        .kademlia
                        .add_address(&peer_id, addr);
                    self.event_tx.send(P2PEvent::PeerDiscovered(peer_id))?;
                }
            }
            mdns::Event::Expired(peers) => {
                for (peer_id, _) in peers {
                    debug!("MDNS peer expired: {}", peer_id);
                }
            }
        }
        Ok(())
    }

    /// Handle ping events
    async fn handle_ping_event(&mut self, event: ping::Event) -> Result<(), Box<dyn Error>> {
        match event.result {
            Ok(duration) => {
                debug!("Ping to {} successful: {:?}", event.peer, duration);
            }
            Err(e) => {
                debug!("Ping to {} failed: {}", event.peer, e);
            }
        }
        Ok(())
    }

    /// Handle identify events
    async fn handle_identify_event(
        &mut self,
        event: identify::Event,
    ) -> Result<(), Box<dyn Error>> {
        match event {
            identify::Event::Received { peer_id, info } => {
                debug!(
                    "Identified peer {}: protocols={:?}, agent={}",
                    peer_id, info.protocols, info.agent_version
                );

                // Add observed addresses to Kademlia
                for addr in info.listen_addrs {
                    self.swarm
                        .behaviour_mut()
                        .kademlia
                        .add_address(&peer_id, addr);
                }
            }
            identify::Event::Sent { .. } => {}
            identify::Event::Pushed { .. } => {}
            identify::Event::Error { peer_id, error } => {
                warn!("Identify error with {}: {}", peer_id, error);
            }
        }
        Ok(())
    }

    /// Handle relay events
    async fn handle_relay_event(&mut self, event: relay::Event) -> Result<(), Box<dyn Error>> {
        match event {
            relay::Event::ReservationReqAccepted {
                src_peer_id,
                renewed,
                ..
            } => {
                info!(
                    "Relay reservation accepted from peer {}: renewed={}",
                    src_peer_id, renewed
                );
            }
            relay::Event::ReservationReqDenied { src_peer_id, .. } => {
                warn!("Relay reservation denied by peer {}", src_peer_id);
            }
            relay::Event::ReservationTimedOut { src_peer_id, .. } => {
                warn!("Relay reservation timed out for peer {}", src_peer_id);
            }
            #[allow(deprecated)]
            relay::Event::CircuitReqAcceptFailed {
                src_peer_id,
                dst_peer_id,
                error,
            } => {
                warn!(
                    "Circuit request accept failed from {} to {}: {:?}",
                    src_peer_id, dst_peer_id, error
                );
            }
            relay::Event::CircuitReqDenied {
                src_peer_id,
                dst_peer_id,
                ..
            } => {
                warn!(
                    "Circuit request denied from {} to {}",
                    src_peer_id, dst_peer_id
                );
            }
            relay::Event::CircuitClosed {
                src_peer_id,
                dst_peer_id,
                error,
            } => {
                if let Some(error) = error {
                    warn!(
                        "Circuit closed between {} and {}: {:?}",
                        src_peer_id, dst_peer_id, error
                    );
                } else {
                    debug!("Circuit closed between {} and {}", src_peer_id, dst_peer_id);
                }
            }
            // Handle other relay events
            _ => {
                debug!("Unhandled relay event: {:?}", event);
            }
        }
        Ok(())
    }

    /// Handle DCUTR events
    async fn handle_dcutr_event(&mut self, event: dcutr::Event) -> Result<(), Box<dyn Error>> {
        match event {
            dcutr::Event {
                remote_peer_id,
                result,
            } => match result {
                Ok(connection_id) => {
                    info!(
                        "Direct connection upgrade succeeded with peer {} (connection: {:?})",
                        remote_peer_id, connection_id
                    );
                }
                Err(error) => {
                    warn!(
                        "Direct connection upgrade failed with {}: {:?}",
                        remote_peer_id, error
                    );
                }
            },
        }
        Ok(())
    }

    /// Handle request-response events
    async fn handle_request_response_event(
        &mut self,
        event: request_response::Event<QuDagRequest, QuDagResponse>,
    ) -> Result<(), Box<dyn Error>> {
        match event {
            request_response::Event::Message { peer, message } => match message {
                request_response::Message::Request {
                    request, channel, ..
                } => {
                    // Handle the request and prepare response
                    let response = QuDagResponse {
                        request_id: request.request_id.clone(),
                        payload: vec![], // TODO: Process request and generate actual response
                    };

                    // Send the response back directly
                    self.swarm
                        .behaviour_mut()
                        .request_response
                        .send_response(channel, response)
                        .map_err(|_| "Failed to send response")?;

                    // Also emit an event for the application layer. NOTE: the receiver
                    // side of this channel is dropped immediately, so any reply the
                    // application sends here is currently discarded; the placeholder
                    // response above is what actually goes on the wire.
                    let (tx, _rx) = oneshot::channel();
                    self.event_tx.send(P2PEvent::RequestReceived {
                        peer_id: peer,
                        request,
                        channel: tx,
                    })?;
                }
                request_response::Message::Response {
                    request_id,
                    response,
                } => {
                    // Resolve the caller waiting on this outbound request id
                    if let Some(tx) = self.pending_requests.remove(&request_id.to_string()) {
                        let _ = tx.send(Ok(response));
                    }
                }
            },
            request_response::Event::OutboundFailure {
                peer,
                request_id,
                error,
            } => {
                warn!(
                    "Request to {} failed (id: {}): {:?}",
                    peer, request_id, error
                );
                // Report the failure to the caller waiting on this request, if any
                if let Some(tx) = self.pending_requests.remove(&request_id.to_string()) {
                    let _ = tx.send(Err(format!("Outbound request failed: {:?}", error).into()));
                }
            }
            request_response::Event::InboundFailure {
                peer,
                request_id,
                error,
            } => {
                warn!(
                    "Inbound request from {} failed (id: {}): {:?}",
                    peer, request_id, error
                );
            }
            _ => {}
        }
        Ok(())
    }

    /// Handle commands received from P2PHandle
    async fn handle_command(&mut self, command: P2PCommand) {
        match command {
            P2PCommand::Subscribe { topic, response } => {
                let result = self.subscribe_internal(&topic).await;
                let _ = response.send(result);
            }
            P2PCommand::Unsubscribe { topic, response } => {
                let result = self.unsubscribe_internal(&topic).await;
                let _ = response.send(result);
            }
            P2PCommand::Publish {
                topic,
                data,
                response,
            } => {
                let result = self.publish_internal(&topic, data).await;
                let _ = response.send(result);
            }
            P2PCommand::SendRequest {
                peer_id,
                request,
                response,
            } => {
                // The response channel is handed to send_request_internal and resolved
                // later from the request-response event handlers, so the event loop is
                // not blocked while waiting for the remote peer.
                self.send_request_internal(peer_id, request, response);
            }
            P2PCommand::Dial { addr, response } => {
                let result = self.dial_internal(addr).await;
                let _ = response.send(result);
            }
            P2PCommand::GetConnectedPeers { response } => {
                let peers = self.connected_peers.iter().copied().collect();
                let _ = response.send(peers);
            }
            P2PCommand::GetLocalPeerId { response } => {
                let _ = response.send(self.local_peer_id);
            }
            P2PCommand::GetListeners { response } => {
                let listeners = self.swarm.listeners().cloned().collect();
                let _ = response.send(listeners);
            }
        }
    }

    /// Internal subscribe method
    async fn subscribe_internal(
        &mut self,
        topic: &str,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let topic = IdentTopic::new(topic);
        self.swarm
            .behaviour_mut()
            .gossipsub
            .subscribe(&topic)
            .map_err(|e| format!("Subscribe error: {}", e))?;
        info!("Subscribed to topic: {}", topic);
        Ok(())
    }

    /// Internal unsubscribe method
    async fn unsubscribe_internal(
        &mut self,
        topic: &str,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let topic = IdentTopic::new(topic);
        self.swarm
            .behaviour_mut()
            .gossipsub
            .unsubscribe(&topic)
            .map_err(|e| format!("Unsubscribe error: {}", e))?;
        info!("Unsubscribed from topic: {}", topic);
        Ok(())
    }

    /// Internal publish method
    async fn publish_internal(
        &mut self,
        topic: &str,
        data: Vec<u8>,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let topic = IdentTopic::new(topic);

        // Obfuscate traffic if configured
        let message_data = self
            .obfuscate_traffic(&data)
            .map_err(|e| format!("Obfuscation error: {}", e))?;

        self.swarm
            .behaviour_mut()
            .gossipsub
            .publish(topic.clone(), message_data)
            .map_err(|e| format!("Publish error: {}", e))?;

        debug!("Published message to topic: {}", topic);
        Ok(())
    }

    /// Internal send request method. The request is dispatched to the swarm and the
    /// caller's response channel is parked in `pending_requests`, keyed by the libp2p
    /// outbound request id; it is resolved later by `handle_request_response_event`
    /// (either with the peer's response or with an outbound failure such as the
    /// protocol-level timeout), so the event loop is never blocked waiting for a reply.
    fn send_request_internal(
        &mut self,
        peer_id: LibP2PPeerId,
        request: QuDagRequest,
        response: oneshot::Sender<Result<QuDagResponse, Box<dyn Error + Send + Sync>>>,
    ) {
        // Wrap the payload in a NetworkMessage envelope
        let network_message = NetworkMessage {
            id: request.request_id.clone(),
            source: vec![0],      // Placeholder source
            destination: vec![0], // Placeholder destination
            payload: request.payload.clone(),
            priority: MessagePriority::Normal,
            ttl: Duration::from_secs(60),
        };

        // Chunking disabled for initial release - send message directly
        // let chunks = self.message_chunker.chunk_message(&network_message).await
        //     .map_err(|e| format!("Chunking error: {:?}", e))?;

        let payload = match bincode::serialize(&network_message) {
            Ok(bytes) => bytes,
            Err(e) => {
                let _ = response.send(Err(format!("Serialization error: {}", e).into()));
                return;
            }
        };

        let request = QuDagRequest {
            request_id: request.request_id,
            payload,
        };

        // Dispatch the request and remember the caller under the outbound request id
        // so the Response/OutboundFailure handlers can find it
        let outbound_id = self
            .swarm
            .behaviour_mut()
            .request_response
            .send_request(&peer_id, request);
        self.pending_requests
            .insert(outbound_id.to_string(), response);
    }

    /// Internal dial method
    async fn dial_internal(
        &mut self,
        peer_addr: Multiaddr,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        self.swarm
            .dial(peer_addr)
            .map_err(|e| format!("Dial error: {}", e))?;
        Ok(())
    }

    /// Obfuscates traffic using ChaCha20-Poly1305
    fn obfuscate_traffic(&self, data: &[u8]) -> Result<Vec<u8>, Box<dyn Error>> {
        let mut nonce = [0u8; 12];
        thread_rng().fill_bytes(&mut nonce);
        let nonce = Nonce::from_slice(&nonce);

        let mut encrypted = self
            .cipher
            .encrypt(nonce, data)
            .map_err(|e| format!("Encryption error: {}", e))?;

        // Prepend nonce to encrypted data
        let mut result = nonce.to_vec();
        result.append(&mut encrypted);
        Ok(result)
    }

    /// Deobfuscates traffic using ChaCha20-Poly1305
    fn deobfuscate_traffic(&self, data: &[u8]) -> Result<Vec<u8>, Box<dyn Error>> {
        if data.len() < 12 {
            return Err("Data too short".into());
        }

        let nonce = Nonce::from_slice(&data[..12]);
        let encrypted = &data[12..];

        self.cipher
            .decrypt(nonce, encrypted)
            .map_err(|e| format!("Decryption error: {}", e).into())
    }
}
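
// Obfuscated frame layout produced by `obfuscate_traffic` and expected by
// `deobfuscate_traffic` (ChaCha20-Poly1305 appends a 16-byte authentication tag):
//
//     [ 12-byte random nonce | ciphertext (plaintext length + 16-byte tag) ]
//
// e.g. a 5-byte payload becomes 12 + 5 + 16 = 33 bytes on the wire.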

/// Build the transport layer with multiple protocol support
fn build_transport(
    local_key: &Keypair,
    config: &NetworkConfig,
) -> Result<Boxed<(LibP2PPeerId, StreamMuxerBox)>, Box<dyn Error>> {
    let noise = noise::Config::new(local_key)?;

    let yamux_config = yamux::Config::default();

    // Build base TCP transport
    let tcp = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true));

    // Memory transport for testing
    let memory = MemoryTransport::default();

    // Combine transports
    let base_transport = tcp.or_transport(memory);

    // Add WebSocket support if enabled
    let transport: Boxed<(LibP2PPeerId, StreamMuxerBox)> = if config.enable_websocket {
        let ws = websocket::WsConfig::new(tcp::tokio::Transport::new(
            tcp::Config::default().nodelay(true),
        ));
        base_transport
            .or_transport(ws)
            .upgrade(upgrade::Version::V1)
            .authenticate(noise)
            .multiplex(yamux_config)
            .timeout(Duration::from_secs(20))
            .boxed()
    } else {
        base_transport
            .upgrade(upgrade::Version::V1)
            .authenticate(noise)
            .multiplex(yamux_config)
            .timeout(Duration::from_secs(20))
            .boxed()
    };

    Ok(transport)
}
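
// Illustrative listen/dial address forms accepted by the transports assembled above
// (every connection is then Noise-authenticated and Yamux-multiplexed):
//
//     /ip4/127.0.0.1/tcp/4001        TCP
//     /ip4/127.0.0.1/tcp/4001/ws     WebSocket over TCP (when enabled)
//     /memory/1                      in-process memory transport (tests)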

/// Extract peer ID from multiaddr if present
fn extract_peer_id(addr: &Multiaddr) -> Option<LibP2PPeerId> {
    addr.iter().find_map(|p| match p {
        Protocol::P2p(peer_id) => Some(peer_id),
        _ => None,
    })
}

/// Type alias for stream muxer
type StreamMuxerBox = libp2p::core::muxing::StreamMuxerBox;

/// Type aliases for missing libp2p types in 0.53
#[allow(dead_code)]
type TransactionId = [u8; 12];
#[allow(dead_code)]
type Message = Vec<u8>;

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_node_creation() {
        let config = NetworkConfig::default();
        let (mut node, handle) = P2PNode::new(config).await.unwrap();
        // Drive the node's event loop concurrently; otherwise the handle's command
        // is never processed and this call would hang forever.
        let peer_id = tokio::select! {
            _ = node.run() => panic!("node event loop exited unexpectedly"),
            peer_id = handle.local_peer_id() => peer_id,
        };
        assert!(!peer_id.to_string().is_empty());
    }

    #[tokio::test]
    async fn test_traffic_obfuscation() {
        let config = NetworkConfig::default();
        let (node, _handle) = P2PNode::new(config).await.unwrap();

        let test_data = b"test message";
        let obfuscated = node.obfuscate_traffic(test_data).unwrap();
        let deobfuscated = node.deobfuscate_traffic(&obfuscated).unwrap();

        assert_eq!(test_data.to_vec(), deobfuscated);
    }

    #[tokio::test]
    async fn test_node_start() {
        let mut config = NetworkConfig::default();
        config.listen_addrs = vec!["/ip4/127.0.0.1/tcp/0".to_string()];
        config.enable_mdns = false; // Disable MDNS for tests

        let (mut node, handle) = P2PNode::new(config).await.unwrap();
        node.start().await.unwrap();

        // Give it a moment to bind while driving the event loop
        tokio::select! {
            _ = node.run() => panic!("node event loop exited unexpectedly"),
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
        }

        // Query the listeners, still driving the event loop so the command is handled
        let listeners = tokio::select! {
            _ = node.run() => panic!("node event loop exited unexpectedly"),
            listeners = handle.listeners() => listeners,
        };
        assert!(!listeners.is_empty());
    }

    #[tokio::test]
    async fn test_pubsub() {
        let config = NetworkConfig::default();
        let (mut node, handle) = P2PNode::new(config).await.unwrap();

        let topic = "test-topic";
        tokio::select! {
            _ = node.run() => panic!("node event loop exited unexpectedly"),
            result = handle.subscribe(topic) => result.unwrap(),
        }

        let test_data = vec![1, 2, 3, 4, 5];
        // With no connected peers, gossipsub may reject the publish with
        // `InsufficientPeers`; only check that the command round-trip completes.
        let _ = tokio::select! {
            _ = node.run() => panic!("node event loop exited unexpectedly"),
            result = handle.publish(topic, test_data) => result,
        };
    }
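
    // Additional test (sketch): covers the `extract_peer_id` helper used by
    // `P2PNode::start` when registering bootstrap addresses with Kademlia.
    #[test]
    fn test_extract_peer_id() {
        let peer_id = LibP2PPeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
        // Without a /p2p component there is nothing to extract
        assert_eq!(extract_peer_id(&addr), None);
        // With a trailing /p2p/<peer-id> component the peer ID is recovered
        let addr_with_peer = addr.with(Protocol::P2p(peer_id));
        assert_eq!(extract_peer_id(&addr_with_peer), Some(peer_id));
    }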
}