1use crate::error::{GuardianError, Result};
2use crate::p2p::manager::SwarmManager;
3use crate::p2p::pubsub::{HEARTBEAT_INTERVAL, MAX_MESSAGE_SIZE, PROTOCOL};
4use crate::traits::{
5 DirectChannelEmitter, DirectChannelFactory, DirectChannelOptions, EventPubSubPayload,
6};
7use async_trait::async_trait;
8use futures;
9use libp2p::{
10 PeerId,
11 gossipsub::{Message, TopicHash},
12 identity::Keypair,
13};
14use serde::{Deserialize, Serialize};
15use std::{
16 collections::HashMap,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20use tokio::sync::{Mutex, RwLock, mpsc};
21use tracing::Span;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct DirectChannelMessage {
26 pub message_type: MessageType,
27 pub payload: Vec<u8>,
28 pub timestamp: u64,
29 pub sender: String, }
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub enum MessageType {
34 Data,
35 Heartbeat,
36 Ack,
37}
38
39pub trait DirectChannelNetwork: Send + Sync {
40 fn publish_message(&self, topic: &TopicHash, message: &[u8]) -> Result<()>;
41 fn subscribe_topic(&self, topic: &TopicHash) -> Result<()>;
42 fn get_connected_peers(&self) -> Vec<PeerId>;
43 fn get_topic_peers(&self, topic: &TopicHash) -> Vec<PeerId>;
44}
45
46pub struct SwarmBridge {
48 span: Span,
49 swarm_manager: Arc<Mutex<SwarmManager>>,
50 connected_peers: Arc<RwLock<Vec<PeerId>>>,
51 topic_peers: Arc<RwLock<HashMap<TopicHash, Vec<PeerId>>>>,
52 subscribed_topics: Arc<RwLock<HashMap<TopicHash, bool>>>,
53}
54
55impl SwarmBridge {
56 pub async fn new(span: Span) -> Result<Self> {
57 let keypair = Keypair::generate_ed25519();
58 let swarm_manager = SwarmManager::new(span.clone(), keypair)?;
59
60 Ok(Self {
61 span,
62 swarm_manager: Arc::new(Mutex::new(swarm_manager)),
63 connected_peers: Arc::new(RwLock::new(Vec::new())),
64 topic_peers: Arc::new(RwLock::new(HashMap::new())),
65 subscribed_topics: Arc::new(RwLock::new(HashMap::new())),
66 })
67 }
68
69 pub fn span(&self) -> &Span {
71 &self.span
72 }
73
74 pub async fn start(&self) -> Result<()> {
75 let _entered = self.span.enter();
76 let mut manager = self.swarm_manager.lock().await;
77 manager.start().await?;
78 tracing::info!("SwarmBridge iniciada com SwarmManager");
79 Ok(())
80 }
81
82 pub async fn update_connected_peers(&self, peers: Vec<PeerId>) {
84 let _entered = self.span.enter();
85 let mut connected = self.connected_peers.write().await;
86 *connected = peers.clone();
87
88 let manager = self.swarm_manager.lock().await;
90 for peer in peers {
91 manager.notify_peer_connected(peer).await;
92 }
93
94 tracing::debug!(
95 "Peers conectados atualizados pelo SwarmManager: {}",
96 connected.len()
97 );
98 }
99
100 pub async fn update_topic_peers(&self, topic: TopicHash, peers: Vec<PeerId>) {
102 let mut topic_peers = self.topic_peers.write().await;
103 topic_peers.insert(topic.clone(), peers.clone());
104
105 let manager = self.swarm_manager.lock().await;
107 manager
108 .update_topic_peers(topic.clone(), peers.clone())
109 .await;
110
111 tracing::debug!(
112 "Peers do tópico {:?} atualizados pelo SwarmManager: {}",
113 topic,
114 peers.len()
115 );
116 }
117
118 async fn publish(&self, topic: &TopicHash, message: &[u8]) -> Result<()> {
120 let manager = self.swarm_manager.lock().await;
121 manager.publish_message(topic, message).await?;
122 tracing::debug!(
123 "Mensagem publicada pelo SwarmManager no tópico: {:?}",
124 topic
125 );
126 Ok(())
127 }
128
129 pub async fn stop(&self) -> Result<()> {
130 let manager = self.swarm_manager.lock().await;
131 manager.stop().await?;
132 tracing::info!("SwarmBridge parada");
133 Ok(())
134 }
135
136 pub async fn get_interface_stats(&self) -> HashMap<String, u64> {
138 let manager = self.swarm_manager.lock().await;
139 let mut stats = manager.get_detailed_stats().await;
140
141 let connected = self.connected_peers.read().await;
143 stats.insert(
144 "interface_connected_peers".to_string(),
145 connected.len() as u64,
146 );
147
148 let topics = self.topic_peers.read().await;
149 stats.insert("interface_tracked_topics".to_string(), topics.len() as u64);
150
151 stats
152 }
153}
154
155impl DirectChannelNetwork for SwarmBridge {
156 fn publish_message(&self, topic: &TopicHash, message: &[u8]) -> Result<()> {
157 tracing::debug!(
158 "Publicando mensagem no tópico: {:?}, {} bytes",
159 topic,
160 message.len()
161 );
162
163 let subscribed = {
165 let topics = futures::executor::block_on(self.subscribed_topics.read());
166 topics.get(topic).copied().unwrap_or(false)
167 };
168
169 if !subscribed {
170 return Err(GuardianError::Other(format!(
171 "Tópico {:?} não está inscrito",
172 topic
173 )));
174 }
175
176 futures::executor::block_on(self.publish(topic, message))?;
178
179 tracing::info!(
180 "Mensagem publicada com sucesso no tópico via SwarmManager: {:?}",
181 topic
182 );
183 Ok(())
184 }
185
186 fn subscribe_topic(&self, topic: &TopicHash) -> Result<()> {
187 tracing::debug!("Inscrevendo no tópico: {:?}", topic);
188
189 let mut topics = futures::executor::block_on(self.subscribed_topics.write());
191 topics.insert(topic.clone(), true);
192
193 let mut topic_peers = futures::executor::block_on(self.topic_peers.write());
195 if !topic_peers.contains_key(topic) {
196 topic_peers.insert(topic.clone(), Vec::new());
197 }
198
199 let manager = futures::executor::block_on(self.swarm_manager.lock());
201 futures::executor::block_on(manager.subscribe_topic(topic))?;
202
203 tracing::info!(
204 "Inscrição realizada com sucesso no tópico via SwarmManager: {:?}",
205 topic
206 );
207 Ok(())
208 }
209
210 fn get_connected_peers(&self) -> Vec<PeerId> {
211 let peers = futures::executor::block_on(self.connected_peers.read());
212 let peer_list = peers.clone();
213 tracing::debug!(
214 "Retornando {} peers conectados via SwarmManager",
215 peer_list.len()
216 );
217 peer_list
218 }
219
220 fn get_topic_peers(&self, topic: &TopicHash) -> Vec<PeerId> {
221 tracing::debug!("Obtendo peers do tópico: {:?}", topic);
222
223 let topic_peers = futures::executor::block_on(self.topic_peers.read());
224 let peers = topic_peers.get(topic).cloned().unwrap_or_default();
225
226 tracing::debug!(
227 "Tópico {:?} tem {} peers conectados via SwarmManager",
228 topic,
229 peers.len()
230 );
231 peers
232 }
233}
234
235#[derive(Debug, Clone)]
237struct ChannelState {
238 #[allow(dead_code)]
239 peer_id: PeerId,
240 topic: TopicHash,
241 connection_status: ConnectionStatus,
242 last_activity: Instant,
243 message_count: u64,
244 last_heartbeat: Instant,
245}
246
/// Connection state of a single direct channel.
#[derive(Debug, Clone)]
enum ConnectionStatus {
    /// The peer was reported as disconnected.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The channel is established.
    Connected,
    /// The channel failed; the string describes the failure.
    #[allow(dead_code)]
    Error(String),
}
255
256#[derive(Debug)]
258enum DirectChannelEvent {
259 PeerConnected(PeerId),
260 PeerDisconnected(PeerId),
261 MessageReceived {
262 peer: PeerId,
263 payload: Vec<u8>,
264 },
265 MessageSent {
266 peer: PeerId,
267 success: bool,
268 error: Option<String>,
269 },
270 HeartbeatReceived(PeerId),
271 HeartbeatTimeout(PeerId),
272}
273
274pub struct DirectChannel {
275 span: Span,
276 libp2p: Arc<dyn DirectChannelNetwork>,
277 emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
278 channels: Arc<RwLock<HashMap<PeerId, ChannelState>>>,
279 event_sender: mpsc::UnboundedSender<DirectChannelEvent>,
280 _event_receiver: Arc<Mutex<Option<mpsc::UnboundedReceiver<DirectChannelEvent>>>>,
281 own_peer_id: PeerId,
282 running: Arc<Mutex<bool>>,
283}
284
285impl DirectChannel {
286 pub fn new(
288 span: Span,
289 libp2p: Arc<dyn DirectChannelNetwork>,
290 emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
291 own_peer_id: PeerId,
292 ) -> Self {
293 let (event_sender, event_receiver) = mpsc::unbounded_channel();
294
295 Self {
296 span,
297 libp2p,
298 emitter,
299 channels: Arc::new(RwLock::new(HashMap::new())),
300 event_sender,
301 _event_receiver: Arc::new(Mutex::new(Some(event_receiver))),
302 own_peer_id,
303 running: Arc::new(Mutex::new(false)),
304 }
305 }
306
307 fn get_channel_topic(&self, peer: PeerId) -> TopicHash {
309 let (first, second) = if self.own_peer_id < peer {
311 (self.own_peer_id, peer)
312 } else {
313 (peer, self.own_peer_id)
314 };
315 let topic_string = format!("{}/channel/{}/{}", PROTOCOL, first, second);
316 TopicHash::from_raw(topic_string)
317 }
318
319 pub async fn start(&self) -> Result<()> {
321 let mut running = self.running.lock().await;
322 if *running {
323 return Ok(());
324 }
325 *running = true;
326
327 let mut receiver = self
328 ._event_receiver
329 .lock()
330 .await
331 .take()
332 .ok_or_else(|| GuardianError::Other("Event receiver already taken".to_string()))?;
333
334 let emitter = self.emitter.clone();
335 let span = self.span.clone();
336 let channels = self.channels.clone();
337 let running_flag = self.running.clone();
338
339 tokio::spawn(async move {
340 while let Some(event) = receiver.recv().await {
341 let running = *running_flag.lock().await;
342 if !running {
343 break;
344 }
345
346 if let Err(e) = Self::handle_event(event, &emitter, &span, &channels).await {
347 tracing::error!("Erro ao processar evento: {}", e);
348 }
349 }
350 tracing::info!("Event processing loop terminated");
351 });
352
353 self.start_heartbeat_loop().await;
355
356 Ok(())
357 }
358
359 async fn start_heartbeat_loop(&self) {
361 let channels = self.channels.clone();
362 let event_sender = self.event_sender.clone();
363 let span = self.span.clone();
364 let running_flag = self.running.clone();
365 let libp2p = self.libp2p.clone();
366
367 tokio::spawn(async move {
368 let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL);
369
370 loop {
371 interval.tick().await;
372
373 let running = *running_flag.lock().await;
374 if !running {
375 break;
376 }
377
378 let peers_to_heartbeat: Vec<(PeerId, TopicHash)> = {
379 let channels_map = channels.read().await;
380 channels_map
381 .iter()
382 .filter_map(|(peer_id, state)| {
383 match state.connection_status {
384 ConnectionStatus::Connected => {
385 if state.last_heartbeat.elapsed() > HEARTBEAT_INTERVAL {
387 Some((*peer_id, state.topic.clone()))
388 } else {
389 None
390 }
391 }
392 _ => None,
393 }
394 })
395 .collect()
396 };
397
398 for (peer, topic) in peers_to_heartbeat {
399 if let Err(e) = Self::send_heartbeat(&libp2p, &topic, &span).await {
401 tracing::warn!("Falha ao enviar heartbeat para {}: {}", peer, e);
402 let _ = event_sender.send(DirectChannelEvent::HeartbeatTimeout(peer));
403 } else {
404 tracing::trace!(peer = %peer, "Heartbeat enviado para peer");
405 }
406 }
407
408 let peers_to_reconnect: Vec<PeerId> = {
410 let channels_map = channels.read().await;
411 channels_map
412 .iter()
413 .filter_map(|(peer_id, state)| {
414 match &state.connection_status {
415 ConnectionStatus::Error(err) => {
416 if state.last_activity.elapsed() > Duration::from_secs(30) {
418 tracing::debug!(
419 "Tentando reconexão com peer {} após erro: {}",
420 peer_id,
421 err
422 );
423 Some(*peer_id)
424 } else {
425 None
426 }
427 }
428 ConnectionStatus::Disconnected => {
429 if state.last_activity.elapsed() > Duration::from_secs(60) {
431 tracing::debug!(
432 "Tentando reconexão com peer desconectado: {}",
433 peer_id
434 );
435 Some(*peer_id)
436 } else {
437 None
438 }
439 }
440 _ => None,
441 }
442 })
443 .collect()
444 };
445
446 for peer in peers_to_reconnect {
448 let mut channels_map = channels.write().await;
449 if let Some(state) = channels_map.get_mut(&peer) {
450 state.connection_status = ConnectionStatus::Connecting;
451 state.last_activity = Instant::now();
452
453 if let Err(e) = Self::send_heartbeat(&libp2p, &state.topic, &span).await {
455 tracing::warn!("Falha na tentativa de reconexão com {}: {}", peer, e);
456 } else {
457 tracing::info!("Tentativa de reconexão iniciada para peer: {}", peer);
458 }
459 }
460 }
461 }
462 });
463 }
464
465 async fn send_heartbeat(
467 libp2p: &Arc<dyn DirectChannelNetwork>,
468 topic: &TopicHash,
469 _span: &Span,
470 ) -> Result<()> {
471 let heartbeat_msg = DirectChannelMessage {
472 message_type: MessageType::Heartbeat,
473 payload: vec![],
474 timestamp: std::time::SystemTime::now()
475 .duration_since(std::time::UNIX_EPOCH)
476 .unwrap_or_default()
477 .as_secs(),
478 sender: "heartbeat".to_string(),
479 };
480
481 let serialized = serde_cbor::to_vec(&heartbeat_msg)
482 .map_err(|e| GuardianError::Other(format!("Erro de serialização heartbeat: {}", e)))?;
483
484 libp2p.publish_message(topic, &serialized)?;
485 tracing::trace!(topic = ?topic, "Heartbeat enviado no tópico");
486 Ok(())
487 }
488
489 async fn handle_event(
491 event: DirectChannelEvent,
492 emitter: &Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
493 _span: &Span,
494 channels: &Arc<RwLock<HashMap<PeerId, ChannelState>>>,
495 ) -> Result<()> {
496 match event {
497 DirectChannelEvent::MessageReceived { peer, payload } => {
498 tracing::debug!("Mensagem recebida de {}: {} bytes", peer, payload.len());
499
500 if payload.len() > MAX_MESSAGE_SIZE {
502 tracing::warn!("Mensagem muito grande de {}: {} bytes", peer, payload.len());
503 return Ok(());
504 }
505
506 {
508 let mut channels_map = channels.write().await;
509 if let Some(state) = channels_map.get_mut(&peer) {
510 state.last_activity = Instant::now();
511 state.message_count += 1;
512 }
513 }
514
515 let event_payload = EventPubSubPayload { payload, peer };
516 emitter
517 .emit(event_payload)
518 .await
519 .map_err(|e| GuardianError::Other(format!("Erro ao emitir evento: {}", e)))?;
520 }
521 DirectChannelEvent::PeerConnected(peer) => {
522 tracing::info!("Peer conectado: {}", peer);
523 let mut channels_map = channels.write().await;
524 if let Some(state) = channels_map.get_mut(&peer) {
525 state.connection_status = ConnectionStatus::Connected;
526 state.last_activity = Instant::now();
527 state.last_heartbeat = Instant::now();
528 }
529 }
530 DirectChannelEvent::PeerDisconnected(peer) => {
531 tracing::info!("Peer desconectado: {}", peer);
532 let mut channels_map = channels.write().await;
533 if let Some(state) = channels_map.get_mut(&peer) {
534 state.connection_status = ConnectionStatus::Disconnected;
535 }
536 }
537 DirectChannelEvent::MessageSent {
538 peer,
539 success,
540 error,
541 } => {
542 if success {
543 tracing::debug!("Mensagem enviada com sucesso para: {}", peer);
544 } else {
545 tracing::warn!("Falha ao enviar mensagem para {}: {:?}", peer, error);
546 }
547 }
548 DirectChannelEvent::HeartbeatReceived(peer) => {
549 tracing::trace!(peer = %peer, "Heartbeat recebido de");
550 let mut channels_map = channels.write().await;
551 if let Some(state) = channels_map.get_mut(&peer) {
552 state.last_activity = Instant::now();
553 state.last_heartbeat = Instant::now();
554 }
555 }
556 DirectChannelEvent::HeartbeatTimeout(peer) => {
557 tracing::warn!("Timeout de heartbeat para peer: {}", peer);
558 let mut channels_map = channels.write().await;
559 if let Some(state) = channels_map.get_mut(&peer) {
560 state.connection_status =
561 ConnectionStatus::Error("Heartbeat timeout".to_string());
562 }
563 }
564 }
565 Ok(())
566 }
567
568 pub async fn send_data(&self, peer: PeerId, payload: Vec<u8>) -> Result<()> {
570 if payload.len() > MAX_MESSAGE_SIZE {
571 return Err(GuardianError::Other(format!(
572 "Mensagem muito grande: {} bytes (máximo: {})",
573 payload.len(),
574 MAX_MESSAGE_SIZE
575 )));
576 }
577
578 let topic = self.get_channel_topic(peer);
579 let message = DirectChannelMessage {
580 message_type: MessageType::Data,
581 payload,
582 timestamp: std::time::SystemTime::now()
583 .duration_since(std::time::UNIX_EPOCH)
584 .unwrap_or_default()
585 .as_secs(),
586 sender: self.own_peer_id.to_string(),
587 };
588
589 let serialized = serde_cbor::to_vec(&message)
590 .map_err(|e| GuardianError::Other(format!("Erro de serialização: {}", e)))?;
591
592 match self.libp2p.publish_message(&topic, &serialized) {
593 Ok(()) => {
594 let _ = self.event_sender.send(DirectChannelEvent::MessageSent {
595 peer,
596 success: true,
597 error: None,
598 });
599 tracing::debug!(
600 "Dados enviados para {}: {} bytes",
601 peer,
602 message.payload.len()
603 );
604 Ok(())
605 }
606 Err(e) => {
607 let error_msg = format!("Erro ao publicar mensagem: {}", e);
608 let _ = self.event_sender.send(DirectChannelEvent::MessageSent {
609 peer,
610 success: false,
611 error: Some(error_msg.clone()),
612 });
613 Err(GuardianError::Other(error_msg))
614 }
615 }
616 }
617
618 pub async fn connect_to_peer(&self, peer: PeerId) -> Result<()> {
620 let topic = self.get_channel_topic(peer);
621 let mut channels_map = self.channels.write().await;
622
623 if let Some(state) = channels_map.get(&peer) {
624 match state.connection_status {
625 ConnectionStatus::Connected => {
626 tracing::debug!("Já conectado ao peer: {}", peer);
627 return Ok(());
628 }
629 ConnectionStatus::Connecting => {
630 tracing::debug!("Conexão em andamento com peer: {}", peer);
631 return Ok(());
632 }
633 _ => {}
634 }
635 }
636 self.libp2p.subscribe_topic(&topic)?;
638 channels_map.insert(
640 peer,
641 ChannelState {
642 peer_id: peer,
643 topic: topic.clone(),
644 connection_status: ConnectionStatus::Connecting,
645 last_activity: Instant::now(),
646 message_count: 0,
647 last_heartbeat: Instant::now(),
648 },
649 );
650 tracing::info!("Conectando ao peer {} no tópico: {:?}", peer, topic);
651 self.establish_peer_connection(peer, topic.clone()).await?;
652 Ok(())
653 }
654
655 async fn establish_peer_connection(&self, peer: PeerId, topic: TopicHash) -> Result<()> {
657 tracing::debug!("Estabelecendo conexão com peer: {}", peer);
658
659 let connected_peers = self.libp2p.get_connected_peers();
661 let is_peer_connected = connected_peers.contains(&peer);
662
663 if is_peer_connected {
664 tracing::debug!("Peer {} já está conectado globalmente", peer);
665 let _ = self
667 .event_sender
668 .send(DirectChannelEvent::PeerConnected(peer));
669 return Ok(());
670 }
671
672 let discovery_timeout = Duration::from_secs(10);
674 let start_time = Instant::now();
675
676 while start_time.elapsed() < discovery_timeout {
677 let topic_peers = self.libp2p.get_topic_peers(&topic);
679
680 if topic_peers.contains(&peer) {
681 tracing::info!("Peer {} descoberto no tópico: {:?}", peer, topic);
682
683 if self.send_handshake_message(&topic, peer).await.is_ok() {
685 tracing::info!("Handshake bem-sucedido com peer: {}", peer);
686 let _ = self
687 .event_sender
688 .send(DirectChannelEvent::PeerConnected(peer));
689 return Ok(());
690 }
691 }
692
693 let updated_peers = self.libp2p.get_connected_peers();
695 if updated_peers.contains(&peer) {
696 tracing::info!("Peer {} conectado via discovery global", peer);
697 let _ = self
698 .event_sender
699 .send(DirectChannelEvent::PeerConnected(peer));
700 return Ok(());
701 }
702
703 tokio::time::sleep(Duration::from_millis(500)).await;
705 }
706
707 tracing::warn!(
709 "Peer {} não encontrado diretamente, enviando beacon de descoberta",
710 peer
711 );
712 if let Err(e) = self.send_discovery_beacon(&topic, peer).await {
713 tracing::error!("Falha ao enviar beacon de descoberta para {}: {}", peer, e);
714
715 let mut channels_map = self.channels.write().await;
717 if let Some(state) = channels_map.get_mut(&peer) {
718 state.connection_status =
719 ConnectionStatus::Error(format!("Discovery timeout: {}", e));
720 }
721
722 return Err(GuardianError::Other(format!(
723 "Timeout na descoberta do peer {} após {}s",
724 peer,
725 discovery_timeout.as_secs()
726 )));
727 }
728
729 let beacon_timeout = Duration::from_secs(5);
731 let beacon_start = Instant::now();
732
733 while beacon_start.elapsed() < beacon_timeout {
734 let topic_peers = self.libp2p.get_topic_peers(&topic);
735 if topic_peers.contains(&peer) {
736 tracing::info!("Peer {} respondeu ao beacon de descoberta", peer);
737 let _ = self
738 .event_sender
739 .send(DirectChannelEvent::PeerConnected(peer));
740 return Ok(());
741 }
742
743 tokio::time::sleep(Duration::from_millis(200)).await;
744 }
745
746 tracing::warn!(
748 "Conexão com peer {} não pôde ser estabelecida no momento",
749 peer
750 );
751 Ok(())
752 }
753
754 async fn send_handshake_message(&self, topic: &TopicHash, target_peer: PeerId) -> Result<()> {
756 let handshake_msg = DirectChannelMessage {
757 message_type: MessageType::Ack, payload: format!("handshake:{}", self.own_peer_id).into_bytes(),
759 timestamp: std::time::SystemTime::now()
760 .duration_since(std::time::UNIX_EPOCH)
761 .unwrap_or_default()
762 .as_secs(),
763 sender: self.own_peer_id.to_string(),
764 };
765
766 let serialized = serde_cbor::to_vec(&handshake_msg)
767 .map_err(|e| GuardianError::Other(format!("Erro serialização handshake: {}", e)))?;
768
769 self.libp2p.publish_message(topic, &serialized)?;
770 tracing::debug!("Handshake enviado para peer: {}", target_peer);
771 Ok(())
772 }
773
774 async fn send_discovery_beacon(&self, topic: &TopicHash, target_peer: PeerId) -> Result<()> {
776 let beacon_msg = DirectChannelMessage {
777 message_type: MessageType::Heartbeat, payload: format!("discovery_beacon:{}:{}", self.own_peer_id, target_peer).into_bytes(),
779 timestamp: std::time::SystemTime::now()
780 .duration_since(std::time::UNIX_EPOCH)
781 .unwrap_or_default()
782 .as_secs(),
783 sender: self.own_peer_id.to_string(),
784 };
785
786 let serialized = serde_cbor::to_vec(&beacon_msg)
787 .map_err(|e| GuardianError::Other(format!("Erro serialização beacon: {}", e)))?;
788
789 self.libp2p.publish_message(topic, &serialized)?;
790 tracing::debug!("Discovery beacon enviado no tópico: {:?}", topic);
791 Ok(())
792 }
793
794 pub async fn handle_gossipsub_message(&self, message: Message) -> Result<()> {
796 let decoded_msg: DirectChannelMessage = serde_cbor::from_slice(&message.data)
798 .map_err(|e| GuardianError::Other(format!("Erro ao decodificar mensagem: {}", e)))?;
799
800 let sender_peer = message
801 .source
802 .ok_or_else(|| GuardianError::Other("Mensagem sem remetente".to_string()))?;
803
804 match decoded_msg.message_type {
805 MessageType::Data => {
806 let _ = self.event_sender.send(DirectChannelEvent::MessageReceived {
807 peer: sender_peer,
808 payload: decoded_msg.payload,
809 });
810 }
811 MessageType::Heartbeat => {
812 if let Ok(payload_str) = String::from_utf8(decoded_msg.payload.clone()) {
814 if payload_str.starts_with("discovery_beacon:") {
815 self.handle_discovery_beacon(sender_peer, payload_str)
816 .await?;
817 } else {
818 let _ = self
819 .event_sender
820 .send(DirectChannelEvent::HeartbeatReceived(sender_peer));
821 }
822 } else {
823 let _ = self
824 .event_sender
825 .send(DirectChannelEvent::HeartbeatReceived(sender_peer));
826 }
827 }
828 MessageType::Ack => {
829 if let Ok(payload_str) = String::from_utf8(decoded_msg.payload.clone()) {
831 if payload_str.starts_with("handshake:") {
832 self.handle_handshake_response(sender_peer, payload_str)
833 .await?;
834 } else {
835 tracing::trace!(sender_peer = %sender_peer, "ACK recebido de");
836 }
837 } else {
838 tracing::trace!(sender_peer = %sender_peer, "ACK recebido de");
839 }
840 }
841 }
842
843 Ok(())
844 }
845
846 async fn handle_discovery_beacon(
848 &self,
849 sender_peer: PeerId,
850 beacon_payload: String,
851 ) -> Result<()> {
852 tracing::debug!(
853 "Discovery beacon recebido de: {} - {}",
854 sender_peer,
855 beacon_payload
856 );
857
858 let parts: Vec<&str> = beacon_payload.split(':').collect();
860 if parts.len() >= 3 {
861 let _beacon_sender = parts[1]; let beacon_target = parts[2];
863
864 if beacon_target == self.own_peer_id.to_string() {
866 tracing::info!("Beacon de descoberta direcionado a nós de: {}", sender_peer);
867
868 let channels_map = self.channels.read().await;
870 if let Some(state) = channels_map.get(&sender_peer)
871 && matches!(
872 state.connection_status,
873 ConnectionStatus::Connecting | ConnectionStatus::Disconnected
874 )
875 {
876 drop(channels_map); let topic = self.get_channel_topic(sender_peer);
880 if let Err(e) = self.send_handshake_message(&topic, sender_peer).await {
881 tracing::warn!("Falha ao responder beacon de {}: {}", sender_peer, e);
882 } else {
883 tracing::info!("Handshake de resposta enviado para: {}", sender_peer);
884 }
885 }
886 }
887 }
888
889 Ok(())
890 }
891
892 async fn handle_handshake_response(
894 &self,
895 sender_peer: PeerId,
896 handshake_payload: String,
897 ) -> Result<()> {
898 tracing::debug!(
899 "Handshake recebido de: {} - {}",
900 sender_peer,
901 handshake_payload
902 );
903
904 let parts: Vec<&str> = handshake_payload.split(':').collect();
906 if parts.len() >= 2 {
907 let handshake_peer = parts[1];
908 tracing::info!(
909 "Handshake válido recebido de peer: {} (id: {})",
910 sender_peer,
911 handshake_peer
912 );
913
914 let mut channels_map = self.channels.write().await;
916 if let Some(state) = channels_map.get_mut(&sender_peer) {
917 match state.connection_status {
918 ConnectionStatus::Connecting => {
919 state.connection_status = ConnectionStatus::Connected;
920 state.last_activity = Instant::now();
921 state.last_heartbeat = Instant::now();
922
923 let _ = self
925 .event_sender
926 .send(DirectChannelEvent::PeerConnected(sender_peer));
927
928 tracing::info!("Conexão estabelecida com peer: {}", sender_peer);
929 }
930 ConnectionStatus::Connected => {
931 state.last_activity = Instant::now();
933 state.last_heartbeat = Instant::now();
934 tracing::trace!("Handshake de manutenção recebido de: {}", sender_peer);
935 }
936 _ => {
937 tracing::debug!(
938 "Handshake recebido de peer em estado: {:?}",
939 state.connection_status
940 );
941 }
942 }
943 }
944 }
945
946 Ok(())
947 }
948
949 pub async fn stop(&self) -> Result<()> {
951 let mut running = self.running.lock().await;
952 *running = false;
953
954 let peers: Vec<PeerId> = {
956 let channels_map = self.channels.read().await;
957 channels_map.keys().cloned().collect()
958 };
959
960 for peer in peers {
961 let mut channels_map = self.channels.write().await;
962 if let Some(state) = channels_map.remove(&peer) {
963 tracing::info!("Peer removido: {} (tópico: {:?})", peer, state.topic);
964 let _ = self
965 .event_sender
966 .send(DirectChannelEvent::PeerDisconnected(peer));
967 }
968 }
969
970 tracing::info!("DirectChannel parado");
971 Ok(())
972 }
973
974 pub async fn list_connected_peers(&self) -> Vec<PeerId> {
976 let channels_map = self.channels.read().await;
977 channels_map
978 .iter()
979 .filter_map(|(peer_id, state)| match state.connection_status {
980 ConnectionStatus::Connected => Some(*peer_id),
981 _ => None,
982 })
983 .collect()
984 }
985
986 pub async fn get_channel_stats(&self) -> HashMap<PeerId, (u64, Duration)> {
988 let channels_map = self.channels.read().await;
989 channels_map
990 .iter()
991 .map(|(peer_id, state)| {
992 (
993 *peer_id,
994 (state.message_count, state.last_activity.elapsed()),
995 )
996 })
997 .collect()
998 }
999
1000 async fn close_internal(&self) -> Result<()> {
1002 tracing::info!("Fechando DirectChannel...");
1003
1004 self.stop().await?;
1006
1007 if let Err(e) = self.emitter.close().await {
1009 tracing::warn!("Erro ao fechar emitter: {}", e);
1010 }
1011
1012 tracing::info!("DirectChannel fechado com sucesso");
1013 Ok(())
1014 }
1015}
1016
1017#[async_trait]
1019impl crate::traits::DirectChannel for DirectChannel {
1020 type Error = GuardianError;
1021
1022 async fn connect(&mut self, peer: PeerId) -> std::result::Result<(), Self::Error> {
1023 tracing::info!("Conectando ao peer: {}", peer);
1024 self.connect_to_peer(peer).await
1025 }
1026
1027 async fn send(&mut self, peer: PeerId, data: Vec<u8>) -> std::result::Result<(), Self::Error> {
1028 tracing::debug!("Enviando {} bytes para {}", data.len(), peer);
1029 self.send_data(peer, data).await
1030 }
1031
1032 async fn close(&mut self) -> std::result::Result<(), Self::Error> {
1033 self.close_internal().await
1034 }
1035
1036 async fn close_shared(&self) -> std::result::Result<(), Self::Error> {
1037 self.close_internal().await
1038 }
1039
1040 fn as_any(&self) -> &dyn std::any::Any {
1041 self
1042 }
1043}
1044
1045pub struct HolderChannels {
1046 libp2p: Arc<dyn DirectChannelNetwork>,
1047 span: Span,
1048 own_peer_id: PeerId,
1049}
1050
1051impl HolderChannels {
1052 pub fn new(span: Span, libp2p: Arc<dyn DirectChannelNetwork>, own_peer_id: PeerId) -> Self {
1053 Self {
1054 libp2p,
1055 span,
1056 own_peer_id,
1057 }
1058 }
1059
1060 pub async fn new_channel(
1061 &self,
1062 emitter: Box<dyn DirectChannelEmitter<Error = GuardianError>>,
1063 opts: Option<DirectChannelOptions>,
1064 ) -> Result<Box<dyn crate::traits::DirectChannel<Error = GuardianError>>> {
1065 let resolved_opts = opts.unwrap_or_default();
1066 let span = resolved_opts.span.unwrap_or_else(|| self.span.clone());
1067
1068 let dc = DirectChannel::new(
1069 span.clone(),
1070 self.libp2p.clone(),
1071 Arc::from(emitter),
1072 self.own_peer_id,
1073 );
1074
1075 dc.start().await?;
1077
1078 tracing::info!(protocol = PROTOCOL, "DirectChannel criado com protocolo");
1079
1080 Ok(Box::new(dc))
1081 }
1082}
1083
1084pub fn init_direct_channel_factory(span: Span, own_peer_id: PeerId) -> DirectChannelFactory {
1085 Arc::new(
1086 move |emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
1087 opts: Option<DirectChannelOptions>| {
1088 let span = span.clone();
1089 let own_peer_id = own_peer_id;
1090 Box::pin(async move {
1091 tracing::info!(
1092 "Inicializando DirectChannel factory para peer: {}",
1093 own_peer_id
1094 );
1095
1096 let libp2p_interface = Arc::new(
1098 create_unified_libp2p_interface(span.clone())
1099 .await
1100 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
1101 );
1102
1103 let holder = HolderChannels::new(span.clone(), libp2p_interface, own_peer_id);
1105
1106 let emitter_box = Box::new(EmitterWrapper(emitter));
1108
1109 let channel = holder
1111 .new_channel(emitter_box, opts)
1112 .await
1113 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
1114
1115 Ok(Arc::from(channel)
1116 as Arc<
1117 dyn crate::traits::DirectChannel<Error = GuardianError>,
1118 >)
1119 })
1120 },
1121 )
1122}
1123
1124struct EmitterWrapper(Arc<dyn DirectChannelEmitter<Error = GuardianError>>);
1126
1127#[async_trait]
1128impl DirectChannelEmitter for EmitterWrapper {
1129 type Error = GuardianError;
1130
1131 async fn emit(&self, payload: EventPubSubPayload) -> std::result::Result<(), Self::Error> {
1132 self.0.emit(payload).await
1133 }
1134
1135 async fn close(&self) -> std::result::Result<(), Self::Error> {
1136 self.0.close().await
1137 }
1138}
1139
1140pub async fn create_direct_channel_with_libp2p(
1142 libp2p: Arc<dyn DirectChannelNetwork>,
1143 emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
1144 span: Span,
1145 own_peer_id: PeerId,
1146) -> Result<DirectChannel> {
1147 let channel = DirectChannel::new(span.clone(), libp2p, emitter, own_peer_id);
1148
1149 channel.start().await?;
1151
1152 tracing::info!("DirectChannel criado com interface libp2p integrada");
1153 Ok(channel)
1154}
1155
1156pub async fn create_unified_libp2p_interface(span: Span) -> Result<SwarmBridge> {
1158 let interface = SwarmBridge::new(span.clone()).await?;
1159
1160 interface.start().await?;
1162
1163 tracing::info!("Interface libp2p unificada inicializada com SwarmManager integrado");
1164 Ok(interface)
1165}
1166
1167pub fn create_test_peer_id() -> PeerId {
1169 let keypair = libp2p::identity::Keypair::generate_ed25519();
1170 PeerId::from(keypair.public())
1171}