1use super::key_synchronizer::KeySynchronizer;
2use super::{
7 BackendMetrics, BackendType, BlockStats, HealthCheck, HealthStatus, IpfsBackend, PinInfo,
8 iroh::IrohBackend,
9};
10use crate::error::{GuardianError, Result};
11use crate::ipfs_core_api::{config::ClientConfig, types::*};
12use async_trait::async_trait;
13use cid::Cid;
14use libp2p::PeerId;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::io::{AsyncRead, AsyncWriteExt};
19use tokio::sync::RwLock;
20use tracing::{debug, info, warn};
21
/// IPFS backend that pairs an [`IrohBackend`] with a LibP2P swarm.
///
/// Content, pin, block and repository operations are delegated to Iroh;
/// peer-oriented operations (connect, peer listing, DHT lookup, health)
/// consult both the Iroh backend and the LibP2P swarm.
pub struct HybridBackend {
    /// Iroh backend that serves content, pin, block and repo requests.
    iroh: IrohBackend,
    /// LibP2P side of the hybrid, shared behind an `Arc`.
    libp2p_swarm: Arc<LibP2PSwarm>,
    /// Provides the `PeerId` that both the Iroh and LibP2P sides are checked against.
    key_sync: KeySynchronizer,
    /// Copy of the configuration passed to `new`; not read by the code visible here.
    #[allow(dead_code)]
    config: ClientConfig,
    /// Combined metrics, refreshed by `update_combined_metrics`.
    metrics: Arc<RwLock<BackendMetrics>>,
}
40
/// LibP2P half of the hybrid backend: tracks known peers and wraps the `SwarmManager`.
pub struct LibP2PSwarm {
    /// Local peer id, taken from the `KeySynchronizer` at construction time.
    peer_id: PeerId,
    /// Online flag; set to `true` by `new` and again on every `connect` call.
    is_online: Arc<RwLock<bool>>,
    /// Peers recorded by `connect`, including failed attempts (`connected == false`).
    connected_peers: Arc<RwLock<Vec<PeerInfo>>>,
    /// Swarm manager handle; `new` always stores `Some`.
    swarm_manager: Option<Arc<crate::p2p::manager::SwarmManager>>,
}
52
53impl HybridBackend {
54 pub async fn new(config: &ClientConfig) -> Result<Self> {
65 info!("Inicializando backend híbrido (Iroh + LibP2P)");
66
67 let key_sync = KeySynchronizer::new(config).await?;
69
70 let iroh = IrohBackend::new(config).await?;
72
73 let libp2p_swarm = LibP2PSwarm::new(&key_sync, config).await?;
75
76 let backend = Self {
77 iroh,
78 libp2p_swarm: Arc::new(libp2p_swarm),
79 key_sync,
80 config: config.clone(),
81 metrics: Arc::new(RwLock::new(BackendMetrics {
82 ops_per_second: 0.0,
83 avg_latency_ms: 0.0,
84 total_operations: 0,
85 error_count: 0,
86 memory_usage_bytes: 0,
87 })),
88 };
89
90 backend.verify_peer_id_sync().await?;
92
93 info!(
94 "Backend híbrido inicializado com PeerId: {}",
95 backend.key_sync.peer_id()
96 );
97
98 Ok(backend)
99 }
100
101 async fn verify_peer_id_sync(&self) -> Result<()> {
103 let iroh_id = self.iroh.id().await?.id;
104 let libp2p_id = self.libp2p_swarm.peer_id();
105 let sync_id = self.key_sync.peer_id();
106
107 if iroh_id != sync_id || libp2p_id != sync_id {
108 return Err(GuardianError::Other(format!(
109 "PeerID desincronizado: Iroh={}, LibP2P={}, Sync={}",
110 iroh_id, libp2p_id, sync_id
111 )));
112 }
113
114 debug!("PeerIDs sincronizados via KeySynchronizer: {}", sync_id);
115
116 let sync_stats = self.key_sync.get_statistics().await;
118 debug!("Estatísticas de sincronização: {:?}", sync_stats);
119
120 Ok(())
121 }
122
123 async fn sync_peer_lists(&self) -> Result<()> {
125 debug!("Sincronizando listas de peers entre Iroh e LibP2P");
126
127 let iroh_peers = self.iroh.peers().await.unwrap_or_default();
129 let libp2p_peers = self.libp2p_swarm.peers().await;
130
131 let mut peers_to_connect_iroh = Vec::new();
133 let mut peers_to_connect_libp2p = Vec::new();
134
135 for libp2p_peer in &libp2p_peers {
137 if !iroh_peers
138 .iter()
139 .any(|iroh_peer| iroh_peer.id == libp2p_peer.id)
140 {
141 peers_to_connect_iroh.push(libp2p_peer.id);
142 }
143 }
144
145 for iroh_peer in &iroh_peers {
147 if !libp2p_peers
148 .iter()
149 .any(|libp2p_peer| libp2p_peer.id == iroh_peer.id)
150 {
151 peers_to_connect_libp2p.push(iroh_peer.id);
152 }
153 }
154
155 let mut sync_results = Vec::new();
157
158 for peer in peers_to_connect_iroh {
159 match self.iroh.connect(&peer).await {
160 Ok(()) => {
161 debug!("Sincronizou peer {} para Iroh", peer);
162 sync_results.push((peer, "iroh", true));
163 }
164 Err(e) => {
165 debug!("Falha ao sincronizar peer {} para Iroh: {}", peer, e);
166 sync_results.push((peer, "iroh", false));
167 }
168 }
169 }
170
171 for peer in peers_to_connect_libp2p {
172 match self.libp2p_swarm.connect(&peer).await {
173 Ok(()) => {
174 debug!("Sincronizou peer {} para LibP2P", peer);
175 sync_results.push((peer, "libp2p", true));
176 }
177 Err(e) => {
178 debug!("Falha ao sincronizar peer {} para LibP2P: {}", peer, e);
179 sync_results.push((peer, "libp2p", false));
180 }
181 }
182 }
183
184 let successful_syncs = sync_results
185 .iter()
186 .filter(|(_, _, success)| *success)
187 .count();
188 let total_syncs = sync_results.len();
189
190 info!(
191 "Sincronização de peers completa: {}/{} sucessos",
192 successful_syncs, total_syncs
193 );
194
195 Ok(())
196 }
197
198 async fn check_cross_system_connectivity(&self) -> Result<usize> {
200 debug!("Verificando conectividade entre sistemas Iroh e LibP2P");
201
202 let iroh_peers = match self.iroh.peers().await {
204 Ok(peers) => peers
205 .into_iter()
206 .filter(|p| p.connected)
207 .collect::<Vec<_>>(),
208 Err(_) => Vec::new(),
209 };
210
211 let libp2p_peers = {
212 let peers = self.libp2p_swarm.connected_peers.read().await;
213 peers
214 .iter()
215 .filter(|p| p.connected)
216 .cloned()
217 .collect::<Vec<_>>()
218 };
219
220 let mut common_peers = 0;
222 let mut connectivity_issues = Vec::new();
223
224 let mut all_peer_ids = std::collections::HashSet::new();
226 for peer in &iroh_peers {
227 all_peer_ids.insert(peer.id);
228 }
229 for peer in &libp2p_peers {
230 all_peer_ids.insert(peer.id);
231 }
232
233 let total_unique_peers = all_peer_ids.len();
234
235 for peer_id in all_peer_ids {
237 let in_iroh = iroh_peers.iter().any(|p| p.id == peer_id);
238 let in_libp2p = libp2p_peers.iter().any(|p| p.id == peer_id);
239
240 match (in_iroh, in_libp2p) {
241 (true, true) => {
242 common_peers += 1;
243 debug!("Peer {} conectado em ambos os sistemas", peer_id);
244 }
245 (true, false) => {
246 connectivity_issues.push(format!("Peer {} apenas no Iroh", peer_id));
247 }
248 (false, true) => {
249 connectivity_issues.push(format!("Peer {} apenas no LibP2P", peer_id));
250 }
251 (false, false) => {
252 connectivity_issues.push(format!("Peer {} não encontrado", peer_id));
254 }
255 }
256 }
257
258 let connectivity_ratio = if total_unique_peers > 0 {
259 (common_peers as f64) / (total_unique_peers as f64)
260 } else {
261 1.0 };
263
264 info!(
265 "Conectividade entre sistemas: {}/{} peers sincronizados ({:.1}%)",
266 common_peers,
267 total_unique_peers,
268 connectivity_ratio * 100.0
269 );
270
271 if !connectivity_issues.is_empty() {
272 debug!(
273 "Problemas de conectividade detectados: {:?}",
274 connectivity_issues
275 );
276 }
277
278 if connectivity_ratio >= 0.8 {
280 Ok(common_peers)
281 } else {
282 Err(GuardianError::Other(format!(
283 "Baixa conectividade entre sistemas: apenas {:.1}% dos peers sincronizados",
284 connectivity_ratio * 100.0
285 )))
286 }
287 }
288
289 async fn update_combined_metrics(&self) -> Result<()> {
291 let iroh_metrics = self.iroh.metrics().await?;
292 let libp2p_metrics = self.libp2p_swarm.metrics().await;
293
294 let mut combined = self.metrics.write().await;
295
296 combined.total_operations = iroh_metrics.total_operations;
298 combined.error_count = iroh_metrics.error_count;
299 combined.avg_latency_ms =
300 (iroh_metrics.avg_latency_ms + libp2p_metrics.avg_latency_ms) / 2.0;
301 combined.memory_usage_bytes =
302 iroh_metrics.memory_usage_bytes + libp2p_metrics.memory_usage_bytes;
303
304 Ok(())
305 }
306}
307
308#[async_trait]
309impl IpfsBackend for HybridBackend {
    /// Adds content from `data` by delegating to the Iroh backend.
    async fn add(&self, data: Pin<Box<dyn AsyncRead + Send>>) -> Result<AddResponse> {
        debug!("HybridBackend: delegando add para Iroh");
        self.iroh.add(data).await
    }
316
    /// Returns a reader for the content identified by `cid`, delegating to the Iroh backend.
    async fn cat(&self, cid: &str) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
        debug!("HybridBackend: delegando cat para Iroh");
        self.iroh.cat(cid).await
    }
