Skip to main content

firecloud_net/
node.rs

1//! FireCloud P2P Node
2
3use crate::{
4    FireCloudBehaviour, FireCloudEvent, HealthMonitor, NetError, NetResult, 
5    TransferRequest, TransferResponse,
6    MessageRequest, MessageResponse,
7    relay_manager::{RelayManager, RelayManagerConfig},
8};
9use futures::StreamExt;
10use libp2p::{
11    dcutr,
12    gossipsub::{self, IdentTopic},
13    identify,
14    identity::Keypair,
15    kad,
16    mdns,
17    multiaddr::Protocol,
18    ping,
19    request_response::{self, OutboundRequestId, ResponseChannel},
20    swarm::{SwarmEvent, dial_opts::DialOpts},
21    Multiaddr, PeerId, Swarm,
22};
23use std::{collections::{HashSet, HashMap}, time::Duration};
24use tokio::sync::mpsc;
25use tracing::{debug, info, warn};
26
/// Node configuration
///
/// Controls the listen port, local discovery, and the bootstrap targets used
/// for Kademlia DHT population and circuit-relay NAT traversal.
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// Port to listen on (0 for random)
    pub port: u16,
    /// Enable mDNS local discovery
    // NOTE(review): this flag is not consulted anywhere in the visible code
    // (`FireCloudNode::new` ignores it) — confirm whether mDNS should be
    // conditionally disabled based on this field.
    pub enable_mdns: bool,
    /// Bootstrap peers for Kademlia
    pub bootstrap_peers: Vec<Multiaddr>,
    /// Bootstrap relay addresses for NAT traversal
    pub bootstrap_relays: Vec<Multiaddr>,
}
39
40impl Default for NodeConfig {
41    fn default() -> Self {
42        Self {
43            port: 0,
44            enable_mdns: true,
45            bootstrap_peers: Vec::new(),
46            bootstrap_relays: Vec::new(),
47        }
48    }
49}
50
/// Events emitted by the node
///
/// These are produced by `handle_swarm_event` and delivered to callers via
/// `poll_event` (through the node's internal mpsc channel).
#[derive(Debug)]
pub enum NodeEvent {
    /// A new peer was discovered
    PeerDiscovered(PeerId),
    /// A peer disconnected
    PeerDisconnected(PeerId),
    /// Received a message on a topic
    Message {
        source: PeerId,
        topic: String,
        data: Vec<u8>,
    },
    /// Node is listening on an address
    Listening(Multiaddr),
    /// Received a transfer request from a peer (needs response via respond_transfer)
    TransferRequest {
        peer: PeerId,
        request: TransferRequest,
        // One-shot channel: must be passed back to `respond_transfer` exactly once.
        channel: ResponseChannel<TransferResponse>,
    },
    /// Received a response to our transfer request
    TransferResponse {
        peer: PeerId,
        // Matches the id returned by `send_transfer_request`.
        request_id: OutboundRequestId,
        response: TransferResponse,
    },
    /// Transfer request failed
    TransferFailed {
        peer: PeerId,
        request_id: OutboundRequestId,
        error: String,
    },
    /// DHT: Found providers for a key
    ProvidersFound {
        key: String,
        providers: Vec<PeerId>,
    },
    /// DHT: Successfully started providing
    ProvideStarted {
        key: String,
    },
    /// DHT: Record found in DHT
    RecordFound {
        key: String,
        value: Vec<u8>,
    },
    /// DHT: Record stored successfully
    RecordStored {
        key: String,
    },
    /// DHT: Query failed
    DhtQueryFailed {
        key: String,
        error: String,
    },
    
    // ========================================================================
    // Messaging Events (Phase 1.5)
    // ========================================================================
    
