1use crate::{
4 FireCloudBehaviour, FireCloudEvent, HealthMonitor, NetError, NetResult,
5 TransferRequest, TransferResponse,
6 MessageRequest, MessageResponse,
7 relay_manager::{RelayManager, RelayManagerConfig},
8};
9use futures::StreamExt;
10use libp2p::{
11 dcutr,
12 gossipsub::{self, IdentTopic},
13 identify,
14 identity::Keypair,
15 kad,
16 mdns,
17 multiaddr::Protocol,
18 ping,
19 request_response::{self, OutboundRequestId, ResponseChannel},
20 swarm::{SwarmEvent, dial_opts::DialOpts},
21 Multiaddr, PeerId, Swarm,
22};
23use std::{collections::{HashSet, HashMap}, time::Duration};
24use tokio::sync::mpsc;
25use tracing::{debug, info, warn};
26
27#[derive(Debug, Clone)]
29pub struct NodeConfig {
30 pub port: u16,
32 pub enable_mdns: bool,
34 pub bootstrap_peers: Vec<Multiaddr>,
36 pub bootstrap_relays: Vec<Multiaddr>,
38}
39
40impl Default for NodeConfig {
41 fn default() -> Self {
42 Self {
43 port: 0,
44 enable_mdns: true,
45 bootstrap_peers: Vec::new(),
46 bootstrap_relays: Vec::new(),
47 }
48 }
49}
50
51#[derive(Debug)]
53pub enum NodeEvent {
54 PeerDiscovered(PeerId),
56 PeerDisconnected(PeerId),
58 Message {
60 source: PeerId,
61 topic: String,
62 data: Vec<u8>,
63 },
64 Listening(Multiaddr),
66 TransferRequest {
68 peer: PeerId,
69 request: TransferRequest,
70 channel: ResponseChannel<TransferResponse>,
71 },
72 TransferResponse {
74 peer: PeerId,
75 request_id: OutboundRequestId,
76 response: TransferResponse,
77 },
78 TransferFailed {
80 peer: PeerId,
81 request_id: OutboundRequestId,
82 error: String,
83 },
84 ProvidersFound {
86 key: String,
87 providers: Vec<PeerId>,
88 },
89 ProvideStarted {
91 key: String,
92 },
93 RecordFound {
95 key: String,
96 value: Vec<u8>,
97 },
98 RecordStored {
100 key: String,
101 },
102 DhtQueryFailed {
104 key: String,
105 error: String,
106 },
107
108 MessageRequest {
114 peer: PeerId,
115 request: MessageRequest,
116 channel: ResponseChannel<MessageResponse>,
117 },
118 MessageResponse {
120 peer: PeerId,
121 request_id: OutboundRequestId,
122 response: MessageResponse,
123 },
124 MessageFailed {
126 peer: PeerId,
127 request_id: OutboundRequestId,
128 error: String,
129 },
130}
131
132pub struct FireCloudNode {
134 swarm: Swarm<FireCloudBehaviour>,
135 local_peer_id: PeerId,
136 known_peers: HashSet<PeerId>,
137 local_peers: HashSet<PeerId>,
139 peer_latencies: HashMap<PeerId, Duration>,
141 health_monitor: HealthMonitor,
143 relay_manager: RelayManager,
145 event_tx: mpsc::Sender<NodeEvent>,
146 event_rx: mpsc::Receiver<NodeEvent>,
147}
148
149impl FireCloudNode {
150 pub async fn new(config: NodeConfig) -> NetResult<Self> {
152 let local_key = Keypair::generate_ed25519();
154 let local_peer_id = PeerId::from(local_key.public());
155
156 info!("Local peer ID: {}", local_peer_id);
157
158 let swarm = libp2p::SwarmBuilder::with_existing_identity(local_key.clone())
161 .with_tokio()
162 .with_quic()
163 .with_relay_client(
164 libp2p::noise::Config::new,
165 libp2p::yamux::Config::default,
166 )
167 .map_err(|e| NetError::Other(e.to_string()))?
168 .with_behaviour(|keypair, relay_client| {
169 FireCloudBehaviour::new_with_relay(local_peer_id, keypair, relay_client)
170 .expect("Failed to create FireCloudBehaviour")
171 })
172 .map_err(|e| NetError::Other(e.to_string()))?
173 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
174 .build();
175
176 let (event_tx, event_rx) = mpsc::channel(100);
177
178 let relay_config = RelayManagerConfig {
180 bootstrap_relays: config.bootstrap_relays.clone(),
181 min_active_relays: 2,
182 max_relay_connections: 5,
183 reconnect_interval: Duration::from_secs(30),
184 connection_timeout: Duration::from_secs(10),
185 };
186 let relay_manager = RelayManager::new(relay_config);
187
188 let mut node = Self {
189 swarm,
190 local_peer_id,
191 known_peers: HashSet::new(),
192 local_peers: HashSet::new(),
193 peer_latencies: HashMap::new(),
194 health_monitor: HealthMonitor::new(),
195 relay_manager,
196 event_tx,
197 event_rx,
198 };
199
200 let listen_addr: Multiaddr = format!("/ip4/0.0.0.0/udp/{}/quic-v1", config.port)
202 .parse()
203 .map_err(|e| NetError::Other(format!("Invalid address: {}", e)))?;
204
205 node.swarm
206 .listen_on(listen_addr)
207 .map_err(|e| NetError::Transport(e.to_string()))?;
208
209 for addr in config.bootstrap_peers {
211 if let Some(peer_id) = extract_peer_id(&addr) {
212 node.swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
213 }
214 }
215
216 node.connect_to_bootstrap_relays();
218
219 Ok(node)
220 }
221
222 fn connect_to_bootstrap_relays(&mut self) {
224 let relays_to_connect = self.relay_manager.relays_to_connect();
225
226 for relay_addr in relays_to_connect {
227 info!("Connecting to bootstrap relay: {}", relay_addr);
228
229 self.relay_manager.mark_connecting(&relay_addr);
231
232 match self.swarm.dial(relay_addr.clone()) {
234 Ok(_) => {
235 debug!("Dialing relay at {}", relay_addr);
236 }
237 Err(e) => {
238 warn!("Failed to dial relay {}: {}", relay_addr, e);
239 self.relay_manager.mark_failed(&relay_addr, format!("Dial failed: {}", e));
240 }
241 }
242 }
243
244 let stats = self.relay_manager.stats();
246 info!(
247 "Relay connections: {} total, {} connected, {} listening, {} failed",
248 stats.total_relays, stats.connected_relays, stats.listening_relays, stats.failed_relays
249 );
250 }
251
252 pub fn peer_latency(&self, peer: &PeerId) -> Option<Duration> {
254 self.peer_latencies.get(peer).cloned()
255 }
256
257 pub fn select_lowest_latency_peer(&self, candidates: &[PeerId]) -> Option<PeerId> {
260 candidates
261 .iter()
262 .filter_map(|p| self.peer_latencies.get(p).map(|d| (p, d)))
263 .min_by_key(|(_, d)| *d)
264 .map(|(p, _)| *p)
265 }
266
267 pub fn is_local_peer(&self, peer: &PeerId) -> bool {
269 self.local_peers.contains(peer)
270 }
271
272 pub fn local_peers(&self) -> &HashSet<PeerId> {
274 &self.local_peers
275 }
276
277 pub fn choose_best_peer(&self, candidates: &[PeerId]) -> Option<PeerId> {
283 if candidates.is_empty() {
284 return None;
285 }
286
287 let local_candidates: Vec<PeerId> = candidates
289 .iter()
290 .filter(|p| self.local_peers.contains(p))
291 .cloned()
292 .collect();
293
294 if !local_candidates.is_empty() {
296 if let Some(best) = self.select_lowest_latency_peer(&local_candidates) {
297 return Some(best);
298 }
299 return Some(local_candidates[0]);
301 }
302
303 if let Some(best) = self.select_lowest_latency_peer(candidates) {
305 return Some(best);
306 }
307
308 Some(candidates[0])
310 }
311
312 pub fn local_peer_id(&self) -> PeerId {
314 self.local_peer_id
315 }
316
317 pub fn known_peers(&self) -> &HashSet<PeerId> {
319 &self.known_peers
320 }
321
322 pub fn connected_peers_count(&self) -> usize {
324 self.swarm.connected_peers().count()
325 }
326
327 pub fn connected_peers(&self) -> Vec<PeerId> {
329 self.swarm.connected_peers().cloned().collect()
330 }
331
332 pub fn kademlia_peers_count(&self) -> usize {
334 self.known_peers.len()
337 }
338
339 pub fn add_bootstrap_peer(&mut self, peer_id: &PeerId, addr: Multiaddr) {
341 self.swarm.behaviour_mut().kademlia.add_address(peer_id, addr);
342 info!("Added bootstrap peer: {}", peer_id);
343 }
344
345 pub fn bootstrap(&mut self) -> NetResult<kad::QueryId> {
347 self.swarm
348 .behaviour_mut()
349 .kademlia
350 .bootstrap()
351 .map_err(|e| NetError::Protocol(format!("Bootstrap failed: {:?}", e)))
352 }
353
354 pub fn subscribe(&mut self, topic: &str) -> NetResult<()> {
356 let topic = IdentTopic::new(topic);
357 self.swarm
358 .behaviour_mut()
359 .gossipsub
360 .subscribe(&topic)
361 .map_err(|e| NetError::Protocol(format!("Subscribe failed: {}", e)))?;
362 Ok(())
363 }
364
365 pub fn publish(&mut self, topic: &str, data: Vec<u8>) -> NetResult<()> {
367 let topic = IdentTopic::new(topic);
368 self.swarm
369 .behaviour_mut()
370 .gossipsub
371 .publish(topic, data)
372 .map_err(|e| NetError::Protocol(format!("Publish failed: {}", e)))?;
373 Ok(())
374 }
375
376 pub fn dial(&mut self, addr: Multiaddr) -> NetResult<()> {
378 self.swarm
379 .dial(DialOpts::unknown_peer_id().address(addr).build())
380 .map_err(|e| NetError::Dial(e.to_string()))?;
381 Ok(())
382 }
383
384 pub fn health_monitor(&self) -> &HealthMonitor {
386 &self.health_monitor
387 }
388
389 pub fn get_best_peer_for_transfer(&self) -> Option<PeerId> {
391 self.health_monitor.get_best_peer()
392 }
393
394 pub fn get_healthy_peers(&self) -> Vec<PeerId> {
396 self.health_monitor.get_healthy_peers()
397 }
398
399 pub fn cleanup_offline_peers(&mut self) {
401 self.health_monitor.cleanup_offline_peers();
402 }
403
404 pub fn send_transfer_request(&mut self, peer: &PeerId, request: TransferRequest) -> OutboundRequestId {
406 self.swarm.behaviour_mut().transfer.send_request(peer, request)
407 }
408
409 pub fn respond_transfer(&mut self, channel: ResponseChannel<TransferResponse>, response: TransferResponse) -> NetResult<()> {
411 self.swarm.behaviour_mut().transfer.send_response(channel, response)
412 .map_err(|_| NetError::Protocol("Failed to send response".to_string()))
413 }
414
415 pub fn request_chunk(&mut self, peer: &PeerId, hash: String) -> OutboundRequestId {
417 self.send_transfer_request(peer, TransferRequest::GetChunk { hash })
418 }
419
420 pub fn send_message_request(&mut self, peer: &PeerId, request: MessageRequest) -> OutboundRequestId {
426 self.swarm.behaviour_mut().messaging.send_request(peer, request)
427 }
428
429 pub fn respond_message(&mut self, channel: ResponseChannel<MessageResponse>, response: MessageResponse) -> NetResult<()> {
431 self.swarm.behaviour_mut().messaging.send_response(channel, response)
432 .map_err(|_| NetError::Protocol("Failed to send message response".to_string()))
433 }
434
435 pub fn send_friend_request(&mut self, peer: &PeerId, name: Option<String>) -> OutboundRequestId {
437 self.send_message_request(peer, MessageRequest::FriendRequest { name })
438 }
439
440 pub fn send_friend_accept(&mut self, peer: &PeerId) -> OutboundRequestId {
442 self.send_message_request(peer, MessageRequest::FriendAccept)
443 }
444
445 pub fn send_direct_message(
447 &mut self,
448 peer: &PeerId,
449 content: Vec<u8>,
450 message_id: String,
451 timestamp: i64,
452 ) -> OutboundRequestId {
453 self.send_message_request(
454 peer,
455 MessageRequest::DirectMessage {
456 content,
457 message_id,
458 timestamp,
459 },
460 )
461 }
462
463 pub fn ping_peer(&mut self, peer: &PeerId) -> OutboundRequestId {
465 self.send_message_request(peer, MessageRequest::Ping)
466 }
467
468 pub fn store_chunk_on_peer(&mut self, peer: &PeerId, hash: String, data: Vec<u8>, original_size: u64) -> OutboundRequestId {
470 self.send_transfer_request(peer, TransferRequest::StoreChunk { hash, data, original_size })
471 }
472
473 pub fn check_chunk(&mut self, peer: &PeerId, hash: String) -> OutboundRequestId {
475 self.send_transfer_request(peer, TransferRequest::HasChunk { hash })
476 }
477
478 pub fn announce_file(&mut self, file_id: &str) -> kad::QueryId {
483 let key = kad::RecordKey::new(&file_id.as_bytes());
484 self.swarm.behaviour_mut().kademlia.start_providing(key)
485 .expect("Failed to start providing")
486 }
487
488 pub fn stop_announcing_file(&mut self, file_id: &str) {
490 let key = kad::RecordKey::new(&file_id.as_bytes());
491 self.swarm.behaviour_mut().kademlia.stop_providing(&key);
492 }
493
494 pub fn find_file_providers(&mut self, file_id: &str) -> kad::QueryId {
497 let key = kad::RecordKey::new(&file_id.as_bytes());
498 self.swarm.behaviour_mut().kademlia.get_providers(key)
499 }
500
501 pub fn put_dht_record(&mut self, key: &str, value: Vec<u8>) -> kad::QueryId {
504 let record = kad::Record {
505 key: kad::RecordKey::new(&key.as_bytes()),
506 value,
507 publisher: Some(self.local_peer_id),
508 expires: None,
509 };
510 self.swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One)
511 .expect("Failed to put record")
512 }
513
514 pub fn get_dht_record(&mut self, key: &str) -> kad::QueryId {
517 let key = kad::RecordKey::new(&key.as_bytes());
518 self.swarm.behaviour_mut().kademlia.get_record(key)
519 }
520
521 pub fn bootstrap_dht(&mut self) -> Result<kad::QueryId, kad::NoKnownPeers> {
523 self.swarm.behaviour_mut().kademlia.bootstrap()
524 }
525
526 pub async fn run(&mut self) {
530 loop {
531 tokio::select! {
532 event = self.swarm.select_next_some() => {
533 self.handle_swarm_event(event).await;
534 }
535 }
536 }
537 }
538
539 pub async fn poll_event(&mut self) -> Option<NodeEvent> {
541 tokio::select! {
542 event = self.swarm.select_next_some() => {
543 self.handle_swarm_event(event).await;
544 self.event_rx.try_recv().ok()
545 }
546 event = self.event_rx.recv() => {
547 event
548 }
549 }
550 }
551
552 async fn handle_swarm_event(&mut self, event: SwarmEvent<FireCloudEvent>) {
553 match event {
554 SwarmEvent::NewListenAddr { address, .. } => {
555 info!("Listening on {}/p2p/{}", address, self.local_peer_id);
556 let _ = self.event_tx.send(NodeEvent::Listening(address)).await;
557 }
558
559 SwarmEvent::Behaviour(FireCloudEvent::Mdns(mdns::Event::Discovered(peers))) => {
560 for (peer_id, addr) in peers {
561 if !self.known_peers.contains(&peer_id) {
562 info!("mDNS discovered (local): {} at {}", peer_id, addr);
563 self.known_peers.insert(peer_id);
564 self.local_peers.insert(peer_id);
566
567 self.health_monitor.add_peer(peer_id, true);
569
570 self.swarm.behaviour_mut().kademlia.add_address(&peer_id, addr);
572
573 let _ = self.event_tx.send(NodeEvent::PeerDiscovered(peer_id)).await;
574 }
575 }
576 }
577
578 SwarmEvent::Behaviour(FireCloudEvent::Mdns(mdns::Event::Expired(peers))) => {
579 for (peer_id, _) in peers {
580 debug!("mDNS peer expired: {}", peer_id);
581 self.local_peers.remove(&peer_id);
583 }
585 }
586
587 SwarmEvent::Behaviour(FireCloudEvent::Kademlia(kad::Event::RoutingUpdated {
588 peer,
589 addresses,
590 ..
591 })) => {
592 debug!("Kademlia routing updated: {} with {} addresses", peer, addresses.len());
593 if !self.known_peers.contains(&peer) {
594 self.known_peers.insert(peer);
595 let _ = self.event_tx.send(NodeEvent::PeerDiscovered(peer)).await;
596 }
597 }
598
599 SwarmEvent::Behaviour(FireCloudEvent::Kademlia(kad::Event::OutboundQueryProgressed {
601 result,
602 ..
603 })) => {
604 match result {
605 kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {
606 let key_str = String::from_utf8_lossy(key.as_ref()).to_string();
607 let provider_ids: Vec<PeerId> = providers.into_iter().collect();
608 info!("Found {} providers for {}", provider_ids.len(), key_str);
609 let _ = self.event_tx.send(NodeEvent::ProvidersFound {
610 key: key_str,
611 providers: provider_ids,
612 }).await;
613 }
614 kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. })) => {
615 debug!("GetProviders finished with no additional records");
616 }
617 kad::QueryResult::GetProviders(Err(e)) => {
618 warn!("GetProviders failed: {:?}", e);
619 let _ = self.event_tx.send(NodeEvent::DhtQueryFailed {
620 key: String::from_utf8_lossy(e.key().as_ref()).to_string(),
621 error: format!("{:?}", e),
622 }).await;
623 }
624 kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => {
625 let key_str = String::from_utf8_lossy(key.as_ref()).to_string();
626 info!("Started providing: {}", key_str);
627 let _ = self.event_tx.send(NodeEvent::ProvideStarted {
628 key: key_str,
629 }).await;
630 }
631 kad::QueryResult::StartProviding(Err(e)) => {
632 warn!("StartProviding failed: {:?}", e);
633 }
634 kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(peer_record))) => {
635 let key_str = String::from_utf8_lossy(peer_record.record.key.as_ref()).to_string();
636 info!("Found record: {} ({} bytes)", key_str, peer_record.record.value.len());
637 let _ = self.event_tx.send(NodeEvent::RecordFound {
638 key: key_str,
639 value: peer_record.record.value,
640 }).await;
641 }
642 kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. })) => {
643 debug!("GetRecord finished with no additional records");
644 }
645 kad::QueryResult::GetRecord(Err(e)) => {
646 warn!("GetRecord failed: {:?}", e);
647 let key_bytes = match &e {
648 kad::GetRecordError::NotFound { key, .. } => key.as_ref(),
649 kad::GetRecordError::QuorumFailed { key, .. } => key.as_ref(),
650 kad::GetRecordError::Timeout { key, .. } => key.as_ref(),
651 };
652 let _ = self.event_tx.send(NodeEvent::DhtQueryFailed {
653 key: String::from_utf8_lossy(key_bytes).to_string(),
654 error: format!("{:?}", e),
655 }).await;
656 }
657 kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
658 let key_str = String::from_utf8_lossy(key.as_ref()).to_string();
659 info!("Record stored: {}", key_str);
660 let _ = self.event_tx.send(NodeEvent::RecordStored {
661 key: key_str,
662 }).await;
663 }
664 kad::QueryResult::PutRecord(Err(e)) => {
665 warn!("PutRecord failed: {:?}", e);
666 }
667 kad::QueryResult::Bootstrap(Ok(_)) => {
668 info!("Kademlia bootstrap successful");
669 }
670 kad::QueryResult::Bootstrap(Err(e)) => {
671 warn!("Kademlia bootstrap failed: {:?}", e);
672 }
673 _ => {}
674 }
675 }
676
677 SwarmEvent::Behaviour(FireCloudEvent::Gossipsub(gossipsub::Event::Message {
678 propagation_source,
679 message,
680 ..
681 })) => {
682 info!(
683 "Received message from {} on topic {}",
684 propagation_source,
685 message.topic
686 );
687 let _ = self.event_tx.send(NodeEvent::Message {
688 source: propagation_source,
689 topic: message.topic.to_string(),
690 data: message.data,
691 }).await;
692 }
693
694 SwarmEvent::Behaviour(FireCloudEvent::Identify(identify::Event::Received {
695 peer_id,
696 info,
697 ..
698 })) => {
699 debug!(
700 "Identified peer {}: {} with {} addresses",
701 peer_id,
702 info.protocol_version,
703 info.listen_addrs.len()
704 );
705
706 for addr in &info.listen_addrs {
709 self.swarm.behaviour_mut().kademlia.add_address(&peer_id, addr.clone());
710 }
711 debug!("Kademlia routing updated: {} with {} addresses", peer_id, info.listen_addrs.len());
712 }
713
714 SwarmEvent::Behaviour(FireCloudEvent::Ping(ping::Event { peer, result, .. })) => {
715 match result {
716 Ok(rtt) => {
717 debug!("Ping to {}: {:?}", peer, rtt);
718 self.peer_latencies.insert(peer, rtt);
720 self.health_monitor.record_ping_success(&peer, rtt);
722 }
723 Err(e) => {
724 warn!("Ping to {} failed: {}", peer, e);
725 self.health_monitor.record_ping_failure(&peer);
727 }
728 }
729 }
730
731 SwarmEvent::Behaviour(FireCloudEvent::RelayClient(event)) => {
733 use libp2p::relay;
734 match event {
735 relay::client::Event::ReservationReqAccepted { relay_peer_id, .. } => {
736 info!("✅ Relay reservation accepted by {}", relay_peer_id);
737
738 let bootstrap_relays: Vec<_> = self.relay_manager.config().bootstrap_relays.clone();
740 for relay_addr in bootstrap_relays {
741 if let Some(peer) = extract_peer_id(&relay_addr) {
743 if peer == relay_peer_id {
744 self.relay_manager.mark_connected(&relay_addr, relay_peer_id);
745
746 let circuit_addr = relay_addr
748 .with(libp2p::multiaddr::Protocol::P2pCircuit)
749 .with(libp2p::multiaddr::Protocol::P2p(self.local_peer_id));
750
751 info!("🔊 Listening on relay circuit: {}", circuit_addr);
752
753 match self.swarm.listen_on(circuit_addr.clone()) {
755 Ok(_) => {
756 self.relay_manager.mark_listening(&relay_peer_id, circuit_addr);
757 }
758 Err(e) => {
759 warn!("Failed to listen on relay circuit: {}", e);
760 }
761 }
762 break;
763 }
764 }
765 }
766 }
767 relay::client::Event::OutboundCircuitEstablished { relay_peer_id, .. } => {
768 info!("🔗 Outbound circuit established via relay {}", relay_peer_id);
769 }
770 relay::client::Event::InboundCircuitEstablished { src_peer_id, .. } => {
771 info!("🔗 Inbound circuit established from {}", src_peer_id);
772 }
773 _ => {
774 debug!("Relay client event: {:?}", event);
775 }
776 }
777 }
778
779 SwarmEvent::Behaviour(FireCloudEvent::Dcutr(dcutr::Event {
781 remote_peer_id,
782 result,
783 })) => {
784 match result {
785 Ok(_connection_id) => {
786 info!("✅ Hole punch SUCCESS with {} - direct connection established", remote_peer_id);
787 }
789 Err(error) => {
790 warn!("❌ Hole punch FAILED with {}: {:?}", remote_peer_id, error);
791 }
793 }
794 }
795
796 SwarmEvent::Behaviour(FireCloudEvent::Transfer(event)) => {
797 match event {
798 request_response::Event::Message { peer, message } => {
799 match message {
800 request_response::Message::Request { request, channel, .. } => {
801 info!("Transfer request from {}: {:?}", peer, request);
802 let _ = self.event_tx.send(NodeEvent::TransferRequest {
806 peer,
807 request,
808 channel,
809 }).await;
810 }
811 request_response::Message::Response { request_id, response } => {
812 info!("Transfer response for {:?}: {:?}", request_id, response);
813 let _ = self.event_tx.send(NodeEvent::TransferResponse {
814 peer,
815 request_id,
816 response,
817 }).await;
818 }
819 }
820 }
821 request_response::Event::OutboundFailure { peer, request_id, error, .. } => {
822 warn!("Transfer to {} failed: {:?}", peer, error);
823 let _ = self.event_tx.send(NodeEvent::TransferFailed {
824 peer,
825 request_id,
826 error: format!("{:?}", error),
827 }).await;
828 }
829 request_response::Event::InboundFailure { peer, error, .. } => {
830 warn!("Inbound transfer from {} failed: {:?}", peer, error);
831 }
832 request_response::Event::ResponseSent { peer, .. } => {
833 debug!("Response sent to {}", peer);
834 }
835 }
836 }
837
838 SwarmEvent::Behaviour(FireCloudEvent::Messaging(event)) => {
842 match event {
843 request_response::Event::Message { peer, message } => {
844 match message {
845 request_response::Message::Request { request, channel, .. } => {
846 info!("📨 Message request from {}: {:?}", peer, request);
847 let _ = self.event_tx.send(NodeEvent::MessageRequest {
849 peer,
850 request,
851 channel,
852 }).await;
853 }
854 request_response::Message::Response { request_id, response } => {
855 info!("✅ Message response for {:?}: {:?}", request_id, response);
856 let _ = self.event_tx.send(NodeEvent::MessageResponse {
857 peer,
858 request_id,
859 response,
860 }).await;
861 }
862 }
863 }
864 request_response::Event::OutboundFailure { peer, request_id, error, .. } => {
865 warn!("❌ Message to {} failed: {:?}", peer, error);
866 let _ = self.event_tx.send(NodeEvent::MessageFailed {
867 peer,
868 request_id,
869 error: format!("{:?}", error),
870 }).await;
871 }
872 request_response::Event::InboundFailure { peer, error, .. } => {
873 warn!("❌ Inbound message from {} failed: {:?}", peer, error);
874 }
875 request_response::Event::ResponseSent { peer, .. } => {
876 debug!("✅ Message response sent to {}", peer);
877 }
878 }
879 }
880
881 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
882 info!("Connected to {}", peer_id);
883 if !self.known_peers.contains(&peer_id) {
884 self.known_peers.insert(peer_id);
885 let _ = self.event_tx.send(NodeEvent::PeerDiscovered(peer_id)).await;
886 }
887
888 let bootstrap_relays: Vec<_> = self.relay_manager.config().bootstrap_relays.clone();
890 for relay_addr in bootstrap_relays {
891 if let Some(relay_peer) = extract_peer_id(&relay_addr) {
892 if relay_peer == peer_id {
893 info!("🔗 Connected to relay server, requesting reservation...");
894 self.relay_manager.mark_connected(&relay_addr, peer_id);
895
896 let circuit_addr = relay_addr
898 .with(libp2p::multiaddr::Protocol::P2pCircuit);
899
900 match self.swarm.listen_on(circuit_addr.clone()) {
901 Ok(_) => {
902 info!("📡 Listening on relay circuit: {}", circuit_addr);
903 }
904 Err(e) => {
905 warn!("Failed to listen on relay circuit: {}", e);
906 }
907 }
908 break;
909 }
910 }
911 }
912 }
913
914 SwarmEvent::ConnectionClosed { peer_id, .. } => {
915 debug!("Disconnected from {}", peer_id);
916 let _ = self.event_tx.send(NodeEvent::PeerDisconnected(peer_id)).await;
917 }
918
919 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
920 warn!("Outgoing connection error to {:?}: {}", peer_id, error);
921 }
922
923 SwarmEvent::IncomingConnectionError { error, .. } => {
924 warn!("Incoming connection error: {}", error);
925 }
926
927 _ => {}
928 }
929 }
930
931 pub async fn announce_as_provider(&mut self, available_space: u64) -> NetResult<()> {
935 use crate::provider::ProviderInfo;
936
937 info!("📢 Announcing as storage provider: {} GB available", available_space / (1024 * 1024 * 1024));
938
939 let listen_addrs: Vec<Multiaddr> = self.swarm.listeners().cloned().collect();
941
942 let provider_info = ProviderInfo::new(
944 self.local_peer_id,
945 available_space,
946 listen_addrs,
947 );
948
949 let provider_data = bincode::serialize(&provider_info)
951 .map_err(|e| NetError::Other(format!("Failed to serialize provider info: {}", e)))?;
952
953 let key_string = format!("storage-providers:{}", self.local_peer_id);
955 let key = libp2p::kad::RecordKey::new(&key_string.as_bytes());
956
957 let record = libp2p::kad::Record {
958 key: key.clone(),
959 value: provider_data,
960 publisher: Some(self.local_peer_id),
961 expires: None, };
963
964 self.swarm
965 .behaviour_mut()
966 .kademlia
967 .put_record(record, libp2p::kad::Quorum::One)
968 .map_err(|e| NetError::Other(format!("Failed to put DHT record: {}", e)))?;
969
970 info!("✓ Provider announcement sent to DHT");
971 Ok(())
972 }
973
974 pub async fn find_storage_providers(&mut self, min_count: usize) -> NetResult<Vec<crate::provider::ProviderInfo>> {
976 info!("🔍 Searching for {} storage providers...", min_count);
977
978 let mut providers = Vec::new();
983
984 for bucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
986 for entry in bucket.iter() {
987 let peer_id = entry.node.key.preimage().clone();
988
989 if peer_id == self.local_peer_id {
991 continue;
992 }
993
994 let provider_info = crate::provider::ProviderInfo::new(
997 peer_id,
998 10 * 1024 * 1024 * 1024, vec![], );
1001
1002 providers.push(provider_info);
1003
1004 if providers.len() >= min_count * 2 {
1005 break;
1006 }
1007 }
1008 }
1009
1010 if providers.is_empty() {
1011 warn!("⚠️ No storage providers found in DHT!");
1012 warn!(" Make sure other nodes are running with:");
1013 warn!(" firecloud storage init --quota 10GB");
1014 warn!(" firecloud node --port 4001");
1015 } else {
1016 info!("✓ Found {} potential storage providers", providers.len());
1017 }
1018
1019 Ok(providers)
1020 }
1021
1022 pub async fn send_chunk_to_provider(
1024 &mut self,
1025 provider: PeerId,
1026 chunk_hash_str: &str,
1027 chunk_data: &[u8],
1028 original_size: u64,
1029 ) -> NetResult<()> {
1030 use crate::protocol::TransferRequest;
1031
1032 debug!("📤 Sending chunk {}... to provider {}", &chunk_hash_str[..16], provider);
1033
1034 let request = TransferRequest::StoreChunk {
1035 hash: chunk_hash_str.to_string(),
1036 data: chunk_data.to_vec(),
1037 original_size,
1038 };
1039
1040 self.swarm
1042 .behaviour_mut()
1043 .transfer
1044 .send_request(&provider, request);
1045
1046 debug!("✓ Chunk sent to provider");
1049
1050 Ok(())
1051 }
1052}
1053
1054
1055fn extract_peer_id(addr: &Multiaddr) -> Option<PeerId> {
1057 addr.iter().find_map(|p| {
1058 if let Protocol::P2p(peer_id) = p {
1059 Some(peer_id)
1060 } else {
1061 None
1062 }
1063 })
1064}