guardian_db/p2p/pubsub/
raw.rs

1use crate::error::{GuardianError, Result};
2use crate::events;
3use crate::ipfs_core_api::IpfsClient;
4use crate::p2p::events::new_event_message;
5use crate::traits::{EventPubSubMessage, PubSubInterface, PubSubTopic};
6use async_trait::async_trait;
7use futures::stream::{Stream, StreamExt};
8use libp2p::PeerId;
9use std::collections::HashMap;
10use std::pin::Pin;
11use std::sync::Arc;
12use tokio::sync::{RwLock, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info, warn};
16
17/// Tópico PubSub integrado com ipfs_core_api
18pub struct PsTopic {
19    topic_name: String,
20    ipfs_client: Arc<IpfsClient>,
21    // Token para cancelamento de operações
22    cancellation_token: CancellationToken,
23}
24
25/// PubSub principal usando IPFS Core API
26pub struct RawPubSub {
27    ipfs_client: Arc<IpfsClient>,
28    id: PeerId,
29    // Mapa thread-safe de tópicos ativos
30    topics: RwLock<HashMap<String, Arc<PsTopic>>>,
31}
32
33#[async_trait]
34impl PubSubTopic for PsTopic {
35    type Error = GuardianError;
36
37    /// Publica uma mensagem no tópico usando ipfs_core_api
38    async fn publish(&self, message: Vec<u8>) -> std::result::Result<(), Self::Error> {
39        info!(
40            "Publicando mensagem no tópico '{}': {} bytes",
41            self.topic_name,
42            message.len()
43        );
44
45        self.ipfs_client
46            .pubsub_publish(&self.topic_name, &message)
47            .await?;
48
49        debug!(
50            "Mensagem publicada com sucesso no tópico '{}'",
51            self.topic_name
52        );
53        Ok(())
54    }
55
56    /// Lista peers conectados ao tópico
57    async fn peers(&self) -> std::result::Result<Vec<PeerId>, Self::Error> {
58        debug!("Listando peers do tópico: {}", self.topic_name);
59
60        let peers = self.ipfs_client.pubsub_peers(&self.topic_name).await?;
61
62        debug!(
63            "Encontrados {} peers para tópico '{}'",
64            peers.len(),
65            self.topic_name
66        );
67        Ok(peers)
68    }
69
70    /// Monitora mudanças nos peers do tópico
71    async fn watch_peers(
72        &self,
73    ) -> std::result::Result<Pin<Box<dyn Stream<Item = events::Event> + Send>>, Self::Error> {
74        let (tx, rx) = mpsc::channel(32);
75        let ipfs_client = self.ipfs_client.clone();
76        let topic_name = self.topic_name.clone();
77        let token = self.cancellation_token.clone();
78
79        tokio::spawn(async move {
80            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
81            let mut last_peers: Vec<PeerId> = Vec::new();
82
83            loop {
84                tokio::select! {
85                    _ = token.cancelled() => {
86                        debug!("Monitoramento de peers cancelado para tópico: {}", topic_name);
87                        break;
88                    }
89                    _ = interval.tick() => {
90                        match ipfs_client.pubsub_peers(&topic_name).await {
91                            Ok(current_peers) => {
92                                // Detecta mudanças nos peers
93                                for peer in &current_peers {
94                                    if !last_peers.contains(peer) {
95                                        // Cria evento de join como Arc<dyn Any>
96                                        let join_event: events::Event = Arc::new(crate::traits::EventPubSub::Join {
97                                            peer: *peer,
98                                            topic: topic_name.clone()
99                                        });
100                                        if tx.send(join_event).await.is_err() {
101                                            break;
102                                        }
103                                    }
104                                }
105
106                                for peer in &last_peers {
107                                    if !current_peers.contains(peer) {
108                                        // Cria evento de leave como Arc<dyn Any>
109                                        let leave_event: events::Event = Arc::new(crate::traits::EventPubSub::Leave {
110                                            peer: *peer,
111                                            topic: topic_name.clone()
112                                        });
113                                        if tx.send(leave_event).await.is_err() {
114                                            break;
115                                        }
116                                    }
117                                }
118
119                                last_peers = current_peers;
120                            }
121                            Err(e) => {
122                                error!("Erro ao obter peers do tópico '{}': {}", topic_name, e);
123                            }
124                        }
125                    }
126                }
127            }
128        });
129
130        let stream = ReceiverStream::new(rx);
131        Ok(Box::pin(stream))
132    }
133
134    /// Monitora mensagens recebidas no tópico
135    async fn watch_messages(
136        &self,
137    ) -> std::result::Result<Pin<Box<dyn Stream<Item = EventPubSubMessage> + Send>>, Self::Error>
138    {
139        let (tx, rx) = mpsc::channel(128);
140        let ipfs_client = self.ipfs_client.clone();
141        let topic_name = self.topic_name.clone();
142        let token = self.cancellation_token.clone();
143
144        // Inicia o monitoramento de mensagens
145        tokio::spawn(async move {
146            match ipfs_client.pubsub_subscribe(&topic_name).await {
147                Ok(mut stream) => {
148                    debug!("Stream de mensagens iniciado para tópico: {}", topic_name);
149
150                    loop {
151                        tokio::select! {
152                            _ = token.cancelled() => {
153                                debug!("Monitoramento de mensagens cancelado para tópico: {}", topic_name);
154                                break;
155                            }
156                            msg_result = stream.next() => {
157                                match msg_result {
158                                    Some(Ok(msg)) => {
159                                        let event = new_event_message(msg.data);
160                                        if tx.send(event).await.is_err() {
161                                            debug!("Receptor de mensagens desconectado para tópico: {}", topic_name);
162                                            break;
163                                        }
164                                    }
165                                    Some(Err(e)) => {
166                                        error!("Erro no stream de mensagens para tópico '{}': {}", topic_name, e);
167                                        break;
168                                    }
169                                    None => {
170                                        debug!("Stream de mensagens finalizado para tópico: {}", topic_name);
171                                        break;
172                                    }
173                                }
174                            }
175                        }
176                    }
177                }
178                Err(e) => {
179                    error!(
180                        "Erro ao criar stream de mensagens para tópico '{}': {}",
181                        topic_name, e
182                    );
183                }
184            }
185        });
186
187        let stream = ReceiverStream::new(rx);
188        Ok(Box::pin(stream))
189    }
190
191    /// Retorna o nome do tópico
192    fn topic(&self) -> &str {
193        &self.topic_name
194    }
195}
196
197impl RawPubSub {
198    /// Cria uma nova instância do RawPubSub usando ipfs_core_api
199    pub async fn new(ipfs_client: Arc<IpfsClient>) -> Result<Arc<Self>> {
200        let node_info = ipfs_client.id().await?;
201        let id = node_info.id;
202
203        info!("Inicializando RawPubSub com node ID: {}", id);
204
205        Ok(Arc::new(RawPubSub {
206            ipfs_client,
207            id,
208            topics: RwLock::new(HashMap::new()),
209        }))
210    }
211
212    /// Método auxiliar para criar com configuração customizada
213    pub async fn new_with_config(ipfs_client: Arc<IpfsClient>) -> Result<Arc<Self>> {
214        Self::new(ipfs_client).await
215    }
216
217    /// Obtém estatísticas dos tópicos ativos
218    pub async fn get_topic_stats(&self) -> HashMap<String, usize> {
219        let topics = self.topics.read().await;
220        let mut stats = HashMap::new();
221
222        for (topic_name, topic) in topics.iter() {
223            match topic.peers().await {
224                Ok(peers) => {
225                    stats.insert(topic_name.clone(), peers.len());
226                }
227                Err(e) => {
228                    warn!("Erro ao obter peers do tópico '{}': {}", topic_name, e);
229                    stats.insert(topic_name.clone(), 0);
230                }
231            }
232        }
233
234        stats
235    }
236
237    /// Lista todos os tópicos ativos
238    pub async fn list_topics(&self) -> Vec<String> {
239        let topics = self.topics.read().await;
240        topics.keys().cloned().collect()
241    }
242
243    /// Obtém o ID do peer local
244    pub fn local_peer_id(&self) -> PeerId {
245        self.id
246    }
247
248    /// Remove um tópico (unsubscribe)
249    pub async fn topic_unsubscribe(&self, topic_name: &str) -> Result<()> {
250        let mut topics = self.topics.write().await;
251
252        if let Some(topic) = topics.remove(topic_name) {
253            topic.cancellation_token.cancel();
254            info!("Tópico '{}' removido com sucesso", topic_name);
255        } else {
256            warn!("Tentativa de remover tópico inexistente: {}", topic_name);
257        }
258
259        Ok(())
260    }
261}
262
263/// Implementação da interface PubSub usando ipfs_core_api
264#[async_trait]
265impl PubSubInterface for RawPubSub {
266    type Error = GuardianError;
267
268    /// Subscreve a um tópico e retorna uma instância PubSubTopic
269    async fn topic_subscribe(
270        &mut self,
271        topic_name: &str,
272    ) -> std::result::Result<Arc<dyn PubSubTopic<Error = GuardianError>>, Self::Error> {
273        let mut topics = self.topics.write().await;
274
275        // Se o tópico já existe, retorna a instância existente
276        if let Some(existing_topic) = topics.get(topic_name) {
277            info!("Reutilizando tópico existente: {}", topic_name);
278            return Ok(existing_topic.clone() as Arc<dyn PubSubTopic<Error = GuardianError>>);
279        }
280
281        info!("Criando nova subscrição para tópico: {}", topic_name);
282
283        // Cria novo tópico
284        let new_topic = Arc::new(PsTopic {
285            topic_name: topic_name.to_string(),
286            ipfs_client: self.ipfs_client.clone(),
287            cancellation_token: CancellationToken::new(),
288        });
289
290        // Registra o tópico na subscrição do IPFS
291        let _subscription_stream = self
292            .ipfs_client
293            .pubsub_subscribe(topic_name)
294            .await
295            .map_err(|e| GuardianError::Ipfs(format!("Erro ao subscrever tópico IPFS: {}", e)))?;
296
297        // Adiciona ao nosso mapa local
298        topics.insert(topic_name.to_string(), new_topic.clone());
299
300        info!("Tópico '{}' criado e subscrito com sucesso", topic_name);
301
302        Ok(new_topic as Arc<dyn PubSubTopic<Error = GuardianError>>)
303    }
304
305    fn as_any(&self) -> &dyn std::any::Any {
306        self
307    }
308}