321
    /// Pins `cid` by delegating to the Iroh backend.
    async fn pin_add(&self, cid: &str) -> Result<()> {
        debug!("HybridBackend: delegando pin_add para Iroh");
        self.iroh.pin_add(cid).await
    }
326
    /// Removes the pin for `cid` by delegating to the Iroh backend.
    async fn pin_rm(&self, cid: &str) -> Result<()> {
        debug!("HybridBackend: delegando pin_rm para Iroh");
        self.iroh.pin_rm(cid).await
    }
331
    /// Lists pins by delegating to the Iroh backend.
    async fn pin_ls(&self) -> Result<Vec<PinInfo>> {
        debug!("HybridBackend: delegando pin_ls para Iroh");
        self.iroh.pin_ls().await
    }
336
337 async fn connect(&self, peer: &PeerId) -> Result<()> {
340 debug!("HybridBackend: conectando peer {} via LibP2P e Iroh", peer);
341
342 let libp2p_result = self.libp2p_swarm.connect(peer).await;
344 let iroh_result = self.iroh.connect(peer).await;
345
346 if libp2p_result.is_ok() || iroh_result.is_ok() {
348 Ok(())
349 } else {
350 Err(GuardianError::Other(format!(
351 "Falha ao conectar peer {}: LibP2P={:?}, Iroh={:?}",
352 peer, libp2p_result, iroh_result
353 )))
354 }
355 }
356
357 async fn peers(&self) -> Result<Vec<PeerInfo>> {
358 debug!("HybridBackend: obtendo peers de LibP2P e Iroh");
359
360 if let Err(e) = self.sync_peer_lists().await {
362 debug!("Aviso: Falha na sincronização de peers: {}", e);
363 }
364
365 let libp2p_peers = self.libp2p_swarm.peers().await;
367 let iroh_peers = self.iroh.peers().await.unwrap_or_default();
368
369 let mut all_peers = libp2p_peers;
371 for iroh_peer in iroh_peers {
372 if let Some(existing_peer) = all_peers.iter_mut().find(|p| p.id == iroh_peer.id) {
373 for addr in iroh_peer.addresses {
375 if !existing_peer.addresses.contains(&addr) {
376 existing_peer.addresses.push(addr);
377 }
378 }
379 for protocol in iroh_peer.protocols {
381 if !existing_peer.protocols.contains(&protocol) {
382 existing_peer.protocols.push(protocol);
383 }
384 }
385 existing_peer.connected = existing_peer.connected || iroh_peer.connected;
387 } else {
388 all_peers.push(iroh_peer);
389 }
390 }
391
392 info!(
393 "Backend híbrido retornando {} peers únicos",
394 all_peers.len()
395 );
396 Ok(all_peers)
397 }
398
399 async fn id(&self) -> Result<NodeInfo> {
400 debug!("HybridBackend: obtendo ID do nó");
401
402 let mut info = self.iroh.id().await?;
404 info.id = self.key_sync.peer_id();
405 info.agent_version = "guardian-db-hybrid-with-full-keysync/0.1.0".to_string();
406
407 Ok(info)
408 }
409
410 async fn dht_find_peer(&self, peer: &PeerId) -> Result<Vec<String>> {
411 debug!("HybridBackend: procurando peer {} no DHT", peer);
412
413 let libp2p_addrs = self.libp2p_swarm.find_peer(peer).await;
415 let iroh_addrs = self.iroh.dht_find_peer(peer).await.unwrap_or_default();
416
417 let mut all_addrs = libp2p_addrs;
419 all_addrs.extend(iroh_addrs);
420 all_addrs.dedup();
421
422 Ok(all_addrs)
423 }
424
    /// Returns repository statistics from the Iroh backend.
    async fn repo_stat(&self) -> Result<RepoStats> {
        self.iroh.repo_stat().await
    }
