guardian_db/ipfs_core_api/backends/
iroh_pubsub.rs

1// Integração do Gossipsub com IrohBackend
2//
3// Implementação de PubSubInterface
4// usando o SwarmManager do LibP2P Gossipsub integrado ao Iroh
5
6use crate::error::{GuardianError, Result};
7use crate::ipfs_core_api::backends::iroh::IrohBackend;
8use crate::p2p::manager::SwarmManager;
9use crate::traits::{EventPubSubMessage, PubSubInterface, PubSubTopic};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use libp2p::{PeerId, gossipsub::TopicHash};
13use std::collections::HashMap;
14use std::pin::Pin;
15use std::sync::Arc;
16use tokio::sync::{RwLock, broadcast};
17use tokio_stream::wrappers::BroadcastStream;
18use tracing::{debug, info, warn};
19
20/// Wrapper do IrohBackend que implementa PubSubInterface
21pub struct IrohPubSub {
22    /// Referência ao backend Iroh
23    iroh_backend: Arc<IrohBackend>,
24    /// Cache de tópicos ativos
25    topics: Arc<RwLock<HashMap<String, Arc<IrohTopic>>>>,
26}
27
28/// Implementação de PubSubTopic para Iroh com Gossipsub
29pub struct IrohTopic {
30    /// Nome do tópico
31    topic_name: String,
32    /// Hash do tópico para Gossipsub
33    topic_hash: TopicHash,
34    /// Referência ao SwarmManager
35    swarm: Arc<RwLock<Option<SwarmManager>>>,
36    /// Canal para broadcast de mensagens recebidas
37    message_sender: Arc<broadcast::Sender<Vec<u8>>>,
38    /// Peers conectados neste tópico
39    peers: Arc<RwLock<Vec<PeerId>>>,
40    /// Canal para eventos de peers (join/leave)
41    peer_events_sender: Arc<broadcast::Sender<crate::events::Event>>,
42}
43
44impl IrohPubSub {
45    /// Cria uma nova instância do IrohPubSub
46    pub fn new(iroh_backend: Arc<IrohBackend>) -> Self {
47        Self {
48            iroh_backend,
49            topics: Arc::new(RwLock::new(HashMap::new())),
50        }
51    }
52
53    /// Obtém ou cria um tópico
54    async fn get_or_create_topic(&self, topic: &str) -> Result<Arc<IrohTopic>> {
55        let mut topics = self.topics.write().await;
56
57        if let Some(existing_topic) = topics.get(topic) {
58            return Ok(existing_topic.clone());
59        }
60
61        // Cria novo tópico
62        let topic_hash = TopicHash::from_raw(topic);
63        let (sender, _) = broadcast::channel(1000); // Buffer para 1000 mensagens
64        let (peer_events_sender, _) = broadcast::channel(1000); // Buffer para eventos de peers
65
66        // Obtém referência ao SwarmManager do IrohBackend
67        let swarm = self.iroh_backend.get_swarm_manager().await?;
68
69        // Subscreve ao tópico no Gossipsub
70        {
71            let swarm_lock = swarm.read().await;
72            if let Some(swarm_manager) = swarm_lock.as_ref() {
73                swarm_manager
74                    .subscribe_topic(&topic_hash)
75                    .await
76                    .map_err(|e| {
77                        GuardianError::Other(format!("Erro ao subscrever tópico {}: {}", topic, e))
78                    })?;
79                info!("Subscrito com sucesso ao tópico Gossipsub: {}", topic);
80            } else {
81                return Err(GuardianError::Other(
82                    "SwarmManager não disponível".to_string(),
83                ));
84            }
85        }
86
87        let iroh_topic = Arc::new(IrohTopic {
88            topic_name: topic.to_string(),
89            topic_hash,
90            swarm,
91            message_sender: Arc::new(sender),
92            peers: Arc::new(RwLock::new(Vec::new())),
93            peer_events_sender: Arc::new(peer_events_sender),
94        });
95
96        topics.insert(topic.to_string(), iroh_topic.clone());
97
98        Ok(iroh_topic)
99    }
100}
101
102#[async_trait]
103impl PubSubInterface for IrohPubSub {
104    type Error = GuardianError;
105
106    async fn topic_subscribe(
107        &mut self,
108        topic: &str,
109    ) -> std::result::Result<Arc<dyn PubSubTopic<Error = GuardianError>>, Self::Error> {
110        debug!("Subscrevendo ao tópico via IrohPubSub: {}", topic);
111
112        let iroh_topic = self.get_or_create_topic(topic).await?;
113
114        Ok(iroh_topic as Arc<dyn PubSubTopic<Error = GuardianError>>)
115    }
116
117    fn as_any(&self) -> &dyn std::any::Any {
118        self
119    }
120}
121
122impl IrohTopic {
123    /// Publica mensagem no tópico
124    pub async fn publish(&self, data: &[u8]) -> Result<()> {
125        debug!(
126            "Publicando mensagem no tópico {}: {} bytes",
127            self.topic_name,
128            data.len()
129        );
130
131        let swarm_lock = self.swarm.read().await;
132        if let Some(swarm_manager) = swarm_lock.as_ref() {
133            swarm_manager
134                .publish_message(&self.topic_hash, data)
135                .await
136                .map_err(|e| {
137                    GuardianError::Other(format!("Erro ao publicar no Gossipsub: {}", e))
138                })?;
139
140            debug!(
141                "Mensagem publicada com sucesso no tópico {}",
142                self.topic_name
143            );
144            Ok(())
145        } else {
146            Err(GuardianError::Other(
147                "SwarmManager não disponível".to_string(),
148            ))
149        }
150    }
151
152    /// Cria um receiver para mensagens do tópico
153    pub fn subscribe_messages(&self) -> broadcast::Receiver<Vec<u8>> {
154        self.message_sender.subscribe()
155    }
156
157    /// Notifica sobre mensagem recebida (chamado pelo event loop do SwarmManager)
158    pub async fn notify_message(&self, data: Vec<u8>) -> Result<()> {
159        match self.message_sender.send(data) {
160            Ok(_) => {
161                debug!(
162                    "Mensagem propagada para subscribers do tópico {}",
163                    self.topic_name
164                );
165                Ok(())
166            }
167            Err(_) => {
168                warn!("Nenhum subscriber ativo para o tópico {}", self.topic_name);
169                Ok(()) // Não é erro fatal
170            }
171        }
172    }
173
174    /// Adiciona peer ao tópico
175    pub async fn add_peer(&self, peer_id: PeerId) {
176        let mut peers = self.peers.write().await;
177        if !peers.contains(&peer_id) {
178            peers.push(peer_id);
179            debug!("Peer {} adicionado ao tópico {}", peer_id, self.topic_name);
180
181            // Envia evento de peer conectado (simplesmente usando o PeerId como evento)
182            let event: crate::events::Event =
183                Arc::new(format!("PeerConnected:{}:{}", peer_id, self.topic_name));
184            let _ = self.peer_events_sender.send(event);
185        }
186    }
187
188    /// Remove peer do tópico
189    pub async fn remove_peer(&self, peer_id: &PeerId) {
190        let mut peers = self.peers.write().await;
191        let was_present = peers.iter().any(|p| p == peer_id);
192        peers.retain(|p| p != peer_id);
193
194        if was_present {
195            debug!("Peer {} removido do tópico {}", peer_id, self.topic_name);
196
197            // Envia evento de peer desconectado (simplesmente usando o PeerId como evento)
198            let event: crate::events::Event =
199                Arc::new(format!("PeerDisconnected:{}:{}", peer_id, self.topic_name));
200            let _ = self.peer_events_sender.send(event);
201        }
202    }
203
204    /// Lista peers conectados ao tópico
205    pub async fn list_peers(&self) -> Vec<PeerId> {
206        let peers = self.peers.read().await;
207        peers.clone()
208    }
209}
210
211#[async_trait]
212impl PubSubTopic for IrohTopic {
213    type Error = GuardianError;
214
215    async fn publish(&self, message: Vec<u8>) -> std::result::Result<(), Self::Error> {
216        self.publish(&message).await
217    }
218
219    async fn peers(&self) -> std::result::Result<Vec<PeerId>, Self::Error> {
220        Ok(self.list_peers().await)
221    }
222
223    async fn watch_peers(
224        &self,
225    ) -> std::result::Result<Pin<Box<dyn Stream<Item = crate::events::Event> + Send>>, Self::Error>
226    {
227        // Cria receiver do canal de eventos de peers
228        let receiver = self.peer_events_sender.subscribe();
229
230        // Converte broadcast::Receiver em Stream
231        let stream = BroadcastStream::new(receiver).filter_map(|result| async {
232            result.ok() // Ignora erros de lagged/closed
233        });
234
235        Ok(Box::pin(stream))
236    }
237
238    async fn watch_messages(
239        &self,
240    ) -> std::result::Result<Pin<Box<dyn Stream<Item = EventPubSubMessage> + Send>>, Self::Error>
241    {
242        // Cria receiver do canal de mensagens
243        let receiver = self.message_sender.subscribe();
244        let _topic_name = self.topic_name.clone();
245
246        // Converte broadcast::Receiver em Stream de EventPubSubMessage
247        let stream = BroadcastStream::new(receiver).filter_map(move |result| async move {
248            match result {
249                Ok(data) => {
250                    // Cria EventPubSubMessage a partir dos dados recebidos
251                    Some(EventPubSubMessage { content: data })
252                }
253                Err(_) => None, // Ignora erros de lagged/closed
254            }
255        });
256
257        Ok(Box::pin(stream))
258    }
259
260    fn topic(&self) -> &str {
261        &self.topic_name
262    }
263}
264
265impl IrohBackend {
266    /// Cria uma interface PubSub para este backend
267    pub fn create_pubsub_interface(self: Arc<Self>) -> IrohPubSub {
268        IrohPubSub::new(self)
269    }
270}