    /// Received a message request from a peer (needs response via respond_message)
    MessageRequest {
        peer: PeerId,
        request: MessageRequest,
        // One-shot channel: must be passed back to `respond_message` exactly once.
        channel: ResponseChannel<MessageResponse>,
    },
    /// Received a response to our message request
    MessageResponse {
        peer: PeerId,
        request_id: OutboundRequestId,
        response: MessageResponse,
    },
    /// Message request failed
    MessageFailed {
        peer: PeerId,
        request_id: OutboundRequestId,
        error: String,
    },
}
131
/// A FireCloud P2P node
///
/// Owns the libp2p swarm plus bookkeeping state (peer sets, latencies, health,
/// relay manager). Events are pushed into `event_tx` by the swarm-event
/// handler and read back out of `event_rx` by `poll_event`.
pub struct FireCloudNode {
    swarm: Swarm<FireCloudBehaviour>,
    local_peer_id: PeerId,
    known_peers: HashSet<PeerId>,
    /// Peers discovered via mDNS (local network)
    local_peers: HashSet<PeerId>,
    /// Latest measured round-trip times per peer (from ping events)
    peer_latencies: HashMap<PeerId, Duration>,
    /// Health monitoring for all peers
    health_monitor: HealthMonitor,
    /// Relay connection manager for NAT traversal
    relay_manager: RelayManager,
    // Both ends of the internal event channel live on the same struct:
    // `event_tx` is cloneless-sent-to by handle_swarm_event, `event_rx` is
    // drained by poll_event. The channel is bounded (capacity 100 in `new`).
    event_tx: mpsc::Sender<NodeEvent>,
    event_rx: mpsc::Receiver<NodeEvent>,
}
148
149impl FireCloudNode {
    /// Create a new FireCloud node
    ///
    /// Generates a fresh ed25519 identity, builds a QUIC + relay-client swarm,
    /// starts listening on `0.0.0.0:<config.port>` (QUIC over UDP), seeds
    /// Kademlia with `config.bootstrap_peers`, and begins dialing
    /// `config.bootstrap_relays` for NAT traversal.
    ///
    /// # Errors
    /// Returns `NetError` if swarm construction, address parsing, or
    /// `listen_on` fails.
    pub async fn new(config: NodeConfig) -> NetResult<Self> {
        // Generate identity
        let local_key = Keypair::generate_ed25519();
        let local_peer_id = PeerId::from(local_key.public());

        info!("Local peer ID: {}", local_peer_id);

        // Build swarm with QUIC transport + relay client
        // The .with_relay_client() method handles transport integration automatically
        let swarm = libp2p::SwarmBuilder::with_existing_identity(local_key.clone())
            .with_tokio()
            .with_quic()
            .with_relay_client(
                libp2p::noise::Config::new,
                libp2p::yamux::Config::default,
            )
            .map_err(|e| NetError::Other(e.to_string()))?
            .with_behaviour(|keypair, relay_client| {
                // NOTE(review): this `expect` panics inside the builder closure
                // instead of propagating a NetError — confirm that behaviour
                // construction failure is truly unrecoverable here.
                FireCloudBehaviour::new_with_relay(local_peer_id, keypair, relay_client)
                    .expect("Failed to create FireCloudBehaviour")
            })
            .map_err(|e| NetError::Other(e.to_string()))?
            // Drop idle connections after 60s of inactivity.
            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
            .build();

        // Bounded channel; see `run`/`poll_event` for draining.
        let (event_tx, event_rx) = mpsc::channel(100);

        // Initialize relay manager with bootstrap relays
        let relay_config = RelayManagerConfig {
            bootstrap_relays: config.bootstrap_relays.clone(),
            min_active_relays: 2,
            max_relay_connections: 5,
            reconnect_interval: Duration::from_secs(30),
            connection_timeout: Duration::from_secs(10),
        };
        let relay_manager = RelayManager::new(relay_config);

        let mut node = Self {
            swarm,
            local_peer_id,
            known_peers: HashSet::new(),
            local_peers: HashSet::new(),
            peer_latencies: HashMap::new(),
            health_monitor: HealthMonitor::new(),
            relay_manager,
            event_tx,
            event_rx,
        };

        // Start listening
        // NOTE(review): `config.enable_mdns` is never consulted in this
        // constructor — mDNS appears unconditionally enabled by the behaviour.
        let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/udp/{}/quic-v1", config.port)
            .parse()
            .map_err(|e| NetError::Other(format!("Invalid address: {}", e)))?;

        node.swarm
            .listen_on(listen_addr)
            .map_err(|e| NetError::Transport(e.to_string()))?;

        // Bootstrap Kademlia if peers provided
        // Only addresses carrying a /p2p/<peer-id> component can be added.
        for addr in config.bootstrap_peers {
            if let Some(peer_id) = extract_peer_id(&addr) {
                node.swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
            }
        }

        // Connect to bootstrap relays for NAT traversal
        node.connect_to_bootstrap_relays();

        Ok(node)
    }
221
222    /// Connect to bootstrap relay nodes
223    fn connect_to_bootstrap_relays(&mut self) {
224        let relays_to_connect = self.relay_manager.relays_to_connect();
225        
226        for relay_addr in relays_to_connect {
227            info!("Connecting to bootstrap relay: {}", relay_addr);
228            
229            // Mark as connecting in relay manager
230            self.relay_manager.mark_connecting(&relay_addr);
231            
232            // Dial the relay
233            match self.swarm.dial(relay_addr.clone()) {
234                Ok(_) => {
235                    debug!("Dialing relay at {}", relay_addr);
236                }
237                Err(e) => {
238                    warn!("Failed to dial relay {}: {}", relay_addr, e);
239                    self.relay_manager.mark_failed(&relay_addr, format!("Dial failed: {}", e));
240                }
241            }
242        }
243        
244        // Log relay stats
245        let stats = self.relay_manager.stats();
246        info!(
247            "Relay connections: {} total, {} connected, {} listening, {} failed",
248            stats.total_relays, stats.connected_relays, stats.listening_relays, stats.failed_relays
249        );
250    }
251
252    /// Get the latest measured RTT for a peer, if available
253    pub fn peer_latency(&self, peer: &PeerId) -> Option<Duration> {
254        self.peer_latencies.get(peer).cloned()
255    }
256
257    /// Select the peer with the lowest recorded latency from the provided list
258    /// Returns None if no latencies are known for the provided peers
259    pub fn select_lowest_latency_peer(&self, candidates: &[PeerId]) -> Option<PeerId> {
260        candidates
261            .iter()
262            .filter_map(|p| self.peer_latencies.get(p).map(|d| (p, d)))
263            .min_by_key(|(_, d)| *d)
264            .map(|(p, _)| *p)
265    }
266
    /// Check if a peer was discovered via mDNS (i.e., is on the local network)
    pub fn is_local_peer(&self, peer: &PeerId) -> bool {
        self.local_peers.contains(peer)
    }

    /// Get list of local (mDNS-discovered) peers
    ///
    /// Membership is maintained by the mDNS Discovered/Expired handlers.
    pub fn local_peers(&self) -> &HashSet<PeerId> {
        &self.local_peers
    }
276
277    /// Choose the best peer from candidates for chunk retrieval.
278    /// Priority:
279    /// 1. Local peers (mDNS-discovered) with lowest latency
280    /// 2. Any peer with lowest latency
281    /// 3. First candidate if no latency info available
282    pub fn choose_best_peer(&self, candidates: &[PeerId]) -> Option<PeerId> {
283        if candidates.is_empty() {
284            return None;
285        }
286
287        // Filter to local peers first
288        let local_candidates: Vec<PeerId> = candidates
289            .iter()
290            .filter(|p| self.local_peers.contains(p))
291            .cloned()
292            .collect();
293
294        // Try to pick lowest-latency local peer
295        if !local_candidates.is_empty() {
296            if let Some(best) = self.select_lowest_latency_peer(&local_candidates) {
297                return Some(best);
298            }
299            // If no latency info, just return first local peer
300            return Some(local_candidates[0]);
301        }
302
303        // No local peers; pick lowest-latency from all candidates
304        if let Some(best) = self.select_lowest_latency_peer(candidates) {
305            return Some(best);
306        }
307
308        // Fallback: return first candidate
309        Some(candidates[0])
310    }
311
    /// Get local peer ID
    pub fn local_peer_id(&self) -> PeerId {
        self.local_peer_id
    }

    /// Get known peers
    ///
    /// Union of peers seen via mDNS discovery and Kademlia routing updates.
    pub fn known_peers(&self) -> &HashSet<PeerId> {
        &self.known_peers
    }

    /// Get the number of connected peers
    pub fn connected_peers_count(&self) -> usize {
        self.swarm.connected_peers().count()
    }

    /// Get all connected peer IDs
    pub fn connected_peers(&self) -> Vec<PeerId> {
        self.swarm.connected_peers().cloned().collect()
    }

    /// Get Kademlia routing table size (approximate)
    pub fn kademlia_peers_count(&self) -> usize {
        // Return known peers count as approximation
        // The actual Kademlia kbuckets iteration requires mutable access
        self.known_peers.len()
    }
338
    /// Add a bootstrap peer to Kademlia
    pub fn add_bootstrap_peer(&mut self, peer_id: &PeerId, addr: Multiaddr) {
        self.swarm.behaviour_mut().kademlia.add_address(peer_id, addr);
        info!("Added bootstrap peer: {}", peer_id);
    }

    /// Trigger Kademlia bootstrap process
    ///
    /// # Errors
    /// Returns `NetError::Protocol` when Kademlia cannot start bootstrapping
    /// (e.g. no known peers).
    // NOTE(review): overlaps with `bootstrap_dht` below, which exposes the raw
    // kad error instead — consider consolidating on one entry point.
    pub fn bootstrap(&mut self) -> NetResult<kad::QueryId> {
        self.swarm
            .behaviour_mut()
            .kademlia
            .bootstrap()
            .map_err(|e| NetError::Protocol(format!("Bootstrap failed: {:?}", e)))
    }
353
354    /// Subscribe to a GossipSub topic
355    pub fn subscribe(&mut self, topic: &str) -> NetResult<()> {
356        let topic = IdentTopic::new(topic);
357        self.swarm
358            .behaviour_mut()
359            .gossipsub
360            .subscribe(&topic)
361            .map_err(|e| NetError::Protocol(format!("Subscribe failed: {}", e)))?;
362        Ok(())
363    }
364
365    /// Publish a message to a topic
366    pub fn publish(&mut self, topic: &str, data: Vec<u8>) -> NetResult<()> {
367        let topic = IdentTopic::new(topic);
368        self.swarm
369            .behaviour_mut()
370            .gossipsub
371            .publish(topic, data)
372            .map_err(|e| NetError::Protocol(format!("Publish failed: {}", e)))?;
373        Ok(())
374    }
375
376    /// Dial a peer by address
377    pub fn dial(&mut self, addr: Multiaddr) -> NetResult<()> {
378        self.swarm
379            .dial(DialOpts::unknown_peer_id().address(addr).build())
380            .map_err(|e| NetError::Dial(e.to_string()))?;
381        Ok(())
382    }
383
    /// Get reference to health monitor
    pub fn health_monitor(&self) -> &HealthMonitor {
        &self.health_monitor
    }

    /// Get best peer for transfer (uses health monitor)
    pub fn get_best_peer_for_transfer(&self) -> Option<PeerId> {
        self.health_monitor.get_best_peer()
    }

    /// Get healthy peers sorted by health score
    pub fn get_healthy_peers(&self) -> Vec<PeerId> {
        self.health_monitor.get_healthy_peers()
    }

    /// Cleanup offline peers from health monitor
    ///
    /// Intended to be called periodically by the embedding application.
    pub fn cleanup_offline_peers(&mut self) {
        self.health_monitor.cleanup_offline_peers();
    }
403
    /// Send a transfer request to a peer
    ///
    /// The response arrives later as `NodeEvent::TransferResponse` (or
    /// `TransferFailed`) carrying the returned request id.
    pub fn send_transfer_request(&mut self, peer: &PeerId, request: TransferRequest) -> OutboundRequestId {
        self.swarm.behaviour_mut().transfer.send_request(peer, request)
    }

    /// Send a response to a transfer request
    ///
    /// `channel` is the one-shot channel delivered with
    /// `NodeEvent::TransferRequest`; it can be used only once.
    pub fn respond_transfer(&mut self, channel: ResponseChannel<TransferResponse>, response: TransferResponse) -> NetResult<()> {
        // send_response's error carries the unsent response; it is dropped
        // here in favor of a generic protocol error.
        self.swarm.behaviour_mut().transfer.send_response(channel, response)
            .map_err(|_| NetError::Protocol("Failed to send response".to_string()))
    }

    /// Request a chunk from a peer
    pub fn request_chunk(&mut self, peer: &PeerId, hash: String) -> OutboundRequestId {
        self.send_transfer_request(peer, TransferRequest::GetChunk { hash })
    }
419
420    // ========================================================================
421    // Messaging API (Phase 1.5)
422    // ========================================================================
423
424    /// Send a message request to a peer
425    pub fn send_message_request(&mut self, peer: &PeerId, request: MessageRequest) -> OutboundRequestId {
426        self.swarm.behaviour_mut().messaging.send_request(peer, request)
427    }
428
429    /// Send a response to a message request
430    pub fn respond_message(&mut self, channel: ResponseChannel<MessageResponse>, response: MessageResponse) -> NetResult<()> {
431        self.swarm.behaviour_mut().messaging.send_response(channel, response)
432            .map_err(|_| NetError::Protocol("Failed to send message response".to_string()))
433    }
434
435    /// Send a friend request to a peer
436    pub fn send_friend_request(&mut self, peer: &PeerId, name: Option<String>) -> OutboundRequestId {
437        self.send_message_request(peer, MessageRequest::FriendRequest { name })
438    }
439
440    /// Accept a friend request from a peer
441    pub fn send_friend_accept(&mut self, peer: &PeerId) -> OutboundRequestId {
442        self.send_message_request(peer, MessageRequest::FriendAccept)
443    }
444
445    /// Send a direct message to a friend (content already padded to 1KB!)
446    pub fn send_direct_message(
447        &mut self,
448        peer: &PeerId,
449        content: Vec<u8>,
450        message_id: String,
451        timestamp: i64,
452    ) -> OutboundRequestId {
453        self.send_message_request(
454            peer,
455            MessageRequest::DirectMessage {
456                content,
457                message_id,
458                timestamp,
459            },
460        )
461    }
462
463    /// Ping a peer to check if they're online
464    pub fn ping_peer(&mut self, peer: &PeerId) -> OutboundRequestId {
465        self.send_message_request(peer, MessageRequest::Ping)
466    }
467
    /// Store a chunk on a peer
    // NOTE(review): these two are transfer-protocol helpers placed after the
    // messaging section — consider moving them next to `request_chunk`.
    pub fn store_chunk_on_peer(&mut self, peer: &PeerId, hash: String, data: Vec<u8>, original_size: u64) -> OutboundRequestId {
        self.send_transfer_request(peer, TransferRequest::StoreChunk { hash, data, original_size })
    }

    /// Check if a peer has a chunk
    pub fn check_chunk(&mut self, peer: &PeerId, hash: String) -> OutboundRequestId {
        self.send_transfer_request(peer, TransferRequest::HasChunk { hash })
    }
477
    // ==================== DHT (Kademlia) Methods ====================

    /// Announce this node as a provider for a file (by file_id)
    /// Other nodes can then find us via get_providers
    ///
    /// # Panics
    // NOTE(review): panics via `expect` if Kademlia rejects the provider
    // record (e.g. local store full). A future API revision should return
    // `NetResult<kad::QueryId>` instead, but that would break callers now.
    pub fn announce_file(&mut self, file_id: &str) -> kad::QueryId {
        let key = kad::RecordKey::new(&file_id.as_bytes());
        self.swarm.behaviour_mut().kademlia.start_providing(key)
            .expect("Failed to start providing")
    }

    /// Stop announcing as a provider for a file
    pub fn stop_announcing_file(&mut self, file_id: &str) {
        let key = kad::RecordKey::new(&file_id.as_bytes());
        self.swarm.behaviour_mut().kademlia.stop_providing(&key);
    }

    /// Find providers for a file (by file_id)
    /// Results will come via NodeEvent::ProvidersFound
    pub fn find_file_providers(&mut self, file_id: &str) -> kad::QueryId {
        let key = kad::RecordKey::new(&file_id.as_bytes());
        self.swarm.behaviour_mut().kademlia.get_providers(key)
    }

    /// Store a record in the DHT (for manifest metadata)
    /// Results will come via NodeEvent::RecordStored
    ///
    /// # Panics
    // NOTE(review): panics via `expect` if the record cannot be queued
    // locally — same signature constraint as `announce_file`.
    pub fn put_dht_record(&mut self, key: &str, value: Vec<u8>) -> kad::QueryId {
        let record = kad::Record {
            key: kad::RecordKey::new(&key.as_bytes()),
            value,
            publisher: Some(self.local_peer_id),
            // No expiry: rely on DHT defaults / republishing.
            expires: None,
        };
        // Quorum::One: success as soon as a single node stores the record.
        self.swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One)
            .expect("Failed to put record")
    }

    /// Get a record from the DHT
    /// Results will come via NodeEvent::RecordFound
    pub fn get_dht_record(&mut self, key: &str) -> kad::QueryId {
        let key = kad::RecordKey::new(&key.as_bytes());
        self.swarm.behaviour_mut().kademlia.get_record(key)
    }

    /// Bootstrap Kademlia DHT (find closest peers to self)
    ///
    /// Unlike `bootstrap`, exposes the raw `kad::NoKnownPeers` error.
    pub fn bootstrap_dht(&mut self) -> Result<kad::QueryId, kad::NoKnownPeers> {
        self.swarm.behaviour_mut().kademlia.bootstrap()
    }

    // ==================== End DHT Methods ====================