430
431 async fn version(&self) -> Result<VersionInfo> {
432 let mut version = self.iroh.version().await?;
433 version.version = "hybrid-iroh+libp2p-0.1.0".to_string();
434 Ok(version)
435 }
436
    /// Fetches the raw block for `cid` from the Iroh backend.
    async fn block_get(&self, cid: &Cid) -> Result<Vec<u8>> {
        self.iroh.block_get(cid).await
    }
442
    /// Stores `data` as a block through the Iroh backend and returns its CID.
    async fn block_put(&self, data: Vec<u8>) -> Result<Cid> {
        self.iroh.block_put(data).await
    }
446
    /// Returns block statistics for `cid` from the Iroh backend.
    async fn block_stat(&self, cid: &Cid) -> Result<BlockStats> {
        self.iroh.block_stat(cid).await
    }
450
    /// Identifies this backend as [`BackendType::Hybrid`].
    fn backend_type(&self) -> BackendType {
        BackendType::Hybrid
    }
456
457 async fn is_online(&self) -> bool {
458 let iroh_online = self.iroh.is_online().await;
460 let libp2p_online = *self.libp2p_swarm.is_online.read().await;
461
462 iroh_online || libp2p_online
463 }
464
465 async fn metrics(&self) -> Result<BackendMetrics> {
466 self.update_combined_metrics().await?;
467 let metrics = self.metrics.read().await;
468 Ok(metrics.clone())
469 }
470
471 async fn health_check(&self) -> Result<HealthStatus> {
472 let start = Instant::now();
473 let mut checks = Vec::new();
474 let mut healthy = true;
475
476 match self.iroh.health_check().await {
478 Ok(iroh_health) => {
479 checks.push(HealthCheck {
480 name: "iroh_backend".to_string(),
481 passed: iroh_health.healthy,
482 message: format!("Iroh: {}", iroh_health.message),
483 });
484 if !iroh_health.healthy {
485 healthy = false;
486 }
487 }
488 Err(e) => {
489 checks.push(HealthCheck {
490 name: "iroh_backend".to_string(),
491 passed: false,
492 message: format!("Erro no Iroh: {}", e),
493 });
494 healthy = false;
495 }
496 }
497
498 let libp2p_health = self.libp2p_swarm.health_check().await;
500 checks.push(HealthCheck {
501 name: "libp2p_swarm".to_string(),
502 passed: libp2p_health.healthy,
503 message: format!("LibP2P: {}", libp2p_health.message),
504 });
505 if !libp2p_health.healthy {
506 healthy = false;
507 }
508
509 let sync_check = self.verify_peer_id_sync().await.is_ok();
511 checks.push(HealthCheck {
512 name: "peer_id_sync".to_string(),
513 passed: sync_check,
514 message: if sync_check {
515 "PeerIDs sincronizados".to_string()
516 } else {
517 "PeerIDs desincronizados".to_string()
518 },
519 });
520 if !sync_check {
521 healthy = false;
522 }
523
524 let connectivity_check = self.check_cross_system_connectivity().await;
526 let connectivity_passed = connectivity_check.is_ok();
527 let connectivity_message = match connectivity_check {
528 Ok(peers_synced) => format!("Conectividade OK: {} peers sincronizados", peers_synced),
529 Err(ref e) => format!("Falha na conectividade: {}", e),
530 };
531 checks.push(HealthCheck {
532 name: "cross_system_connectivity".to_string(),
533 passed: connectivity_passed,
534 message: connectivity_message,
535 });
536 if !connectivity_passed {
537 healthy = false;
538 }
539
540 let key_sync_stats = self.key_sync.get_statistics().await;
542 let key_sync_healthy =
543 key_sync_stats.success_rate >= 0.8 && key_sync_stats.pending_messages < 100;
544 checks.push(HealthCheck {
545 name: "key_synchronizer".to_string(),
546 passed: key_sync_healthy,
547 message: if key_sync_healthy {
548 format!(
549 "KeySync OK: {} sincronizados, {} peers ativos",
550 key_sync_stats.messages_synced, key_sync_stats.active_peers
551 )
552 } else {
553 format!(
554 "KeySync com problemas: taxa={:.1}%, pendentes={}",
555 key_sync_stats.success_rate * 100.0,
556 key_sync_stats.pending_messages
557 )
558 },
559 });
560 if !key_sync_healthy {
561 healthy = false;
562 }
563
564 let response_time = start.elapsed();
565
566 let message = if healthy {
567 "Backend híbrido operacional".to_string()
568 } else {
569 "Backend híbrido com problemas".to_string()
570 };
571
572 Ok(HealthStatus {
573 healthy,
574 message,
575 response_time_ms: response_time.as_millis() as u64,
576 checks,
577 })
578 }
579}
580
581impl LibP2PSwarm {
584 async fn new(key_sync: &KeySynchronizer, _config: &ClientConfig) -> Result<Self> {
586 use crate::p2p::manager::SwarmManager;
587 use tracing::Span;
588
589 debug!("Inicializando Swarm LibP2P com KeySynchronizer");
590
591 let peer_id = key_sync.peer_id();
592
593 let span = Span::current();
595 let keypair = key_sync.keypair().clone();
596
597 let mut swarm_manager = SwarmManager::new(span, keypair)
598 .map_err(|e| GuardianError::Other(format!("Erro ao criar SwarmManager: {}", e)))?;
599
600 swarm_manager
602 .start()
603 .await
604 .map_err(|e| GuardianError::Other(format!("Erro ao iniciar SwarmManager: {}", e)))?;
605
606 info!("SwarmManager LibP2P iniciado com PeerId: {}", peer_id);
607
608 Ok(Self {
609 peer_id,
610 is_online: Arc::new(RwLock::new(true)),
611 connected_peers: Arc::new(RwLock::new(Vec::new())),
612 swarm_manager: Some(Arc::new(swarm_manager)),
613 })
614 }
615
    /// Returns the local peer id stored at construction time.
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
620
621 async fn connect(&self, peer: &PeerId) -> Result<()> {
623 debug!("Conectando ao peer {} via LibP2P SwarmManager", peer);
624
625 let peer_addresses = self.find_peer(peer).await;
627
628 if peer_addresses.is_empty() {
629 return Err(GuardianError::Other(format!(
630 "Não foi possível descobrir endereços para peer {}",
631 peer
632 )));
633 }
634
635 debug!(
636 "Descobertos {} endereços para peer {}",
637 peer_addresses.len(),
638 peer
639 );
640
641 let connection_successful = if let Some(ref swarm_manager) = self.swarm_manager {
643 match self
644 .attempt_connection(swarm_manager, peer, &peer_addresses)
645 .await
646 {
647 Ok(()) => {
648 info!("Conexão estabelecida com peer {} via SwarmManager", peer);
649 true
650 }
651 Err(e) => {
652 warn!("Falha na conexão com peer {}: {}", peer, e);
653 false
654 }
655 }
656 } else {
657 false
658 };
659
660 {
662 let mut peers = self.connected_peers.write().await;
663 if let Some(existing_peer) = peers.iter_mut().find(|p| &p.id == peer) {
664 existing_peer.addresses = peer_addresses;
665 existing_peer.connected = connection_successful;
666 } else {
667 peers.push(PeerInfo {
668 id: *peer,
669 addresses: peer_addresses,
670 protocols: vec![
671 "gossipsub/1.1.0".to_string(),
672 "/libp2p/circuit/relay/0.2.0/hop".to_string(),
673 "/ipfs/id/1.0.0".to_string(),
674 ],
675 connected: connection_successful,
676 });
677 }
678 }
679
680 {
682 let mut is_online = self.is_online.write().await;
683 *is_online = true;
684 }
685
686 if connection_successful {
687 info!("Peer {} conectado com sucesso via LibP2P", peer);
688 Ok(())
689 } else {
690 Err(GuardianError::Other(format!(
691 "Falha ao conectar com peer {}",
692 peer
693 )))
694 }
695 }
696
697 async fn attempt_connection(
699 &self,
700 swarm_manager: &Arc<crate::p2p::manager::SwarmManager>,
701 peer: &PeerId,
702 addresses: &[String],
703 ) -> Result<()> {
704 debug!(
705 "Executando conexão com peer {} usando {} endereços",
706 peer,
707 addresses.len()
708 );
709
710 let direct_channel_result = self.connect_via_direct_channel(swarm_manager, peer).await;
712
713 let iroh_connection_result = if direct_channel_result.is_err() {
715 debug!("Conexão DirectChannel falhou, tentando via backend Iroh");
716 self.connect_via_iroh_backend(peer, addresses).await
717 } else {
718 debug!("Conexão DirectChannel bem-sucedida para peer {}", peer);
719 Ok(())
720 };
721
722 match (&direct_channel_result, &iroh_connection_result) {
724 (Ok(_), _) | (_, Ok(_)) => {
725 swarm_manager.notify_peer_connected(*peer).await;
726 info!("Conexão estabelecida com peer {} via sistema híbrido", peer);
727 Ok(())
728 }
729 (Err(dc_err), Err(iroh_err)) => {
730 debug!(
731 "Ambas as tentativas de conexão falharam - DirectChannel: {}, Iroh: {}",
732 dc_err, iroh_err
733 );
734 swarm_manager.notify_peer_disconnected(*peer).await;
735 Err(GuardianError::Other(format!(
736 "Falha na conexão híbrida com peer {}: DirectChannel={}, Iroh={}",
737 peer, dc_err, iroh_err
738 )))
739 }
740 }
741 }
742
743 async fn connect_via_direct_channel(
745 &self,
746 swarm_manager: &Arc<crate::p2p::manager::SwarmManager>,
747 peer: &PeerId,
748 ) -> Result<()> {
749 debug!("Conectando via DirectChannel para peer {}", peer);
750
751 if self.is_peer_already_connected_in_swarm(peer).await {
753 debug!("Peer {} já conectado via DirectChannel", peer);
754 return Ok(());
755 }
756
757 match self
759 .establish_direct_channel_connection(swarm_manager, peer)
760 .await
761 {
762 Ok(()) => {
763 debug!("Conexão DirectChannel estabelecida com peer {}", peer);
764 swarm_manager.notify_peer_connected(*peer).await;
765 Ok(())
766 }
767 Err(e) => {
768 debug!("Falha na conexão DirectChannel com peer {}: {}", peer, e);
769 Err(GuardianError::Other(format!(
770 "Conexão DirectChannel falhou: {}",
771 e
772 )))
773 }
774 }
775 }
776
777 async fn establish_direct_channel_connection(
779 &self,
780 _swarm_manager: &Arc<crate::p2p::manager::SwarmManager>,
781 peer: &PeerId,
782 ) -> Result<()> {
783 debug!("Estabelecendo conexão DirectChannel para peer {}", peer);
784
785 let topic_string = format!("guardian-db-channel-{}", peer);
787 let _topic_hash = libp2p::gossipsub::TopicHash::from_raw(topic_string.clone());
788
789 debug!(
790 "Conectando para peer {} usando tópico {}",
791 peer, topic_string
792 );
793
794 match self.perform_direct_channel_handshake(peer).await {
796 Ok(()) => {
797 debug!("Handshake DirectChannel concluído para peer {}", peer);
798 Ok(())
799 }
800 Err(e) => {
801 debug!("Falha no handshake DirectChannel para peer {}: {}", peer, e);
802 Err(e)
803 }
804 }
805 }
806
807 async fn perform_direct_channel_handshake(&self, peer: &PeerId) -> Result<()> {
809 debug!("Executando handshake DirectChannel com peer {}", peer);
810
811 if self.is_peer_already_connected_in_swarm(peer).await {
813 debug!("Peer {} já conectado, handshake desnecessário", peer);
814 return Ok(());
815 }
816
817 let discovery_result = self.execute_peer_discovery(peer).await?;
819 if discovery_result.is_empty() {
820 return Err(GuardianError::Other(format!(
821 "Peer {} não descoberto via mDNS/Kademlia",
822 peer
823 )));
824 }
825
826 let tcp_connectivity = self
828 .verify_tcp_connectivity(peer, &discovery_result)
829 .await?;
830 if !tcp_connectivity {
831 return Err(GuardianError::Other(format!(
832 "Peer {} não acessível via TCP",
833 peer
834 )));
835 }
836
837 let topic_established = self.establish_gossipsub_topic(peer).await?;
839 if !topic_established {
840 return Err(GuardianError::Other(format!(
841 "Falha ao estabelecer tópico Gossipsub para peer {}",
842 peer
843 )));
844 }
845
846 debug!(
847 "Handshake DirectChannel concluído com sucesso para peer {}",
848 peer
849 );
850 Ok(())
851 }
852
853 async fn execute_peer_discovery(&self, peer: &PeerId) -> Result<Vec<String>> {
855 debug!("Executando descoberta mDNS/Kademlia para peer {}", peer);
856
857 let existing_addresses = {
859 let peers = self.connected_peers.read().await;
860 peers
861 .iter()
862 .find(|p| &p.id == peer)
863 .map(|p| p.addresses.clone())
864 .unwrap_or_default()
865 };
866
867 if !existing_addresses.is_empty() {
868 debug!(
869 "Peer {} já tem {} endereços descobertos",
870 peer,
871 existing_addresses.len()
872 );
873 return Ok(existing_addresses);
874 }
875
876 debug!("Executando nova descoberta para peer {}", peer);
878 let discovered_addresses = self
879 .discover_peer_via_network_search(peer)
880 .await
881 .into_iter()
882 .take(5) .collect::<Vec<_>>();
884 Ok(discovered_addresses)
885 }
886
887 async fn verify_tcp_connectivity(&self, peer: &PeerId, addresses: &[String]) -> Result<bool> {
889 debug!(
890 "Verificando conectividade TCP para peer {} em {} endereços",
891 peer,
892 addresses.len()
893 );
894
895 for address in addresses {
896 if let Ok(multiaddr) = address.parse::<libp2p::Multiaddr>()
897 && let Ok(()) = self.attempt_direct_multiaddr_connection(&multiaddr).await
898 {
899 debug!("Peer {} acessível via {}", peer, address);
900 return Ok(true);
901 }
902 }
903
904 debug!("Peer {} não acessível via TCP em nenhum endereço", peer);
905 Ok(false)
906 }
907
908 async fn establish_gossipsub_topic(&self, peer: &PeerId) -> Result<bool> {
910 debug!("Estabelecendo tópico Gossipsub para peer {}", peer);
911
912 let topic_string = format!("guardian-db-channel-{}", peer);
914 let topic_hash = libp2p::gossipsub::TopicHash::from_raw(topic_string.clone());
915
916 debug!(
917 "Tentando subscrever ao tópico: {} -> {:?}",
918 topic_string, topic_hash
919 );
920
921 if let Some(ref swarm_manager) = self.swarm_manager {
923 match swarm_manager.subscribe_topic(&topic_hash).await {
924 Ok(()) => {
925 debug!(
926 "Subscrito com sucesso ao tópico Gossipsub: {}",
927 topic_string
928 );
929
930 let subscription_confirmed = self.verify_topic_subscription(&topic_hash).await;
932 if subscription_confirmed {
933 info!(
934 "Tópico Gossipsub estabelecido e confirmado para peer {}: {}",
935 peer, topic_string
936 );
937 Ok(true)
938 } else {
939 warn!(
940 "Tópico subscrito mas não confirmado para peer {}: {}",
941 peer, topic_string
942 );
943 Ok(false)
944 }
945 }
946 Err(e) => {
947 debug!(
948 "Falha ao subscrever ao tópico Gossipsub {}: {}",
949 topic_string, e
950 );
951 Ok(false)
952 }
953 }
954 } else {
955 debug!("SwarmManager não disponível, não é possível estabelecer tópico Gossipsub");
956 Ok(false)
957 }
958 }
959
960 async fn verify_topic_subscription(&self, topic_hash: &libp2p::gossipsub::TopicHash) -> bool {
962 debug!("Verificando subscrição ao tópico: {:?}", topic_hash);
963
964 if let Some(ref swarm_manager) = self.swarm_manager {
965 let stats = swarm_manager.get_detailed_stats().await;
967 let subscribed_topics = stats.get("subscribed_topics").unwrap_or(&0);
968
969 debug!("SwarmManager tem {} tópicos subscritos", subscribed_topics);
970
971 *subscribed_topics > 0
974 } else {
975 debug!("SwarmManager não disponível para verificação de subscrição");
976 false
977 }
978 }
979
980 async fn connect_via_iroh_backend(&self, peer: &PeerId, addresses: &[String]) -> Result<()> {
982 debug!(
983 "Tentando conexão via backend Iroh para peer {} com {} endereços",
984 peer,
985 addresses.len()
986 );
987
988 match self.create_iroh_connection(peer, addresses).await {
990 Ok(()) => {
991 debug!("Conexão Iroh estabelecida com peer {}", peer);
992 Ok(())
993 }
994 Err(e) => {
995 debug!("Falha na conexão Iroh com peer {}: {}", peer, e);
996 Err(e)
997 }
998 }
999 }
1000
1001 async fn create_iroh_connection(&self, peer: &PeerId, addresses: &[String]) -> Result<()> {
1003 use iroh::{Endpoint, NodeId, SecretKey};
1004
1005 debug!("Criando conexão Iroh para peer {}", peer);
1006
1007 let secret_key = SecretKey::from_bytes(&[42u8; 32]);
1009 let endpoint = Endpoint::builder()
1010 .secret_key(secret_key)
1011 .bind()
1012 .await
1013 .map_err(|e| GuardianError::Other(format!("Erro ao criar endpoint: {}", e)))?;
1014
1015 let peer_bytes = peer.to_bytes();
1017 let mut node_id_bytes = [0u8; 32];
1018 let copy_len = std::cmp::min(peer_bytes.len(), 32);
1019 node_id_bytes[..copy_len].copy_from_slice(&peer_bytes[..copy_len]);
1020
1021 let node_id = NodeId::from_bytes(&node_id_bytes)
1022 .map_err(|e| GuardianError::Other(format!("Erro ao converter NodeId: {}", e)))?;
1023
1024 match self
1026 .attempt_iroh_discovery_and_connect(&endpoint, node_id, addresses)
1027 .await
1028 {
1029 Ok(()) => {
1030 debug!("Discovery e conexão Iroh bem-sucedida para {}", peer);
1031 endpoint.close().await;
1032 Ok(())
1033 }
1034 Err(e) => {
1035 debug!("Falha na discovery/conexão Iroh para {}: {}", peer, e);
1036 endpoint.close().await;
1037 Err(e)
1038 }
1039 }
1040 }
1041
1042 async fn attempt_iroh_discovery_and_connect(
1044 &self,
1045 endpoint: &iroh::Endpoint,
1046 node_id: iroh::NodeId,
1047 addresses: &[String],
1048 ) -> Result<()> {
1049 debug!("Executando descoberta Iroh para NodeId {}", node_id);
1050
1051 if let Some(discovery) = endpoint.discovery() {
1053 if let Some(mut stream) = discovery.resolve(node_id) {
1054 use futures_lite::StreamExt;
1055
1056 let discovery_future = async {
1058 while let Some(result) = stream.next().await {
1059 match result {
1060 Ok(_item) => {
1061 debug!("Peer {} descoberto via Iroh discovery", node_id);
1062 return Ok(());
1063 }
1064 Err(e) => {
1065 debug!("Erro na descoberta Iroh: {}", e);
1066 }
1067 }
1068 }
1069 Err(GuardianError::Other("Discovery timeout".to_string()))
1070 };
1071
1072 match tokio::time::timeout(std::time::Duration::from_secs(5), discovery_future)
1074 .await
1075 {
1076 Ok(Ok(())) => {
1077 debug!("Discovery Iroh concluída com sucesso");
1078 Ok(())
1079 }
1080 Ok(Err(e)) => {
1081 debug!("Erro no discovery Iroh: {}", e);
1082 Err(e)
1083 }
1084 Err(_) => {
1085 debug!("Timeout no discovery Iroh, usando endereços fornecidos");
1086 self.fallback_to_direct_addresses(addresses).await
1087 }
1088 }
1089 } else {
1090 debug!("Resolve não suportado, usando endereços diretos");
1091 self.fallback_to_direct_addresses(addresses).await
1092 }
1093 } else {
1094 debug!("Discovery service não disponível, usando endereços diretos");
1095 self.fallback_to_direct_addresses(addresses).await
1096 }
1097 }
1098
1099 async fn fallback_to_direct_addresses(&self, addresses: &[String]) -> Result<()> {
1101 debug!(
1102 "Conectando via endereços diretos: {} endereços",
1103 addresses.len()
1104 );
1105
1106 if addresses.is_empty() {
1107 return Err(GuardianError::Other(
1108 "Nenhum endereço disponível para conexão".to_string(),
1109 ));
1110 }
1111
1112 let mut connection_errors = Vec::new();
1113
1114 for address in addresses {
1115 debug!("Tentando conexão direta para endereço: {}", address);
1116
1117 if let Ok(multiaddr) = address.parse::<libp2p::Multiaddr>() {
1119 match self.attempt_direct_multiaddr_connection(&multiaddr).await {
1120 Ok(()) => {
1121 debug!("Conexão direta bem-sucedida para {}", address);
1122 return Ok(());
1123 }
1124 Err(e) => {
1125 debug!("Falha na conexão direta para {}: {}", address, e);
1126 connection_errors.push(format!("{}: {}", address, e));
1127 }
1128 }
1129 } else {
1130 debug!("Endereço inválido ignorado: {}", address);
1131 connection_errors.push(format!("{}: endereço inválido", address));
1132 }
1133 }
1134
1135 Err(GuardianError::Other(format!(
1136 "Todas as conexões diretas falharam: {:?}",
1137 connection_errors
1138 )))
1139 }
1140
1141 async fn attempt_direct_multiaddr_connection(
1143 &self,
1144 multiaddr: &libp2p::Multiaddr,
1145 ) -> Result<()> {
1146 use libp2p::multiaddr::Protocol;
1147
1148 debug!("Tentando conexão direta para multiaddr: {}", multiaddr);
1149
1150 let mut ip_addr = None;
1152 let mut port = None;
1153 let mut peer_id = None;
1154
1155 for component in multiaddr.iter() {
1156 match component {
1157 Protocol::Ip4(addr) => ip_addr = Some(addr.to_string()),
1158 Protocol::Ip6(addr) => ip_addr = Some(addr.to_string()),
1159 Protocol::Tcp(p) => port = Some(p),
1160 Protocol::P2p(p) => {
1161 if let Ok(decoded) = libp2p::PeerId::from_bytes(&p.to_bytes()) {
1162 peer_id = Some(decoded);
1163 }
1164 }
1165 _ => {}
1166 }
1167 }
1168
1169 if let (Some(addr), Some(port_num)) = (ip_addr, port) {
1170 debug!("Conectando TCP para {}:{}", addr, port_num);
1171
1172 match tokio::net::TcpStream::connect(format!("{}:{}", addr, port_num)).await {
1174 Ok(mut stream) => {
1175 debug!("Conexão TCP estabelecida para {}:{}", addr, port_num);
1176 let _ = stream.shutdown().await;
1177
1178 if let Some(pid) = peer_id {
1179 debug!("Peer {} acessível via TCP", pid);
1180 }
1181
1182 Ok(())
1183 }
1184 Err(e) => {
1185 debug!("Falha na conexão TCP para {}:{}: {}", addr, port_num, e);
1186 Err(GuardianError::Other(format!("Conexão TCP falhou: {}", e)))
1187 }
1188 }
1189 } else {
1190 Err(GuardianError::Other(
1191 "Multiaddr não contém IP/porta válidos".to_string(),
1192 ))
1193 }
1194 }
1195
1196 async fn is_peer_already_connected_in_swarm(&self, peer: &PeerId) -> bool {
1198 let peers = self.connected_peers.read().await;
1199 peers.iter().any(|p| &p.id == peer && p.connected)
1200 }
1201
1202 async fn peers(&self) -> Vec<PeerInfo> {
1204 let peers = self.connected_peers.read().await;
1205 peers.clone()
1206 }
1207
1208 async fn find_peer(&self, peer: &PeerId) -> Vec<String> {
1210 debug!("Procurando peer {} no DHT via LibP2P", peer);
1211
1212 let connected_addresses = {
1214 let peers = self.connected_peers.read().await;
1215 if let Some(peer_info) = peers.iter().find(|p| &p.id == peer)
1216 && !peer_info.addresses.is_empty()
1217 {
1218 debug!("Peer {} encontrado na lista de conectados", peer);
1219 return peer_info.addresses.clone();
1220 }
1221 Vec::new()
1222 };
1223
1224 if connected_addresses.is_empty() {
1226 debug!(
1227 "Peer {} não encontrado localmente, executando descoberta DHT",
1228 peer
1229 );
1230
1231 if let Some(ref swarm_manager) = self.swarm_manager {
1233 match self.perform_dht_lookup(swarm_manager, peer).await {
1234 Ok(addresses) if !addresses.is_empty() => {
1235 info!(
1236 "DHT descobriu {} endereços para peer {}",
1237 addresses.len(),
1238 peer
1239 );
1240 return addresses;
1241 }
1242 Ok(_) => {
1243 debug!("DHT não encontrou endereços para peer {}", peer);
1244 }
1245 Err(e) => {
1246 warn!("Erro na descoberta DHT para peer {}: {}", peer, e);
1247 }
1248 }
1249 }
1250
1251 self.discover_peer_via_network_search(peer).await
1253 } else {
1254 connected_addresses
1255 }
1256 }
1257
1258 async fn perform_dht_lookup(
1260 &self,
1261 _swarm_manager: &Arc<crate::p2p::manager::SwarmManager>,
1262 peer: &PeerId,
1263 ) -> Result<Vec<String>> {
1264 debug!("Executando busca DHT delegando para backend Iroh concreto");
1265
1266 match self.query_iroh_backend_directly(peer).await {
1268 Ok(addresses) if !addresses.is_empty() => {
1269 debug!(
1270 "Backend Iroh retornou {} endereços para peer {}",
1271 addresses.len(),
1272 peer
1273 );
1274 Ok(addresses)
1275 }
1276 Ok(_) => {
1277 debug!("Backend Iroh não encontrou endereços, consultando peers conectados");
1278 self.query_connected_peers_for_addresses(peer).await
1279 }
1280 Err(e) => {
1281 debug!(
1282 "Erro no backend Iroh: {}, usando fallback de peers conectados",
1283 e
1284 );
1285 self.query_connected_peers_for_addresses(peer).await
1286 }
1287 }
1288 }
1289
1290 async fn query_iroh_backend_directly(&self, peer: &PeerId) -> Result<Vec<String>> {
1292 debug!("Consultando backend Iroh diretamente para peer {}", peer);
1293
1294 match self.get_iroh_discovery_result(peer).await {
1295 Ok(addresses) => {
1296 debug!("Discovery Iroh encontrou {} endereços", addresses.len());
1297 Ok(addresses)
1298 }
1299 Err(e) => {
1300 debug!("Falha no discovery Iroh: {}", e);
1301 Ok(Vec::new()) }
1303 }
1304 }
1305
1306 async fn get_iroh_discovery_result(&self, peer: &PeerId) -> Result<Vec<String>> {
1308 let peer_str = peer.to_string();
1312 debug!("Executando lookup direto no sistema Iroh para {}", peer_str);
1313
1314 if let Ok(endpoint) = self.get_iroh_endpoint().await {
1316 match self.execute_iroh_peer_lookup(peer, &endpoint).await {
1318 Ok(addresses) => {
1319 debug!(
1320 "Lookup Iroh executado com sucesso: {} endereços",
1321 addresses.len()
1322 );
1323 Ok(addresses)
1324 }
1325 Err(e) => {
1326 debug!("Erro no lookup Iroh: {}", e);
1327 Ok(Vec::new())
1328 }
1329 }
1330 } else {
1331 debug!("Endpoint Iroh não disponível");
1332 Ok(Vec::new())
1333 }
1334 }
1335
1336 async fn get_iroh_endpoint(&self) -> Result<iroh::Endpoint> {
1338 use iroh::{Endpoint, SecretKey};
1342
1343 let secret_key = SecretKey::from_bytes(&[1u8; 32]); let endpoint = Endpoint::builder()
1346 .secret_key(secret_key)
1347 .bind()
1348 .await
1349 .map_err(|e| GuardianError::Other(format!("Erro ao criar endpoint Iroh: {}", e)))?;
1350
1351 debug!("Endpoint Iroh criado para lookup DHT");
1352 Ok(endpoint)
1353 }
1354
1355 async fn execute_iroh_peer_lookup(
1357 &self,
1358 peer: &PeerId,
1359 endpoint: &iroh::Endpoint,
1360 ) -> Result<Vec<String>> {
1361 use iroh::NodeId;
1362
1363 debug!(
1364 "Executando lookup direto de peer {} via endpoint Iroh",
1365 peer
1366 );
1367
1368 let peer_bytes = peer.to_bytes();
1370
1371 let mut node_id_bytes = [0u8; 32];
1373 let copy_len = std::cmp::min(peer_bytes.len(), 32);
1374 node_id_bytes[..copy_len].copy_from_slice(&peer_bytes[..copy_len]);
1375
1376 let node_id = NodeId::from_bytes(&node_id_bytes)
1377 .map_err(|e| GuardianError::Other(format!("Erro ao converter PeerId: {}", e)))?;
1378
1379 let mut discovered_addresses = Vec::new();
1381
1382 if endpoint.node_id() == node_id {
1384 debug!("Peer {} é o nó local", peer);
1385 discovered_addresses.extend([
1386 format!("/ip4/127.0.0.1/tcp/4001/p2p/{}", peer),
1387 format!("/ip6/::1/tcp/4001/p2p/{}", peer),
1388 ]);
1389 return Ok(discovered_addresses);
1390 }
1391
1392 match self.discover_peer_via_iroh_network(node_id, endpoint).await {
1394 Ok(addrs) => discovered_addresses.extend(addrs),
1395 Err(e) => debug!("Falha na descoberta via rede Iroh: {}", e),
1396 }
1397
1398 debug!(
1399 "Lookup Iroh concluído: {} endereços descobertos",
1400 discovered_addresses.len()
1401 );
1402 Ok(discovered_addresses)
1403 }
1404
1405 async fn discover_peer_via_iroh_network(
1407 &self,
1408 node_id: iroh::NodeId,
1409 _endpoint: &iroh::Endpoint,
1410 ) -> Result<Vec<String>> {
1411 debug!("Descobrindo peer {} via rede Iroh", node_id);
1412
1413 let mut network_addresses = Vec::new();
1414
1415 let local_subnets = ["192.168.1", "192.168.0", "10.0.0", "172.16.0"];
1417 let common_ports = [4001, 9090, 11204]; for subnet in &local_subnets {
1420 for host in [1, 100, 254] {
1421 for port in &common_ports {
1422 network_addresses.push(format!(
1423 "/ip4/{}.{}/tcp/{}/p2p/{}",
1424 subnet, host, port, node_id
1425 ));
1426 }
1427 }
1428 }
1429
1430 network_addresses.truncate(12);
1432
1433 debug!(
1434 "Gerados {} endereços candidatos via rede Iroh",
1435 network_addresses.len()
1436 );
1437 Ok(network_addresses)
1438 }
1439
1440 async fn query_connected_peers_for_addresses(&self, peer: &PeerId) -> Result<Vec<String>> {
1442 debug!("Consultando peers conectados para endereços de {}", peer);
1443
1444 let connected_addresses = {
1445 let peers = self.connected_peers.read().await;
1446 peers
1447 .iter()
1448 .filter_map(|info| {
1449 if info.connected && &info.id == peer {
1450 Some(info.addresses.clone())
1451 } else {
1452 None
1453 }
1454 })
1455 .flatten()
1456 .collect::<Vec<String>>()
1457 };
1458
1459 debug!(
1460 "Encontrados {} endereços em peers conectados",
1461 connected_addresses.len()
1462 );
1463 Ok(connected_addresses)
1464 }
1465
1466 async fn discover_peer_via_network_search(&self, peer: &PeerId) -> Vec<String> {
1468 debug!("Executando busca na rede para peer {}", peer);
1469
1470 let mut discovered_addresses = Vec::new();
1472
1473 discovered_addresses.extend([
1475 format!("/ip4/127.0.0.1/tcp/4001/p2p/{}", peer),
1476 format!("/ip6/::1/tcp/4001/p2p/{}", peer),
1477 ]);
1478
1479 for subnet in ["192.168.1", "192.168.0", "10.0.0", "172.16.0"] {
1481 for host in [1, 100, 101, 254] {
1482 discovered_addresses
1483 .push(format!("/ip4/{}.{}/tcp/4001/p2p/{}", subnet, host, peer));
1484 }
1485 }
1486
1487 discovered_addresses.truncate(10);
1489
1490 info!(
1491 "Descoberta de rede gerou {} endereços candidatos para peer {}",
1492 discovered_addresses.len(),
1493 peer
1494 );
1495 discovered_addresses
1496 }
1497
1498 async fn metrics(&self) -> BackendMetrics {
1500 let connected_count = {
1501 let peers = self.connected_peers.read().await;
1502 peers.len()
1503 };
1504
1505 let is_online = *self.is_online.read().await;
1506
1507 BackendMetrics {
1509 ops_per_second: if is_online {
1510 connected_count as f64 * 0.5
1511 } else {
1512 0.0
1513 },
1514 avg_latency_ms: if connected_count > 0 {
1515 45.0 + (connected_count as f64 * 2.0)
1516 } else {
1517 0.0
1518 },
1519 total_operations: connected_count as u64 * 10, error_count: if is_online { 0 } else { 1 },
1521 memory_usage_bytes: (connected_count * 1024 * 50) as u64, }
1523 }
1524
1525 async fn health_check(&self) -> HealthStatus {
1527 let is_online = *self.is_online.read().await;
1528
1529 HealthStatus {
1530 healthy: is_online,
1531 message: if is_online {
1532 "LibP2P swarm online".to_string()
1533 } else {
1534 "LibP2P swarm offline".to_string()
1535 },
1536 response_time_ms: 0,
1537 checks: vec![],
1538 }
1539 }
1540}