guardian_db/p2p/pubsub/
direct_channel.rs

1use crate::error::{GuardianError, Result};
2use crate::p2p::manager::SwarmManager;
3use crate::p2p::pubsub::{HEARTBEAT_INTERVAL, MAX_MESSAGE_SIZE, PROTOCOL};
4use crate::traits::{
5    DirectChannelEmitter, DirectChannelFactory, DirectChannelOptions, EventPubSubPayload,
6};
7use async_trait::async_trait;
8use futures;
9use libp2p::{
10    PeerId,
11    gossipsub::{Message, TopicHash},
12    identity::Keypair,
13};
14use serde::{Deserialize, Serialize};
15use std::{
16    collections::HashMap,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20use tokio::sync::{Mutex, RwLock, mpsc};
21use tracing::Span;
22
// Direct channel protocol messages
/// Envelope published on a direct-channel topic.
///
/// Values of this type are encoded with `serde_cbor` before being handed to the
/// network layer and decoded the same way on receipt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirectChannelMessage {
    /// Kind of frame carried by this envelope.
    pub message_type: MessageType,
    /// Raw frame body. Empty for plain heartbeats; handshakes and discovery beacons
    /// carry a UTF-8 marker string (`handshake:…` / `discovery_beacon:…`).
    pub payload: Vec<u8>,
    /// Whole seconds since the Unix epoch at the moment the envelope was built.
    pub timestamp: u64,
    /// Sender identity: the textual form of the sender's `PeerId`, except for periodic
    /// heartbeats, which use the literal `"heartbeat"`.
    pub sender: String,
}
31
/// Frame kinds understood by the direct-channel protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessageType {
    /// Application payload addressed to the other side of the channel.
    Data,
    /// Keep-alive frame; also reused as the discovery beacon.
    Heartbeat,
    /// Acknowledgement frame; also reused as the connection handshake.
    Ack,
}
38
/// Synchronous view of the pub/sub network used by `DirectChannel`.
///
/// All methods are blocking (non-`async`); implementations are shared across tasks,
/// hence the `Send + Sync` bound.
pub trait DirectChannelNetwork: Send + Sync {
    /// Publishes `message` on `topic`.
    fn publish_message(&self, topic: &TopicHash, message: &[u8]) -> Result<()>;
    /// Subscribes the local node to `topic`.
    fn subscribe_topic(&self, topic: &TopicHash) -> Result<()>;
    /// Returns the peers currently known to be connected.
    fn get_connected_peers(&self) -> Vec<PeerId>;
    /// Returns the peers currently known for `topic` (empty when none are known).
    fn get_topic_peers(&self, topic: &TopicHash) -> Vec<PeerId>;
}
45
// DirectChannelNetwork implementation backed by SwarmManager
/// Adapter that exposes a `SwarmManager` through the `DirectChannelNetwork` trait and
/// keeps a local cache of peer/topic bookkeeping.
pub struct SwarmBridge {
    /// Tracing span used to instrument bridge operations.
    span: Span,
    /// The wrapped swarm manager; every network operation goes through this lock.
    swarm_manager: Arc<Mutex<SwarmManager>>,
    /// Last peer list supplied through `update_connected_peers`.
    connected_peers: Arc<RwLock<Vec<PeerId>>>,
    /// Last peer list supplied per topic through `update_topic_peers`.
    topic_peers: Arc<RwLock<HashMap<TopicHash, Vec<PeerId>>>>,
    /// Topics this bridge has subscribed to; publishing requires a `true` entry.
    subscribed_topics: Arc<RwLock<HashMap<TopicHash, bool>>>,
}
54
55impl SwarmBridge {
56    pub async fn new(span: Span) -> Result<Self> {
57        let keypair = Keypair::generate_ed25519();
58        let swarm_manager = SwarmManager::new(span.clone(), keypair)?;
59
60        Ok(Self {
61            span,
62            swarm_manager: Arc::new(Mutex::new(swarm_manager)),
63            connected_peers: Arc::new(RwLock::new(Vec::new())),
64            topic_peers: Arc::new(RwLock::new(HashMap::new())),
65            subscribed_topics: Arc::new(RwLock::new(HashMap::new())),
66        })
67    }
68
69    /// Retorna uma referência ao span span para instrumentação
70    pub fn span(&self) -> &Span {
71        &self.span
72    }
73
74    pub async fn start(&self) -> Result<()> {
75        let _entered = self.span.enter();
76        let mut manager = self.swarm_manager.lock().await;
77        manager.start().await?;
78        tracing::info!("SwarmBridge iniciada com SwarmManager");
79        Ok(())
80    }
81
82    /// Atualiza a lista de peers conectados (chamado pelo SwarmManager)
83    pub async fn update_connected_peers(&self, peers: Vec<PeerId>) {
84        let _entered = self.span.enter();
85        let mut connected = self.connected_peers.write().await;
86        *connected = peers.clone();
87
88        // Notifica o SwarmManager sobre novos peers
89        let manager = self.swarm_manager.lock().await;
90        for peer in peers {
91            manager.notify_peer_connected(peer).await;
92        }
93
94        tracing::debug!(
95            "Peers conectados atualizados pelo SwarmManager: {}",
96            connected.len()
97        );
98    }
99
100    /// Atualiza peers de um tópico específico (chamado pelo SwarmManager)
101    pub async fn update_topic_peers(&self, topic: TopicHash, peers: Vec<PeerId>) {
102        let mut topic_peers = self.topic_peers.write().await;
103        topic_peers.insert(topic.clone(), peers.clone());
104
105        // Atualiza no SwarmManager
106        let manager = self.swarm_manager.lock().await;
107        manager
108            .update_topic_peers(topic.clone(), peers.clone())
109            .await;
110
111        tracing::debug!(
112            "Peers do tópico {:?} atualizados pelo SwarmManager: {}",
113            topic,
114            peers.len()
115        );
116    }
117
118    /// Publicação de mensagem integrada com o SwarmManager
119    async fn publish(&self, topic: &TopicHash, message: &[u8]) -> Result<()> {
120        let manager = self.swarm_manager.lock().await;
121        manager.publish_message(topic, message).await?;
122        tracing::debug!(
123            "Mensagem publicada pelo SwarmManager no tópico: {:?}",
124            topic
125        );
126        Ok(())
127    }
128
129    pub async fn stop(&self) -> Result<()> {
130        let manager = self.swarm_manager.lock().await;
131        manager.stop().await?;
132        tracing::info!("SwarmBridge parada");
133        Ok(())
134    }
135
136    /// Obtém estatísticas essenciais da interface
137    pub async fn get_interface_stats(&self) -> HashMap<String, u64> {
138        let manager = self.swarm_manager.lock().await;
139        let mut stats = manager.get_detailed_stats().await;
140
141        // Adiciona estatísticas específicas da interface
142        let connected = self.connected_peers.read().await;
143        stats.insert(
144            "interface_connected_peers".to_string(),
145            connected.len() as u64,
146        );
147
148        let topics = self.topic_peers.read().await;
149        stats.insert("interface_tracked_topics".to_string(), topics.len() as u64);
150
151        stats
152    }
153}
154
155impl DirectChannelNetwork for SwarmBridge {
156    fn publish_message(&self, topic: &TopicHash, message: &[u8]) -> Result<()> {
157        tracing::debug!(
158            "Publicando mensagem no tópico: {:?}, {} bytes",
159            topic,
160            message.len()
161        );
162
163        // Verifica se o tópico está inscrito
164        let subscribed = {
165            let topics = futures::executor::block_on(self.subscribed_topics.read());
166            topics.get(topic).copied().unwrap_or(false)
167        };
168
169        if !subscribed {
170            return Err(GuardianError::Other(format!(
171                "Tópico {:?} não está inscrito",
172                topic
173            )));
174        }
175
176        // Usa publicação integrada com SwarmManager
177        futures::executor::block_on(self.publish(topic, message))?;
178
179        tracing::info!(
180            "Mensagem publicada com sucesso no tópico via SwarmManager: {:?}",
181            topic
182        );
183        Ok(())
184    }
185
186    fn subscribe_topic(&self, topic: &TopicHash) -> Result<()> {
187        tracing::debug!("Inscrevendo no tópico: {:?}", topic);
188
189        // Marca o tópico como inscrito
190        let mut topics = futures::executor::block_on(self.subscribed_topics.write());
191        topics.insert(topic.clone(), true);
192
193        // Inicializa lista de peers para o tópico
194        let mut topic_peers = futures::executor::block_on(self.topic_peers.write());
195        if !topic_peers.contains_key(topic) {
196            topic_peers.insert(topic.clone(), Vec::new());
197        }
198
199        // Usa inscrição do SwarmManager
200        let manager = futures::executor::block_on(self.swarm_manager.lock());
201        futures::executor::block_on(manager.subscribe_topic(topic))?;
202
203        tracing::info!(
204            "Inscrição realizada com sucesso no tópico via SwarmManager: {:?}",
205            topic
206        );
207        Ok(())
208    }
209
210    fn get_connected_peers(&self) -> Vec<PeerId> {
211        let peers = futures::executor::block_on(self.connected_peers.read());
212        let peer_list = peers.clone();
213        tracing::debug!(
214            "Retornando {} peers conectados via SwarmManager",
215            peer_list.len()
216        );
217        peer_list
218    }
219
220    fn get_topic_peers(&self, topic: &TopicHash) -> Vec<PeerId> {
221        tracing::debug!("Obtendo peers do tópico: {:?}", topic);
222
223        let topic_peers = futures::executor::block_on(self.topic_peers.read());
224        let peers = topic_peers.get(topic).cloned().unwrap_or_default();
225
226        tracing::debug!(
227            "Tópico {:?} tem {} peers conectados via SwarmManager",
228            topic,
229            peers.len()
230        );
231        peers
232    }
233}
234
// Internal state of the DirectChannel
/// Per-peer bookkeeping kept by `DirectChannel`.
#[derive(Debug, Clone)]
struct ChannelState {
    /// Remote peer this entry belongs to (also the key of the channel map).
    #[allow(dead_code)]
    peer_id: PeerId,
    /// Topic used to talk to this peer.
    topic: TopicHash,
    /// Current connection status of the channel.
    connection_status: ConnectionStatus,
    /// Refreshed on received data/heartbeats, on connection, and on reconnect attempts;
    /// the reconnect back-off is measured from this instant.
    last_activity: Instant,
    /// Number of data messages received from the peer.
    message_count: u64,
    /// Refreshed when the peer connects or a heartbeat is received from it.
    last_heartbeat: Instant,
}
246
/// Connection status of a single direct channel.
#[derive(Debug, Clone)]
enum ConnectionStatus {
    /// The peer was reported as disconnected.
    Disconnected,
    /// A connection (or reconnection) attempt is in progress.
    Connecting,
    /// The peer is considered reachable.
    Connected,
    /// The channel failed; the string describes why.
    #[allow(dead_code)]
    Error(String),
}
255
// Internal events of the DirectChannel
/// Events sent over the internal channel and consumed by the event-processing task.
#[derive(Debug)]
enum DirectChannelEvent {
    /// The peer became reachable; its channel is marked `Connected`.
    PeerConnected(PeerId),
    /// The peer went away; its channel is marked `Disconnected`.
    PeerDisconnected(PeerId),
    /// A data message arrived from `peer`.
    MessageReceived {
        peer: PeerId,
        payload: Vec<u8>,
    },
    /// Outcome of an attempt to send data to `peer`; `error` is set on failure.
    MessageSent {
        peer: PeerId,
        success: bool,
        error: Option<String>,
    },
    /// A heartbeat arrived from the peer.
    HeartbeatReceived(PeerId),
    /// Sending a heartbeat to the peer failed; its channel is marked as `Error`.
    HeartbeatTimeout(PeerId),
}
273
/// Peer-to-peer channel built on per-pair pub/sub topics.
pub struct DirectChannel {
    /// Tracing span handed to the background tasks.
    span: Span,
    /// Network facade used to publish, subscribe and query peers.
    libp2p: Arc<dyn DirectChannelNetwork>,
    /// Sink that receives incoming data messages as `EventPubSubPayload`.
    emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
    /// Per-peer channel state, shared with the background tasks.
    channels: Arc<RwLock<HashMap<PeerId, ChannelState>>>,
    /// Sending half of the internal event queue.
    event_sender: mpsc::UnboundedSender<DirectChannelEvent>,
    /// Receiving half of the internal event queue; taken (left as `None`) by `start`.
    _event_receiver: Arc<Mutex<Option<mpsc::UnboundedReceiver<DirectChannelEvent>>>>,
    /// Identity of the local node, used to derive topics and sign envelopes.
    own_peer_id: PeerId,
    /// `true` while the background loops should keep running.
    running: Arc<Mutex<bool>>,
}
284
285impl DirectChannel {
286    // Construtor público
287    pub fn new(
288        span: Span,
289        libp2p: Arc<dyn DirectChannelNetwork>,
290        emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
291        own_peer_id: PeerId,
292    ) -> Self {
293        let (event_sender, event_receiver) = mpsc::unbounded_channel();
294
295        Self {
296            span,
297            libp2p,
298            emitter,
299            channels: Arc::new(RwLock::new(HashMap::new())),
300            event_sender,
301            _event_receiver: Arc::new(Mutex::new(Some(event_receiver))),
302            own_peer_id,
303            running: Arc::new(Mutex::new(false)),
304        }
305    }
306
307    // Gera o tópico único para comunicação com um peer específico
308    fn get_channel_topic(&self, peer: PeerId) -> TopicHash {
309        // Ordena os peer IDs para garantir o mesmo tópico em ambos os lados
310        let (first, second) = if self.own_peer_id < peer {
311            (self.own_peer_id, peer)
312        } else {
313            (peer, self.own_peer_id)
314        };
315        let topic_string = format!("{}/channel/{}/{}", PROTOCOL, first, second);
316        TopicHash::from_raw(topic_string)
317    }
318
319    // Inicia o processamento de eventos
320    pub async fn start(&self) -> Result<()> {
321        let mut running = self.running.lock().await;
322        if *running {
323            return Ok(());
324        }
325        *running = true;
326
327        let mut receiver = self
328            ._event_receiver
329            .lock()
330            .await
331            .take()
332            .ok_or_else(|| GuardianError::Other("Event receiver already taken".to_string()))?;
333
334        let emitter = self.emitter.clone();
335        let span = self.span.clone();
336        let channels = self.channels.clone();
337        let running_flag = self.running.clone();
338
339        tokio::spawn(async move {
340            while let Some(event) = receiver.recv().await {
341                let running = *running_flag.lock().await;
342                if !running {
343                    break;
344                }
345
346                if let Err(e) = Self::handle_event(event, &emitter, &span, &channels).await {
347                    tracing::error!("Erro ao processar evento: {}", e);
348                }
349            }
350            tracing::info!("Event processing loop terminated");
351        });
352
353        // Inicia o heartbeat loop
354        self.start_heartbeat_loop().await;
355
356        Ok(())
357    }
358
359    // Inicia o loop de heartbeat para manter conexões ativas
360    async fn start_heartbeat_loop(&self) {
361        let channels = self.channels.clone();
362        let event_sender = self.event_sender.clone();
363        let span = self.span.clone();
364        let running_flag = self.running.clone();
365        let libp2p = self.libp2p.clone();
366
367        tokio::spawn(async move {
368            let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL);
369
370            loop {
371                interval.tick().await;
372
373                let running = *running_flag.lock().await;
374                if !running {
375                    break;
376                }
377
378                let peers_to_heartbeat: Vec<(PeerId, TopicHash)> = {
379                    let channels_map = channels.read().await;
380                    channels_map
381                        .iter()
382                        .filter_map(|(peer_id, state)| {
383                            match state.connection_status {
384                                ConnectionStatus::Connected => {
385                                    // Verifica se precisa de heartbeat
386                                    if state.last_heartbeat.elapsed() > HEARTBEAT_INTERVAL {
387                                        Some((*peer_id, state.topic.clone()))
388                                    } else {
389                                        None
390                                    }
391                                }
392                                _ => None,
393                            }
394                        })
395                        .collect()
396                };
397
398                for (peer, topic) in peers_to_heartbeat {
399                    // Envia heartbeat
400                    if let Err(e) = Self::send_heartbeat(&libp2p, &topic, &span).await {
401                        tracing::warn!("Falha ao enviar heartbeat para {}: {}", peer, e);
402                        let _ = event_sender.send(DirectChannelEvent::HeartbeatTimeout(peer));
403                    } else {
404                        tracing::trace!(peer = %peer, "Heartbeat enviado para peer");
405                    }
406                }
407
408                // Verifica peers em estado de erro e tenta reconectar
409                let peers_to_reconnect: Vec<PeerId> = {
410                    let channels_map = channels.read().await;
411                    channels_map
412                        .iter()
413                        .filter_map(|(peer_id, state)| {
414                            match &state.connection_status {
415                                ConnectionStatus::Error(err) => {
416                                    // Tenta reconectar após 30 segundos em erro
417                                    if state.last_activity.elapsed() > Duration::from_secs(30) {
418                                        tracing::debug!(
419                                            "Tentando reconexão com peer {} após erro: {}",
420                                            peer_id,
421                                            err
422                                        );
423                                        Some(*peer_id)
424                                    } else {
425                                        None
426                                    }
427                                }
428                                ConnectionStatus::Disconnected => {
429                                    // Tenta reconectar peers desconectados após 60 segundos
430                                    if state.last_activity.elapsed() > Duration::from_secs(60) {
431                                        tracing::debug!(
432                                            "Tentando reconexão com peer desconectado: {}",
433                                            peer_id
434                                        );
435                                        Some(*peer_id)
436                                    } else {
437                                        None
438                                    }
439                                }
440                                _ => None,
441                            }
442                        })
443                        .collect()
444                };
445
446                // Atualiza estado para "Connecting" e tenta reconectar
447                for peer in peers_to_reconnect {
448                    let mut channels_map = channels.write().await;
449                    if let Some(state) = channels_map.get_mut(&peer) {
450                        state.connection_status = ConnectionStatus::Connecting;
451                        state.last_activity = Instant::now();
452
453                        // Tenta reconectar (beacon de descoberta)
454                        if let Err(e) = Self::send_heartbeat(&libp2p, &state.topic, &span).await {
455                            tracing::warn!("Falha na tentativa de reconexão com {}: {}", peer, e);
456                        } else {
457                            tracing::info!("Tentativa de reconexão iniciada para peer: {}", peer);
458                        }
459                    }
460                }
461            }
462        });
463    }
464
465    // Envia um heartbeat para um tópico específico
466    async fn send_heartbeat(
467        libp2p: &Arc<dyn DirectChannelNetwork>,
468        topic: &TopicHash,
469        _span: &Span,
470    ) -> Result<()> {
471        let heartbeat_msg = DirectChannelMessage {
472            message_type: MessageType::Heartbeat,
473            payload: vec![],
474            timestamp: std::time::SystemTime::now()
475                .duration_since(std::time::UNIX_EPOCH)
476                .unwrap_or_default()
477                .as_secs(),
478            sender: "heartbeat".to_string(),
479        };
480
481        let serialized = serde_cbor::to_vec(&heartbeat_msg)
482            .map_err(|e| GuardianError::Other(format!("Erro de serialização heartbeat: {}", e)))?;
483
484        libp2p.publish_message(topic, &serialized)?;
485        tracing::trace!(topic = ?topic, "Heartbeat enviado no tópico");
486        Ok(())
487    }
488
489    // Processa eventos internos
490    async fn handle_event(
491        event: DirectChannelEvent,
492        emitter: &Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
493        _span: &Span,
494        channels: &Arc<RwLock<HashMap<PeerId, ChannelState>>>,
495    ) -> Result<()> {
496        match event {
497            DirectChannelEvent::MessageReceived { peer, payload } => {
498                tracing::debug!("Mensagem recebida de {}: {} bytes", peer, payload.len());
499
500                // Valida tamanho da mensagem
501                if payload.len() > MAX_MESSAGE_SIZE {
502                    tracing::warn!("Mensagem muito grande de {}: {} bytes", peer, payload.len());
503                    return Ok(());
504                }
505
506                // Atualiza atividade do canal
507                {
508                    let mut channels_map = channels.write().await;
509                    if let Some(state) = channels_map.get_mut(&peer) {
510                        state.last_activity = Instant::now();
511                        state.message_count += 1;
512                    }
513                }
514
515                let event_payload = EventPubSubPayload { payload, peer };
516                emitter
517                    .emit(event_payload)
518                    .await
519                    .map_err(|e| GuardianError::Other(format!("Erro ao emitir evento: {}", e)))?;
520            }
521            DirectChannelEvent::PeerConnected(peer) => {
522                tracing::info!("Peer conectado: {}", peer);
523                let mut channels_map = channels.write().await;
524                if let Some(state) = channels_map.get_mut(&peer) {
525                    state.connection_status = ConnectionStatus::Connected;
526                    state.last_activity = Instant::now();
527                    state.last_heartbeat = Instant::now();
528                }
529            }
530            DirectChannelEvent::PeerDisconnected(peer) => {
531                tracing::info!("Peer desconectado: {}", peer);
532                let mut channels_map = channels.write().await;
533                if let Some(state) = channels_map.get_mut(&peer) {
534                    state.connection_status = ConnectionStatus::Disconnected;
535                }
536            }
537            DirectChannelEvent::MessageSent {
538                peer,
539                success,
540                error,
541            } => {
542                if success {
543                    tracing::debug!("Mensagem enviada com sucesso para: {}", peer);
544                } else {
545                    tracing::warn!("Falha ao enviar mensagem para {}: {:?}", peer, error);
546                }
547            }
548            DirectChannelEvent::HeartbeatReceived(peer) => {
549                tracing::trace!(peer = %peer, "Heartbeat recebido de");
550                let mut channels_map = channels.write().await;
551                if let Some(state) = channels_map.get_mut(&peer) {
552                    state.last_activity = Instant::now();
553                    state.last_heartbeat = Instant::now();
554                }
555            }
556            DirectChannelEvent::HeartbeatTimeout(peer) => {
557                tracing::warn!("Timeout de heartbeat para peer: {}", peer);
558                let mut channels_map = channels.write().await;
559                if let Some(state) = channels_map.get_mut(&peer) {
560                    state.connection_status =
561                        ConnectionStatus::Error("Heartbeat timeout".to_string());
562                }
563            }
564        }
565        Ok(())
566    }
567
568    // Envia dados para um peer específico
569    pub async fn send_data(&self, peer: PeerId, payload: Vec<u8>) -> Result<()> {
570        if payload.len() > MAX_MESSAGE_SIZE {
571            return Err(GuardianError::Other(format!(
572                "Mensagem muito grande: {} bytes (máximo: {})",
573                payload.len(),
574                MAX_MESSAGE_SIZE
575            )));
576        }
577
578        let topic = self.get_channel_topic(peer);
579        let message = DirectChannelMessage {
580            message_type: MessageType::Data,
581            payload,
582            timestamp: std::time::SystemTime::now()
583                .duration_since(std::time::UNIX_EPOCH)
584                .unwrap_or_default()
585                .as_secs(),
586            sender: self.own_peer_id.to_string(),
587        };
588
589        let serialized = serde_cbor::to_vec(&message)
590            .map_err(|e| GuardianError::Other(format!("Erro de serialização: {}", e)))?;
591
592        match self.libp2p.publish_message(&topic, &serialized) {
593            Ok(()) => {
594                let _ = self.event_sender.send(DirectChannelEvent::MessageSent {
595                    peer,
596                    success: true,
597                    error: None,
598                });
599                tracing::debug!(
600                    "Dados enviados para {}: {} bytes",
601                    peer,
602                    message.payload.len()
603                );
604                Ok(())
605            }
606            Err(e) => {
607                let error_msg = format!("Erro ao publicar mensagem: {}", e);
608                let _ = self.event_sender.send(DirectChannelEvent::MessageSent {
609                    peer,
610                    success: false,
611                    error: Some(error_msg.clone()),
612                });
613                Err(GuardianError::Other(error_msg))
614            }
615        }
616    }
617
618    // Conecta a um peer específico
619    pub async fn connect_to_peer(&self, peer: PeerId) -> Result<()> {
620        let topic = self.get_channel_topic(peer);
621        let mut channels_map = self.channels.write().await;
622
623        if let Some(state) = channels_map.get(&peer) {
624            match state.connection_status {
625                ConnectionStatus::Connected => {
626                    tracing::debug!("Já conectado ao peer: {}", peer);
627                    return Ok(());
628                }
629                ConnectionStatus::Connecting => {
630                    tracing::debug!("Conexão em andamento com peer: {}", peer);
631                    return Ok(());
632                }
633                _ => {}
634            }
635        }
636        // Inscreve no tópico
637        self.libp2p.subscribe_topic(&topic)?;
638        // Adiciona ou atualiza o estado do canal
639        channels_map.insert(
640            peer,
641            ChannelState {
642                peer_id: peer,
643                topic: topic.clone(),
644                connection_status: ConnectionStatus::Connecting,
645                last_activity: Instant::now(),
646                message_count: 0,
647                last_heartbeat: Instant::now(),
648            },
649        );
650        tracing::info!("Conectando ao peer {} no tópico: {:?}", peer, topic);
651        self.establish_peer_connection(peer, topic.clone()).await?;
652        Ok(())
653    }
654
655    // Estabelece conexão com um peer específico
656    async fn establish_peer_connection(&self, peer: PeerId, topic: TopicHash) -> Result<()> {
657        tracing::debug!("Estabelecendo conexão com peer: {}", peer);
658
659        // 1. Verifica se o peer já está nos peers conectados do libp2p
660        let connected_peers = self.libp2p.get_connected_peers();
661        let is_peer_connected = connected_peers.contains(&peer);
662
663        if is_peer_connected {
664            tracing::debug!("Peer {} já está conectado globalmente", peer);
665            // Envia evento de conexão estabelecida
666            let _ = self
667                .event_sender
668                .send(DirectChannelEvent::PeerConnected(peer));
669            return Ok(());
670        }
671
672        // 2. Aguarda um tempo para descoberta de peers no tópico
673        let discovery_timeout = Duration::from_secs(10);
674        let start_time = Instant::now();
675
676        while start_time.elapsed() < discovery_timeout {
677            // Verifica peers do tópico específico
678            let topic_peers = self.libp2p.get_topic_peers(&topic);
679
680            if topic_peers.contains(&peer) {
681                tracing::info!("Peer {} descoberto no tópico: {:?}", peer, topic);
682
683                // Envia mensagem de handshake para verificar conectividade
684                if self.send_handshake_message(&topic, peer).await.is_ok() {
685                    tracing::info!("Handshake bem-sucedido com peer: {}", peer);
686                    let _ = self
687                        .event_sender
688                        .send(DirectChannelEvent::PeerConnected(peer));
689                    return Ok(());
690                }
691            }
692
693            // Verifica novamente peers globais (podem ter sido descobertos via mDNS/Kademlia)
694            let updated_peers = self.libp2p.get_connected_peers();
695            if updated_peers.contains(&peer) {
696                tracing::info!("Peer {} conectado via discovery global", peer);
697                let _ = self
698                    .event_sender
699                    .send(DirectChannelEvent::PeerConnected(peer));
700                return Ok(());
701            }
702
703            // Aguarda antes da próxima verificação
704            tokio::time::sleep(Duration::from_millis(500)).await;
705        }
706
707        // 3. Se não conseguiu conectar diretamente, tenta envio de beacon
708        tracing::warn!(
709            "Peer {} não encontrado diretamente, enviando beacon de descoberta",
710            peer
711        );
712        if let Err(e) = self.send_discovery_beacon(&topic, peer).await {
713            tracing::error!("Falha ao enviar beacon de descoberta para {}: {}", peer, e);
714
715            // Marca como erro de conexão mas não falha completamente
716            let mut channels_map = self.channels.write().await;
717            if let Some(state) = channels_map.get_mut(&peer) {
718                state.connection_status =
719                    ConnectionStatus::Error(format!("Discovery timeout: {}", e));
720            }
721
722            return Err(GuardianError::Other(format!(
723                "Timeout na descoberta do peer {} após {}s",
724                peer,
725                discovery_timeout.as_secs()
726            )));
727        }
728
729        // 4. Aguarda resposta ao beacon por mais um tempo limitado
730        let beacon_timeout = Duration::from_secs(5);
731        let beacon_start = Instant::now();
732
733        while beacon_start.elapsed() < beacon_timeout {
734            let topic_peers = self.libp2p.get_topic_peers(&topic);
735            if topic_peers.contains(&peer) {
736                tracing::info!("Peer {} respondeu ao beacon de descoberta", peer);
737                let _ = self
738                    .event_sender
739                    .send(DirectChannelEvent::PeerConnected(peer));
740                return Ok(());
741            }
742
743            tokio::time::sleep(Duration::from_millis(200)).await;
744        }
745
746        // 5. Conexão não estabelecida - mantém estado como "Connecting" para retry futuro
747        tracing::warn!(
748            "Conexão com peer {} não pôde ser estabelecida no momento",
749            peer
750        );
751        Ok(())
752    }
753
754    // Envia mensagem de handshake para verificar conectividade
755    async fn send_handshake_message(&self, topic: &TopicHash, target_peer: PeerId) -> Result<()> {
756        let handshake_msg = DirectChannelMessage {
757            message_type: MessageType::Ack, // Usa ACK como handshake
758            payload: format!("handshake:{}", self.own_peer_id).into_bytes(),
759            timestamp: std::time::SystemTime::now()
760                .duration_since(std::time::UNIX_EPOCH)
761                .unwrap_or_default()
762                .as_secs(),
763            sender: self.own_peer_id.to_string(),
764        };
765
766        let serialized = serde_cbor::to_vec(&handshake_msg)
767            .map_err(|e| GuardianError::Other(format!("Erro serialização handshake: {}", e)))?;
768
769        self.libp2p.publish_message(topic, &serialized)?;
770        tracing::debug!("Handshake enviado para peer: {}", target_peer);
771        Ok(())
772    }
773
774    // Envia beacon de descoberta para atrair peers
775    async fn send_discovery_beacon(&self, topic: &TopicHash, target_peer: PeerId) -> Result<()> {
776        let beacon_msg = DirectChannelMessage {
777            message_type: MessageType::Heartbeat, // Usa Heartbeat como beacon
778            payload: format!("discovery_beacon:{}:{}", self.own_peer_id, target_peer).into_bytes(),
779            timestamp: std::time::SystemTime::now()
780                .duration_since(std::time::UNIX_EPOCH)
781                .unwrap_or_default()
782                .as_secs(),
783            sender: self.own_peer_id.to_string(),
784        };
785
786        let serialized = serde_cbor::to_vec(&beacon_msg)
787            .map_err(|e| GuardianError::Other(format!("Erro serialização beacon: {}", e)))?;
788
789        self.libp2p.publish_message(topic, &serialized)?;
790        tracing::debug!("Discovery beacon enviado no tópico: {:?}", topic);
791        Ok(())
792    }
793
794    // Processa mensagem recebida do Gossipsub
795    pub async fn handle_gossipsub_message(&self, message: Message) -> Result<()> {
796        // Decodifica a mensagem
797        let decoded_msg: DirectChannelMessage = serde_cbor::from_slice(&message.data)
798            .map_err(|e| GuardianError::Other(format!("Erro ao decodificar mensagem: {}", e)))?;
799
800        let sender_peer = message
801            .source
802            .ok_or_else(|| GuardianError::Other("Mensagem sem remetente".to_string()))?;
803
804        match decoded_msg.message_type {
805            MessageType::Data => {
806                let _ = self.event_sender.send(DirectChannelEvent::MessageReceived {
807                    peer: sender_peer,
808                    payload: decoded_msg.payload,
809                });
810            }
811            MessageType::Heartbeat => {
812                // Verifica se é um discovery beacon
813                if let Ok(payload_str) = String::from_utf8(decoded_msg.payload.clone()) {
814                    if payload_str.starts_with("discovery_beacon:") {
815                        self.handle_discovery_beacon(sender_peer, payload_str)
816                            .await?;
817                    } else {
818                        let _ = self
819                            .event_sender
820                            .send(DirectChannelEvent::HeartbeatReceived(sender_peer));
821                    }
822                } else {
823                    let _ = self
824                        .event_sender
825                        .send(DirectChannelEvent::HeartbeatReceived(sender_peer));
826                }
827            }
828            MessageType::Ack => {
829                // Verifica se é um handshake
830                if let Ok(payload_str) = String::from_utf8(decoded_msg.payload.clone()) {
831                    if payload_str.starts_with("handshake:") {
832                        self.handle_handshake_response(sender_peer, payload_str)
833                            .await?;
834                    } else {
835                        tracing::trace!(sender_peer = %sender_peer, "ACK recebido de");
836                    }
837                } else {
838                    tracing::trace!(sender_peer = %sender_peer, "ACK recebido de");
839                }
840            }
841        }
842
843        Ok(())
844    }
845
846    // Processa beacon de descoberta recebido
847    async fn handle_discovery_beacon(
848        &self,
849        sender_peer: PeerId,
850        beacon_payload: String,
851    ) -> Result<()> {
852        tracing::debug!(
853            "Discovery beacon recebido de: {} - {}",
854            sender_peer,
855            beacon_payload
856        );
857
858        // Parse do beacon: "discovery_beacon:sender_peer:target_peer"
859        let parts: Vec<&str> = beacon_payload.split(':').collect();
860        if parts.len() >= 3 {
861            let _beacon_sender = parts[1]; // ID do remetente original
862            let beacon_target = parts[2];
863
864            // Verifica se somos o alvo do beacon
865            if beacon_target == self.own_peer_id.to_string() {
866                tracing::info!("Beacon de descoberta direcionado a nós de: {}", sender_peer);
867
868                // Responde com handshake se ainda não estamos conectados
869                let channels_map = self.channels.read().await;
870                if let Some(state) = channels_map.get(&sender_peer)
871                    && matches!(
872                        state.connection_status,
873                        ConnectionStatus::Connecting | ConnectionStatus::Disconnected
874                    )
875                {
876                    drop(channels_map); // Libera o lock
877
878                    // Responde ao beacon
879                    let topic = self.get_channel_topic(sender_peer);
880                    if let Err(e) = self.send_handshake_message(&topic, sender_peer).await {
881                        tracing::warn!("Falha ao responder beacon de {}: {}", sender_peer, e);
882                    } else {
883                        tracing::info!("Handshake de resposta enviado para: {}", sender_peer);
884                    }
885                }
886            }
887        }
888
889        Ok(())
890    }
891
892    // Processa resposta de handshake
893    async fn handle_handshake_response(
894        &self,
895        sender_peer: PeerId,
896        handshake_payload: String,
897    ) -> Result<()> {
898        tracing::debug!(
899            "Handshake recebido de: {} - {}",
900            sender_peer,
901            handshake_payload
902        );
903
904        // Parse do handshake: "handshake:peer_id"
905        let parts: Vec<&str> = handshake_payload.split(':').collect();
906        if parts.len() >= 2 {
907            let handshake_peer = parts[1];
908            tracing::info!(
909                "Handshake válido recebido de peer: {} (id: {})",
910                sender_peer,
911                handshake_peer
912            );
913
914            // Atualiza estado para conectado se ainda estava conectando
915            let mut channels_map = self.channels.write().await;
916            if let Some(state) = channels_map.get_mut(&sender_peer) {
917                match state.connection_status {
918                    ConnectionStatus::Connecting => {
919                        state.connection_status = ConnectionStatus::Connected;
920                        state.last_activity = Instant::now();
921                        state.last_heartbeat = Instant::now();
922
923                        // Notifica conexão estabelecida
924                        let _ = self
925                            .event_sender
926                            .send(DirectChannelEvent::PeerConnected(sender_peer));
927
928                        tracing::info!("Conexão estabelecida com peer: {}", sender_peer);
929                    }
930                    ConnectionStatus::Connected => {
931                        // Atualiza apenas timestamps
932                        state.last_activity = Instant::now();
933                        state.last_heartbeat = Instant::now();
934                        tracing::trace!("Handshake de manutenção recebido de: {}", sender_peer);
935                    }
936                    _ => {
937                        tracing::debug!(
938                            "Handshake recebido de peer em estado: {:?}",
939                            state.connection_status
940                        );
941                    }
942                }
943            }
944        }
945
946        Ok(())
947    }
948
949    // Para o DirectChannel
950    pub async fn stop(&self) -> Result<()> {
951        let mut running = self.running.lock().await;
952        *running = false;
953
954        // Desconecta todos os peers
955        let peers: Vec<PeerId> = {
956            let channels_map = self.channels.read().await;
957            channels_map.keys().cloned().collect()
958        };
959
960        for peer in peers {
961            let mut channels_map = self.channels.write().await;
962            if let Some(state) = channels_map.remove(&peer) {
963                tracing::info!("Peer removido: {} (tópico: {:?})", peer, state.topic);
964                let _ = self
965                    .event_sender
966                    .send(DirectChannelEvent::PeerDisconnected(peer));
967            }
968        }
969
970        tracing::info!("DirectChannel parado");
971        Ok(())
972    }
973
974    // Lista peers conectados
975    pub async fn list_connected_peers(&self) -> Vec<PeerId> {
976        let channels_map = self.channels.read().await;
977        channels_map
978            .iter()
979            .filter_map(|(peer_id, state)| match state.connection_status {
980                ConnectionStatus::Connected => Some(*peer_id),
981                _ => None,
982            })
983            .collect()
984    }
985
986    // Obter estatísticas do canal
987    pub async fn get_channel_stats(&self) -> HashMap<PeerId, (u64, Duration)> {
988        let channels_map = self.channels.read().await;
989        channels_map
990            .iter()
991            .map(|(peer_id, state)| {
992                (
993                    *peer_id,
994                    (state.message_count, state.last_activity.elapsed()),
995                )
996            })
997            .collect()
998    }
999
1000    /// Método interno unificado para fechamento
1001    async fn close_internal(&self) -> Result<()> {
1002        tracing::info!("Fechando DirectChannel...");
1003
1004        // Para o processamento
1005        self.stop().await?;
1006
1007        // Fecha o emitter
1008        if let Err(e) = self.emitter.close().await {
1009            tracing::warn!("Erro ao fechar emitter: {}", e);
1010        }
1011
1012        tracing::info!("DirectChannel fechado com sucesso");
1013        Ok(())
1014    }
1015}
1016
1017// Implementação do trait DirectChannel do traits.rs
1018#[async_trait]
1019impl crate::traits::DirectChannel for DirectChannel {
1020    type Error = GuardianError;
1021
1022    async fn connect(&mut self, peer: PeerId) -> std::result::Result<(), Self::Error> {
1023        tracing::info!("Conectando ao peer: {}", peer);
1024        self.connect_to_peer(peer).await
1025    }
1026
1027    async fn send(&mut self, peer: PeerId, data: Vec<u8>) -> std::result::Result<(), Self::Error> {
1028        tracing::debug!("Enviando {} bytes para {}", data.len(), peer);
1029        self.send_data(peer, data).await
1030    }
1031
1032    async fn close(&mut self) -> std::result::Result<(), Self::Error> {
1033        self.close_internal().await
1034    }
1035
1036    async fn close_shared(&self) -> std::result::Result<(), Self::Error> {
1037        self.close_internal().await
1038    }
1039
1040    fn as_any(&self) -> &dyn std::any::Any {
1041        self
1042    }
1043}
1044
1045pub struct HolderChannels {
1046    libp2p: Arc<dyn DirectChannelNetwork>,
1047    span: Span,
1048    own_peer_id: PeerId,
1049}
1050
1051impl HolderChannels {
1052    pub fn new(span: Span, libp2p: Arc<dyn DirectChannelNetwork>, own_peer_id: PeerId) -> Self {
1053        Self {
1054            libp2p,
1055            span,
1056            own_peer_id,
1057        }
1058    }
1059
1060    pub async fn new_channel(
1061        &self,
1062        emitter: Box<dyn DirectChannelEmitter<Error = GuardianError>>,
1063        opts: Option<DirectChannelOptions>,
1064    ) -> Result<Box<dyn crate::traits::DirectChannel<Error = GuardianError>>> {
1065        let resolved_opts = opts.unwrap_or_default();
1066        let span = resolved_opts.span.unwrap_or_else(|| self.span.clone());
1067
1068        let dc = DirectChannel::new(
1069            span.clone(),
1070            self.libp2p.clone(),
1071            Arc::from(emitter),
1072            self.own_peer_id,
1073        );
1074
1075        // Inicia o processamento
1076        dc.start().await?;
1077
1078        tracing::info!(protocol = PROTOCOL, "DirectChannel criado com protocolo");
1079
1080        Ok(Box::new(dc))
1081    }
1082}
1083
1084pub fn init_direct_channel_factory(span: Span, own_peer_id: PeerId) -> DirectChannelFactory {
1085    Arc::new(
1086        move |emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
1087              opts: Option<DirectChannelOptions>| {
1088            let span = span.clone();
1089            let own_peer_id = own_peer_id;
1090            Box::pin(async move {
1091                tracing::info!(
1092                    "Inicializando DirectChannel factory para peer: {}",
1093                    own_peer_id
1094                );
1095
1096                // Cria uma interface para libp2p usando SwarmBridge integrada
1097                let libp2p_interface = Arc::new(
1098                    create_unified_libp2p_interface(span.clone())
1099                        .await
1100                        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
1101                );
1102
1103                // Cria o holder para gerenciar o DirectChannel
1104                let holder = HolderChannels::new(span.clone(), libp2p_interface, own_peer_id);
1105
1106                // Converte Arc para Box para compatibilidade
1107                let emitter_box = Box::new(EmitterWrapper(emitter));
1108
1109                // Cria o canal direto
1110                let channel = holder
1111                    .new_channel(emitter_box, opts)
1112                    .await
1113                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1114
1115                Ok(Arc::from(channel)
1116                    as Arc<
1117                        dyn crate::traits::DirectChannel<Error = GuardianError>,
1118                    >)
1119            })
1120        },
1121    )
1122}
1123
1124// Wrapper simplificado para converter Arc<dyn DirectChannelEmitter> para Box<dyn DirectChannelEmitter>
1125struct EmitterWrapper(Arc<dyn DirectChannelEmitter<Error = GuardianError>>);
1126
1127#[async_trait]
1128impl DirectChannelEmitter for EmitterWrapper {
1129    type Error = GuardianError;
1130
1131    async fn emit(&self, payload: EventPubSubPayload) -> std::result::Result<(), Self::Error> {
1132        self.0.emit(payload).await
1133    }
1134
1135    async fn close(&self) -> std::result::Result<(), Self::Error> {
1136        self.0.close().await
1137    }
1138}
1139
1140// Função auxiliar para criar um DirectChannel com interface libp2p customizada
1141pub async fn create_direct_channel_with_libp2p(
1142    libp2p: Arc<dyn DirectChannelNetwork>,
1143    emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
1144    span: Span,
1145    own_peer_id: PeerId,
1146) -> Result<DirectChannel> {
1147    let channel = DirectChannel::new(span.clone(), libp2p, emitter, own_peer_id);
1148
1149    // Inicia o processamento
1150    channel.start().await?;
1151
1152    tracing::info!("DirectChannel criado com interface libp2p integrada");
1153    Ok(channel)
1154}
1155
1156// Configuração unificada da interface LibP2P
1157pub async fn create_unified_libp2p_interface(span: Span) -> Result<SwarmBridge> {
1158    let interface = SwarmBridge::new(span.clone()).await?;
1159
1160    // Inicia o SwarmManager
1161    interface.start().await?;
1162
1163    tracing::info!("Interface libp2p unificada inicializada com SwarmManager integrado");
1164    Ok(interface)
1165}
1166
1167// Função para criar um PeerId de teste
1168pub fn create_test_peer_id() -> PeerId {
1169    let keypair = libp2p::identity::Keypair::generate_ed25519();
1170    PeerId::from(keypair.public())
1171}