527
528    /// Run the node event loop
529    pub async fn run(&mut self) {
530        loop {
531            tokio::select! {
532                event = self.swarm.select_next_some() => {
533                    self.handle_swarm_event(event).await;
534                }
535            }
536        }
537    }
538
    /// Poll for the next event (non-blocking version for integration)
    ///
    /// Awaits whichever is ready first:
    /// - a swarm event: it is handled, and if handling enqueued at least one
    ///   `NodeEvent`, the first is returned via `try_recv` (extra events stay
    ///   queued for the next call); otherwise returns `None`.
    /// - an already-queued `NodeEvent` from a previous call: returned directly
    ///   (`None` only if the channel were closed, which cannot happen while
    ///   `self` holds `event_tx`).
    ///
    /// A `None` result therefore means "progress was made but no event is
    /// ready" — callers should simply poll again.
    pub async fn poll_event(&mut self) -> Option<NodeEvent> {
        tokio::select! {
            event = self.swarm.select_next_some() => {
                self.handle_swarm_event(event).await;
                self.event_rx.try_recv().ok()
            }
            event = self.event_rx.recv() => {
                event
            }
        }
    }
551
552    async fn handle_swarm_event(&mut self, event: SwarmEvent<FireCloudEvent>) {
553        match event {
554            SwarmEvent::NewListenAddr { address, .. } => {
555                info!("Listening on {}/p2p/{}", address, self.local_peer_id);
556                let _ = self.event_tx.send(NodeEvent::Listening(address)).await;
557            }
558
559            SwarmEvent::Behaviour(FireCloudEvent::Mdns(mdns::Event::Discovered(peers))) => {
560                for (peer_id, addr) in peers {
561                    if !self.known_peers.contains(&peer_id) {
562                        info!("mDNS discovered (local): {} at {}", peer_id, addr);
563                        self.known_peers.insert(peer_id);
564                        // Mark as local peer (discovered via mDNS)
565                        self.local_peers.insert(peer_id);
566                        
567                        // Add to health monitor as local peer
568                        self.health_monitor.add_peer(peer_id, true);
569                        
570                        // Add to Kademlia
571                        self.swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
572                        
573                        let _ = self.event_tx.send(NodeEvent::PeerDiscovered(peer_id)).await;
574                    }
575                }
576            }
577
578            SwarmEvent::Behaviour(FireCloudEvent::Mdns(mdns::Event::Expired(peers))) => {
579                for (peer_id, _) in peers {
580                    debug!("mDNS peer expired: {}", peer_id);
581                    // Remove from local_peers but keep in known_peers (may reconnect via other means)
582                    self.local_peers.remove(&peer_id);
583                    // Note: Keep in health monitor, will be cleaned up by periodic task if truly offline
584                }
585            }
586
587            SwarmEvent::Behaviour(FireCloudEvent::Kademlia(kad::Event::RoutingUpdated { 
588                peer, 
589                addresses, 
590                .. 
591            })) => {
592                debug!("Kademlia routing updated: {} with {} addresses", peer, addresses.len());
593                if !self.known_peers.contains(&peer) {
594                    self.known_peers.insert(peer);
595                    let _ = self.event_tx.send(NodeEvent::PeerDiscovered(peer)).await;
596                }
597            }
598
599            // Handle Kademlia query results
600            SwarmEvent::Behaviour(FireCloudEvent::Kademlia(kad::Event::OutboundQueryProgressed { 
601                result, 
602                .. 
603            })) => {
604                match result {
605                    kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {
606                        let key_str = String::from_utf8_lossy(key.as_ref()).to_string();
607                        let provider_ids: Vec<PeerId> = providers.into_iter().collect();
608                        info!("Found {} providers for {}", provider_ids.len(), key_str);
609                        let _ = self.event_tx.send(NodeEvent::ProvidersFound {
610                            key: key_str,
611                            providers: provider_ids,
612                        }).await;
613                    }
614                    kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. })) => {
615                        debug!("GetProviders finished with no additional records");
616                    }
617                    kad::QueryResult::GetProviders(Err(e)) => {
618                        warn!("GetProviders failed: {:?}", e);
619                        let _ = self.event_tx.send(NodeEvent::DhtQueryFailed {
620                            key: String::from_utf8_lossy(e.key().as_ref()).to_string(),
621                            error: format!("{:?}", e),
622                        }).await;
623                    }
624                    kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => {
625                        let key_str = String::from_utf8_lossy(key.as_ref()).to_string();
626                        info!("Started providing: {}", key_str);
627                        let _ = self.event_tx.send(NodeEvent::ProvideStarted {
628                            key: key_str,
629                        }).await;
630                    }
631                    kad::QueryResult::StartProviding(Err(e)) => {
632                        warn!("StartProviding failed: {:?}", e);
633                    }
634                    kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))) => {
635                        let key_str = String::from_utf8_lossy(peer_record.record.key.as_ref()).to_string();
636                        info!("Found record: {} ({} bytes)", key_str, peer_record.record.value.len());
637                        let _ = self.event_tx.send(NodeEvent::RecordFound {
638                            key: key_str,
639                            value: peer_record.record.value,
640                        }).await;
641                    }
642                    kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. })) => {
643                        debug!("GetRecord finished with no additional records");
644                    }
645                    kad::QueryResult::GetRecord(Err(e)) => {
646                        warn!("GetRecord failed: {:?}", e);
647                        let key_bytes = match &e {
648                            kad::GetRecordError::NotFound { key, .. } => key.as_ref(),
649                            kad::GetRecordError::QuorumFailed { key, .. } => key.as_ref(),
650                            kad::GetRecordError::Timeout { key, .. } => key.as_ref(),
651                        };
652                        let _ = self.event_tx.send(NodeEvent::DhtQueryFailed {
653                            key: String::from_utf8_lossy(key_bytes).to_string(),
654                            error: format!("{:?}", e),
655                        }).await;
656                    }
657                    kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
658                        let key_str = String::from_utf8_lossy(key.as_ref()).to_string();
659                        info!("Record stored: {}", key_str);
660                        let _ = self.event_tx.send(NodeEvent::RecordStored {
661                            key: key_str,
662                        }).await;
663                    }
664                    kad::QueryResult::PutRecord(Err(e)) => {
665                        warn!("PutRecord failed: {:?}", e);
666                    }
667                    kad::QueryResult::Bootstrap(Ok(_)) => {
668                        info!("Kademlia bootstrap successful");
669                    }
670                    kad::QueryResult::Bootstrap(Err(e)) => {
671                        warn!("Kademlia bootstrap failed: {:?}", e);
672                    }
673                    _ => {}
674                }
675            }
676
677            SwarmEvent::Behaviour(FireCloudEvent::Gossipsub(gossipsub::Event::Message {
678                propagation_source,
679                message,
680                ..
681            })) => {
682                info!(
683                    "Received message from {} on topic {}",
684                    propagation_source,
685                    message.topic
686                );
687                let _ = self.event_tx.send(NodeEvent::Message {
688                    source: propagation_source,
689                    topic: message.topic.to_string(),
690                    data: message.data,
691                }).await;
692            }
693
694            SwarmEvent::Behaviour(FireCloudEvent::Identify(identify::Event::Received {
695                peer_id,
696                info,
697                ..
698            })) => {
699                debug!(
700                    "Identified peer {}: {} with {} addresses",
701                    peer_id,
702                    info.protocol_version,
703                    info.listen_addrs.len()
704                );
705                
706                // IMPORTANT: Add the peer's addresses to Kademlia's routing table
707                // This is necessary for DHT peer discovery beyond bootstrap nodes
708                for addr in &info.listen_addrs {
709                    self.swarm.behaviour_mut().kademlia.add_address(&peer_id, addr.clone());
710                }
711                debug!("Kademlia routing updated: {} with {} addresses", peer_id, info.listen_addrs.len());
712            }
713
714            SwarmEvent::Behaviour(FireCloudEvent::Ping(ping::Event { peer, result, .. })) => {
715                match result {
716                    Ok(rtt) => {
717                        debug!("Ping to {}: {:?}", peer, rtt);
718                        // Store/update latest RTT for this peer
719                        self.peer_latencies.insert(peer, rtt);
720                        // Update health monitor
721                        self.health_monitor.record_ping_success(&peer, rtt);
722                    }
723                    Err(e) => {
724                        warn!("Ping to {} failed: {}", peer, e);
725                        // Update health monitor
726                        self.health_monitor.record_ping_failure(&peer);
727                    }
728                }
729            }
730
731            // Handle relay client events for circuit relay
732            SwarmEvent::Behaviour(FireCloudEvent::RelayClient(event)) => {
733                use libp2p::relay;
734                match event {
735                    relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => {
736                        info!("✅ Relay reservation accepted by {}", relay_peer_id);
737                        
738                        // Find the relay address that matches this peer ID
739                        let bootstrap_relays: Vec<_> = self.relay_manager.config().bootstrap_relays.clone();
740                        for relay_addr in bootstrap_relays {
741                            // Extract peer ID from multiaddr
742                            if let Some(peer) = extract_peer_id(&relay_addr) {
743                                if peer == relay_peer_id {
744                                    self.relay_manager.mark_connected(&relay_addr, relay_peer_id);
745                                    
746                                    // Build p2p-circuit address for listening
747                                    let circuit_addr = relay_addr
748                                        .with(libp2p::multiaddr::Protocol::P2pCircuit)
749                                        .with(libp2p::multiaddr::Protocol::P2p(self.local_peer_id));
750                                    
751                                    info!("🔊 Listening on relay circuit: {}", circuit_addr);
752                                    
753                                    // Listen on the circuit relay address
754                                    match self.swarm.listen_on(circuit_addr.clone()) {
755                                        Ok(_) => {
756                                            self.relay_manager.mark_listening(&relay_peer_id, circuit_addr);
757                                        }
758                                        Err(e) => {
759                                            warn!("Failed to listen on relay circuit: {}", e);
760                                        }
761                                    }
762                                    break;
763                                }
764                            }
765                        }
766                    }
767                    relay::client::Event::OutboundCircuitEstablished { relay_peer_id, .. } => {
768                        info!("🔗 Outbound circuit established via relay {}", relay_peer_id);
769                    }
770                    relay::client::Event::InboundCircuitEstablished { src_peer_id, .. } => {
771                        info!("🔗 Inbound circuit established from {}", src_peer_id);
772                    }
773                    _ => {
774                        debug!("Relay client event: {:?}", event);
775                    }
776                }
777            }
778
779            // Handle DCUtR events for hole punching
780            SwarmEvent::Behaviour(FireCloudEvent::Dcutr(dcutr::Event {
781                remote_peer_id,
782                result,
783            })) => {
784                match result {
785                    Ok(_connection_id) => {
786                        info!("✅ Hole punch SUCCESS with {} - direct connection established", remote_peer_id);
787                        // Direct connection established - no longer need relay for this peer
788                    }
789                    Err(error) => {
790                        warn!("❌ Hole punch FAILED with {}: {:?}", remote_peer_id, error);
791                        // Fall back to relayed connection
792                    }
793                }
794            }
795
796            SwarmEvent::Behaviour(FireCloudEvent::Transfer(event)) => {
797                match event {
798                    request_response::Event::Message { peer, message } => {
799                        match message {
800                            request_response::Message::Request { request, channel, .. } => {
801                                info!("Transfer request from {}: {:?}", peer, request);
802                                // Note: Clone is not implemented for ResponseChannel,
803                                // so we need to handle this differently - expose it to caller
804                                // The caller needs to handle the request and respond
805                                let _ = self.event_tx.send(NodeEvent::TransferRequest {
806                                    peer,
807                                    request,
808                                    channel,
809                                }).await;
810                            }
811                            request_response::Message::Response { request_id, response } => {
812                                info!("Transfer response for {:?}: {:?}", request_id, response);
813                                let _ = self.event_tx.send(NodeEvent::TransferResponse {
814                                    peer,
815                                    request_id,
816                                    response,
817                                }).await;
818                            }
819                        }
820                    }
821                    request_response::Event::OutboundFailure { peer, request_id, error, .. } => {
822                        warn!("Transfer to {} failed: {:?}", peer, error);
823                        let _ = self.event_tx.send(NodeEvent::TransferFailed {
824                            peer,
825                            request_id,
826                            error: format!("{:?}", error),
827                        }).await;
828                    }
829                    request_response::Event::InboundFailure { peer, error, .. } => {
830                        warn!("Inbound transfer from {} failed: {:?}", peer, error);
831                    }
832                    request_response::Event::ResponseSent { peer, .. } => {
833                        debug!("Response sent to {}", peer);
834                    }
835                }
836            }
837
838            // ========================================================================
839            // Messaging Events (Phase 1.5)
840            // ========================================================================
841            SwarmEvent::Behaviour(FireCloudEvent::Messaging(event)) => {
842                match event {
843                    request_response::Event::Message { peer, message } => {
844                        match message {
845                            request_response::Message::Request { request, channel, .. } => {
846                                info!("📨 Message request from {}: {:?}", peer, request);
847                                // Forward to caller for handling (friend requests, messages, etc.)
848                                let _ = self.event_tx.send(NodeEvent::MessageRequest {
849                                    peer,
850                                    request,
851                                    channel,
852                                }).await;
853                            }
854                            request_response::Message::Response { request_id, response } => {
855                                info!("✅ Message response for {:?}: {:?}", request_id, response);
856                                let _ = self.event_tx.send(NodeEvent::MessageResponse {
857                                    peer,
858                                    request_id,
859                                    response,
860                                }).await;
861                            }
862                        }
863                    }
864                    request_response::Event::OutboundFailure { peer, request_id, error, .. } => {
865                        warn!("❌ Message to {} failed: {:?}", peer, error);
866                        let _ = self.event_tx.send(NodeEvent::MessageFailed {
867                            peer,
868                            request_id,
869                            error: format!("{:?}", error),
870                        }).await;
871                    }
872                    request_response::Event::InboundFailure { peer, error, .. } => {
873                        warn!("❌ Inbound message from {} failed: {:?}", peer, error);
874                    }
875                    request_response::Event::ResponseSent { peer, .. } => {
876                        debug!("✅ Message response sent to {}", peer);
877                    }
878                }
879            }
880
881            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
882                info!("Connected to {}", peer_id);
883                if !self.known_peers.contains(&peer_id) {
884                    self.known_peers.insert(peer_id);
885                    let _ = self.event_tx.send(NodeEvent::PeerDiscovered(peer_id)).await;
886                }
887                
888                // Check if this is a relay server we're supposed to use
889                let bootstrap_relays: Vec<_> = self.relay_manager.config().bootstrap_relays.clone();
890                for relay_addr in bootstrap_relays {
891                    if let Some(relay_peer) = extract_peer_id(&relay_addr) {
892                        if relay_peer == peer_id {
893                            info!("🔗 Connected to relay server, requesting reservation...");
894                            self.relay_manager.mark_connected(&relay_addr, peer_id);
895                            
896                            // Listen on relay circuit to trigger reservation
897                            let circuit_addr = relay_addr
898                                .with(libp2p::multiaddr::Protocol::P2pCircuit);
899                            
900                            match self.swarm.listen_on(circuit_addr.clone()) {
901                                Ok(_) => {
902                                    info!("📡 Listening on relay circuit: {}", circuit_addr);
903                                }
904                                Err(e) => {
905                                    warn!("Failed to listen on relay circuit: {}", e);
906                                }
907                            }
908                            break;
909                        }
910                    }
911                }
912            }
913
914            SwarmEvent::ConnectionClosed { peer_id, .. } => {
915                debug!("Disconnected from {}", peer_id);
916                let _ = self.event_tx.send(NodeEvent::PeerDisconnected(peer_id)).await;
917            }
918
919            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
920                warn!("Outgoing connection error to {:?}: {}", peer_id, error);
921            }
922
923            SwarmEvent::IncomingConnectionError { error, .. } => {
924                warn!("Incoming connection error: {}", error);
925            }
926
927            _ => {}
928        }
929    }
930
931    // ========== PHASE 2: STORAGE PROVIDER METHODS ==========
932
933    /// Announce this node as a storage provider to the DHT
934    pub async fn announce_as_provider(&mut self, available_space: u64) -> NetResult<()> {
935        use crate::provider::ProviderInfo;
936        
937        info!("📢 Announcing as storage provider: {} GB available", available_space / (1024 * 1024 * 1024));
938
939        // Get our listen addresses
940        let listen_addrs: Vec<Multiaddr> = self.swarm.listeners().cloned().collect();
941
942        // Create provider info
943        let provider_info = ProviderInfo::new(
944            self.local_peer_id,
945            available_space,
946            listen_addrs,
947        );
948
949        // Serialize provider info
950        let provider_data = bincode::serialize(&provider_info)
951            .map_err(|e| NetError::Other(format!("Failed to serialize provider info: {}", e)))?;
952
953        // Store in DHT with key "storage-providers:<peer_id>"
954        let key_string = format!("storage-providers:{}", self.local_peer_id);
955        let key = libp2p::kad::RecordKey::new(&key_string.as_bytes());
956        
957        let record = libp2p::kad::Record {
958            key: key.clone(),
959            value: provider_data,
960            publisher: Some(self.local_peer_id),
961            expires: None, // We'll refresh periodically
962        };
963
964        self.swarm
965            .behaviour_mut()
966            .kademlia
967            .put_record(record, libp2p::kad::Quorum::One)
968            .map_err(|e| NetError::Other(format!("Failed to put DHT record: {}", e)))?;
969
970        info!("✓ Provider announcement sent to DHT");
971        Ok(())
972    }
973
974    /// Find available storage providers from the DHT
975    pub async fn find_storage_providers(&mut self, min_count: usize) -> NetResult<Vec<crate::provider::ProviderInfo>> {
976        info!("🔍 Searching for {} storage providers...", min_count);
977
978        // For now, we'll use a simplified approach:
979        // In a full implementation, we'd query the DHT for all "storage-providers:*" keys
980        // For MVP, we'll return known peers and assume they're providers
981        
982        let mut providers = Vec::new();
983
984        // Get all known peers from Kademlia routing table
985        for bucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
986            for entry in bucket.iter() {
987                let peer_id = entry.node.key.preimage().clone();
988                
989                // Skip ourselves
990                if peer_id == self.local_peer_id {
991                    continue;
992                }
993
994                // Create provider info for this peer
995                // In a full implementation, we'd query their actual provider record
996                let provider_info = crate::provider::ProviderInfo::new(
997                    peer_id,
998                    10 * 1024 * 1024 * 1024, // Assume 10GB available (TODO: query actual)
999                    vec![], // We'll get addresses from connection
1000                );
1001
1002                providers.push(provider_info);
1003
1004                if providers.len() >= min_count * 2 {
1005                    break;
1006                }
1007            }
1008        }
1009
1010        if providers.is_empty() {
1011            warn!("⚠️  No storage providers found in DHT!");
1012            warn!("    Make sure other nodes are running with:");
1013            warn!("    firecloud storage init --quota 10GB");
1014            warn!("    firecloud node --port 4001");
1015        } else {
1016            info!("✓ Found {} potential storage providers", providers.len());
1017        }
1018
1019        Ok(providers)
1020    }
1021
1022    /// Send a chunk to a storage provider
1023    pub async fn send_chunk_to_provider(
1024        &mut self,
1025        provider: PeerId,
1026        chunk_hash_str: &str,
1027        chunk_data: &[u8],
1028        original_size: u64,
1029    ) -> NetResult<()> {
1030        use crate::protocol::TransferRequest;
1031
1032        debug!("📤 Sending chunk {}... to provider {}", &chunk_hash_str[..16], provider);
1033
1034        let request = TransferRequest::StoreChunk {
1035            hash: chunk_hash_str.to_string(),
1036            data: chunk_data.to_vec(),
1037            original_size,
1038        };
1039
1040        // Send via request-response protocol
1041        self.swarm
1042            .behaviour_mut()
1043            .transfer
1044            .send_request(&provider, request);
1045
1046        // In a full implementation, we'd wait for response
1047        // For now, we'll assume success
1048        debug!("✓ Chunk sent to provider");
1049        
1050        Ok(())
1051    }
1052}
1053
1054
1055/// Extract peer ID from a multiaddr if present
1056fn extract_peer_id(addr: &Multiaddr) -> Option<PeerId> {
1057    addr.iter().find_map(|p| {
1058        if let Protocol::P2p(peer_id) = p {
1059            Some(peer_id)
1060        } else {
1061            None
1062        }
1063    })
1064}