guardian_db/p2p/pubsub/
mod.rs

1use crate::error::GuardianError;
2use crate::events;
3use crate::ipfs_core_api::client::IpfsClient;
4use crate::p2p::events as event;
5use crate::traits::{EventPubSubMessage, PubSubInterface, PubSubTopic, TracerWrapper};
6use futures::Stream;
7use libp2p::PeerId;
8use opentelemetry::trace::noop::NoopTracer;
9use std::collections::{HashMap, HashSet};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::{Mutex, RwLock, mpsc};
14use tokio_stream::{StreamExt, wrappers::ReceiverStream};
15use tokio_util::sync::CancellationToken;
16use tracing::{Span, error, instrument, warn};
17
18pub mod direct_channel;
19pub mod one_on_one_channel;
20pub mod raw;
21
22pub const PROTOCOL: &str = "/guardian-db/direct-channel/1.2.0";
23#[allow(dead_code)]
24pub const DELIMITED_READ_MAX_SIZE: usize = 1024 * 1024 * 4; // 4mb
25#[allow(dead_code)]
26pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
27pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
28pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB
29
30/// A struct `CoreApiPubSub` gerencia a lógica de pubsub para o nó do GuardianDB.
31pub struct CoreApiPubSub {
32    pub api: IpfsClient,
33    pub span: Span,
34    pub id: PeerId,
35    pub poll_interval: Duration,
36    pub tracer: Arc<TracerWrapper>,
37    topics: Mutex<HashMap<String, Arc<PsTopic>>>,
38    /// Token para cancelamento gracioso de todas as operações
39    cancellation_token: CancellationToken,
40}
41
42#[async_trait::async_trait]
43impl PubSubInterface for CoreApiPubSub {
44    type Error = GuardianError;
45
46    #[instrument(level = "debug", skip(self))]
47    async fn topic_subscribe(
48        &mut self,
49        topic: &str,
50    ) -> Result<Arc<dyn crate::traits::PubSubTopic<Error = GuardianError>>, Self::Error> {
51        // Usa o método auxiliar para evitar problemas de ownership
52        let mut topics_guard = self.topics.lock().await;
53
54        // Se o tópico já estiver na nossa cache, retorna a instância existente.
55        if let Some(t) = topics_guard.get(topic) {
56            return Ok(t.clone() as Arc<dyn crate::traits::PubSubTopic<Error = GuardianError>>);
57        }
58
59        // Cria um novo tópico usando o método auxiliar
60        let new_topic = self.create_topic(topic).await;
61
62        // Insere o novo tópico no nosso cache.
63        topics_guard.insert(topic.to_string(), new_topic.clone());
64
65        Ok(new_topic as Arc<dyn crate::traits::PubSubTopic<Error = GuardianError>>)
66    }
67
68    fn as_any(&self) -> &dyn std::any::Any {
69        self
70    }
71}
72
73// A struct `PsTopic` será
74// armazenada em um `Arc` para compartilhamento seguro entre threads.
75/// A struct `PsTopic` contém o estado e a lógica para um único tópico do pubsub.
76pub struct PsTopic {
77    topic: String,
78    ps: Arc<CoreApiPubSub>,
79    members: RwLock<Vec<PeerId>>,
80    /// Token para cancelamento gracioso das operações deste tópico
81    cancellation_token: CancellationToken,
82}
83
84impl PsTopic {
85    // Adiciona validação e melhor tratamento de erros
86    #[instrument(level = "debug", skip(self, message))]
87    pub async fn publish(&self, message: &[u8]) -> crate::error::Result<()> {
88        // Verifica se o tópico não foi cancelado
89        if self.cancellation_token.is_cancelled() {
90            return Err(crate::error::GuardianError::Store(
91                "Cannot publish to cancelled topic".to_string(),
92            ));
93        }
94
95        // Validação básica da mensagem
96        if message.is_empty() {
97            return Err(crate::error::GuardianError::Store(
98                "Cannot publish empty message".to_string(),
99            ));
100        }
101
102        self.ps.api.pubsub_publish(&self.topic, message).await?;
103        Ok(())
104    }
105
106    #[instrument(level = "debug", skip(self))]
107    pub async fn peers(&self) -> crate::error::Result<Vec<PeerId>> {
108        // O bloqueio de leitura é assíncrono com tokio::sync::RwLock
109        let members_guard = self.members.read().await;
110
111        // .clone() cria uma nova cópia do Vec, liberando o bloqueio
112        // quando `members_guard` sai de escopo.
113        Ok(members_guard.clone())
114    }
115
116    // Otimiza operações de lock e melhor gestão de conjuntos
117    #[instrument(level = "debug", skip(self))]
118    pub async fn peers_diff(&self) -> crate::error::Result<(Vec<PeerId>, Vec<PeerId>)> {
119        // Usa a nova API do ipfs_core_api para obter peers atuais
120        let all_current_peers_vec = self.ps.api.pubsub_peers(&self.topic).await?;
121        let current_peers_set: HashSet<PeerId> = all_current_peers_vec.iter().cloned().collect();
122
123        // Obtém e atualiza membros em uma única operação para eficiência
124        let (joining, leaving) = {
125            let mut members_guard = self.members.write().await;
126            let old_members_set: HashSet<PeerId> = members_guard.iter().cloned().collect();
127
128            // Usa operações de conjunto para encontrar diferenças de forma eficiente e idiomática.
129            let joining: Vec<PeerId> = current_peers_set
130                .difference(&old_members_set)
131                .cloned()
132                .collect();
133            let leaving: Vec<PeerId> = old_members_set
134                .difference(&current_peers_set)
135                .cloned()
136                .collect();
137
138            // Atualiza a lista de membros internos no mesmo guard
139            *members_guard = all_current_peers_vec;
140
141            (joining, leaving)
142        };
143
144        Ok((joining, leaving))
145    }
146
147    // Retorna um receptor de canal (`Receiver`) que emitirá eventos de peers
148    // entrando ou saindo do tópico.
149    // Adiciona cancelamento adequado e melhor gestão de recursos
150    #[instrument(level = "debug", skip(self))]
151    pub async fn watch_peers_channel(
152        self: &Arc<Self>,
153    ) -> crate::error::Result<mpsc::Receiver<Arc<dyn std::any::Any + Send + Sync>>> {
154        let (tx, rx) = mpsc::channel(32);
155
156        // Clona o Arc para que a nova task possa ter sua própria referência.
157        let topic_clone = self.clone();
158        let cancellation_token = self.cancellation_token.clone();
159
160        tokio::spawn(async move {
161            loop {
162                // Verifica cancelamento antes de cada iteração
163                if cancellation_token.is_cancelled() {
164                    break;
165                }
166
167                // Chama a função que calcula a diferença de peers
168                let peers_diff_result = topic_clone.peers_diff().await;
169
170                let (joining, leaving) = match peers_diff_result {
171                    Ok((j, l)) => (j, l),
172                    Err(e) => {
173                        // Loga o erro e encerra a task.
174                        // Quando `tx` sai de escopo (é dropado), o lado do receptor
175                        // saberá que o canal foi fechado.
176                        error!("Erro ao verificar a diferença de peers: {:?}", e);
177                        return;
178                    }
179                };
180
181                for pid in joining {
182                    let event = event::new_event_peer_join(pid, topic_clone.topic().to_string());
183                    // Converte EventPubSub para o tipo esperado
184                    let event_any: Arc<dyn std::any::Any + Send + Sync> = Arc::new(event);
185                    if tx.send(event_any).await.is_err() {
186                        // O receptor foi fechado, então a task não precisa mais rodar.
187                        return;
188                    }
189                }
190
191                for pid in leaving {
192                    let event = event::new_event_peer_leave(pid, topic_clone.topic().to_string());
193                    // Converte EventPubSub para o tipo esperado
194                    let event_any: Arc<dyn std::any::Any + Send + Sync> = Arc::new(event);
195                    if tx.send(event_any).await.is_err() {
196                        return;
197                    }
198                }
199
200                // Usa select! para permitir cancelamento durante o sleep
201                tokio::select! {
202                    _ = tokio::time::sleep(topic_clone.ps.poll_interval) => {},
203                    _ = cancellation_token.cancelled() => {
204                        break;
205                    }
206                }
207            }
208        });
209
210        Ok(rx)
211    }
212
213    #[instrument(level = "debug", skip(self))]
214    pub async fn watch_messages(&self) -> crate::error::Result<mpsc::Receiver<EventPubSubMessage>> {
215        let mut subscription = self.ps.api.pubsub_subscribe(&self.topic).await?;
216
217        let (tx, rx) = mpsc::channel(128);
218        let self_peer_id = self.ps.id;
219        let cancellation_token = self.cancellation_token.clone();
220        let topic_name = self.topic.clone();
221
222        tokio::spawn(async move {
223            loop {
224                // Verifica cancelamento antes de cada iteração
225                if cancellation_token.is_cancelled() {
226                    break;
227                }
228
229                // Usa select! para permitir cancelamento durante a espera de mensagens
230                tokio::select! {
231                    msg_result = subscription.next() => {
232                        match msg_result {
233                            Some(Ok(msg)) => {
234                                // Ignora mensagens enviadas pelo próprio nó.
235                                if msg.from == self_peer_id {
236                                    continue;
237                                }
238
239                                let event = event::new_event_message(msg.data);
240                                if tx.send(event).await.is_err() {
241                                    // O receptor foi fechado, encerra a task.
242                                    break;
243                                }
244                            }
245                            Some(Err(e)) => {
246                                // Erro no stream, loga e continua tentando
247                                warn!("Error in pubsub stream for topic {}: {:?}", topic_name, e);
248                                continue;
249                            }
250                            None => {
251                                // Stream fechado, encerra a task
252                                break;
253                            }
254                        }
255                    }
256                    _ = cancellation_token.cancelled() => {
257                        // Cancelamento solicitado, encerra a task
258                        break;
259                    }
260                }
261            }
262        });
263
264        Ok(rx)
265    }
266
267    // Retorna uma referência ao nome do tópico, o que é mais eficiente
268    // do que clonar a String.
269    #[instrument(level = "debug", skip(self))]
270    pub fn topic(&self) -> &str {
271        &self.topic
272    }
273
274    /// Cancela todas as operações ativas do tópico
275    #[instrument(level = "debug", skip(self))]
276    pub fn cancel(&self) {
277        self.cancellation_token.cancel();
278    }
279
280    /// Verifica se o tópico foi cancelado
281    #[instrument(level = "debug", skip(self))]
282    pub fn is_cancelled(&self) -> bool {
283        self.cancellation_token.is_cancelled()
284    }
285
286    /// Limpa a lista de membros do tópico
287    #[instrument(level = "debug", skip(self))]
288    pub async fn clear_members(&self) {
289        let mut members_guard = self.members.write().await;
290        members_guard.clear();
291    }
292}
293
294#[async_trait::async_trait]
295impl PubSubTopic for PsTopic {
296    type Error = GuardianError;
297
298    #[instrument(level = "debug", skip(self, message))]
299    async fn publish(&self, message: Vec<u8>) -> crate::error::Result<()> {
300        PsTopic::publish(self, &message).await
301    }
302
303    #[instrument(level = "debug", skip(self))]
304    async fn peers(&self) -> crate::error::Result<Vec<PeerId>> {
305        self.peers().await
306    }
307
308    #[instrument(level = "debug", skip(self))]
309    async fn watch_peers(
310        &self,
311    ) -> crate::error::Result<Pin<Box<dyn Stream<Item = events::Event> + Send>>> {
312        // Remove criação desnecessária de instância local
313        let (tx, rx) = mpsc::channel(32);
314
315        // Clona dados necessários para a task
316        let topic_clone = Arc::new(PsTopic {
317            topic: self.topic.clone(),
318            ps: self.ps.clone(),
319            members: RwLock::new(self.members.read().await.clone()),
320            cancellation_token: self.cancellation_token.clone(),
321        });
322
323        tokio::spawn(async move {
324            loop {
325                // Verifica cancelamento antes de cada iteração
326                if topic_clone.cancellation_token.is_cancelled() {
327                    break;
328                }
329
330                // Chama a função que calcula a diferença de peers
331                let peers_diff_result = topic_clone.peers_diff().await;
332
333                let (joining, leaving) = match peers_diff_result {
334                    Ok((j, l)) => (j, l),
335                    Err(e) => {
336                        // Loga o erro e encerra a task.
337                        error!("Erro ao verificar a diferença de peers: {:?}", e);
338                        return;
339                    }
340                };
341
342                for pid in joining {
343                    let event = event::new_event_peer_join(pid, topic_clone.topic().to_string());
344                    // Converte EventPubSub para o tipo esperado como events::Event
345                    let event_any: events::Event = Arc::new(event);
346                    if tx.send(event_any).await.is_err() {
347                        // O receptor foi fechado, então a task não precisa mais rodar.
348                        return;
349                    }
350                }
351
352                for pid in leaving {
353                    let event = event::new_event_peer_leave(pid, topic_clone.topic().to_string());
354                    // Converte EventPubSub para o tipo esperado como events::Event
355                    let event_any: events::Event = Arc::new(event);
356                    if tx.send(event_any).await.is_err() {
357                        return;
358                    }
359                }
360
361                // Usa select! para permitir cancelamento durante o sleep
362                tokio::select! {
363                    _ = tokio::time::sleep(topic_clone.ps.poll_interval) => {},
364                    _ = topic_clone.cancellation_token.cancelled() => {
365                        break;
366                    }
367                }
368            }
369        });
370
371        let stream = ReceiverStream::new(rx);
372        Ok(Box::pin(stream))
373    }
374
375    #[instrument(level = "debug", skip(self))]
376    async fn watch_messages(
377        &self,
378    ) -> crate::error::Result<Pin<Box<dyn Stream<Item = EventPubSubMessage> + Send>>> {
379        let receiver = self.watch_messages().await?;
380        let stream = ReceiverStream::new(receiver);
381        Ok(Box::pin(stream))
382    }
383
384    #[instrument(level = "debug", skip(self))]
385    fn topic(&self) -> &str {
386        &self.topic
387    }
388}
389
390impl CoreApiPubSub {
391    /// Método auxiliar para criar tópicos sem problemas de ownership
392    #[instrument(level = "debug", skip(self))]
393    async fn create_topic(&self, topic: &str) -> Arc<PsTopic> {
394        Arc::new(PsTopic {
395            topic: topic.to_string(),
396            ps: unsafe {
397                // SAFETY: Criamos um Arc temporário que não será armazenado
398                Arc::from_raw(self as *const Self)
399            },
400            members: Default::default(),
401            cancellation_token: self.cancellation_token.child_token(),
402        })
403    }
404
405    /// Assina um tópico de pubsub, retornando uma instância de `PubSubTopic`.
406    /// Se o tópico já existe, retorna a instância existente.
407    #[instrument(level = "debug", skip(self))]
408    pub async fn topic_subscribe_internal(
409        self: &Arc<Self>,
410        topic: &str,
411    ) -> crate::error::Result<Arc<PsTopic>> {
412        let mut topics_guard = self.topics.lock().await;
413
414        // Se o tópico já estiver na nossa cache, retorna a instância existente.
415        if let Some(t) = topics_guard.get(topic) {
416            return Ok(t.clone());
417        }
418
419        // Se não, cria uma nova instância do tópico.
420        let new_topic = Arc::new(PsTopic {
421            topic: topic.to_string(),
422            ps: self.clone(), // Clona a referência Arc para o CoreApiPubSub
423            members: Default::default(),
424            cancellation_token: self.cancellation_token.child_token(),
425        });
426
427        // Insere o novo tópico no nosso cache.
428        topics_guard.insert(topic.to_string(), new_topic.clone());
429
430        Ok(new_topic)
431    }
432
433    /// Cria uma nova instância de `CoreApiPubSub`.
434    /// Os parâmetros `span` e `tracer` podem ser opcionais.
435    #[instrument(level = "debug", skip(api, span, tracer))]
436    pub fn new(
437        api: IpfsClient,
438        id: PeerId,
439        poll_interval: Duration,
440        span: Option<Span>,
441        tracer: Option<Arc<TracerWrapper>>,
442    ) -> Arc<Self> {
443        // Criar tracer padrão caso não seja fornecido
444        let default_tracer = Arc::new(TracerWrapper::Noop(NoopTracer::new()));
445
446        Arc::new(Self {
447            topics: Mutex::new(HashMap::new()),
448            api,
449            id,
450            poll_interval,
451            span: span.unwrap_or_else(tracing::Span::current),
452            tracer: tracer.unwrap_or(default_tracer),
453            cancellation_token: CancellationToken::new(),
454        })
455    }
456
457    /// Método para cancelar todas as operações do PubSub
458    pub fn cancel(&self) {
459        self.cancellation_token.cancel();
460    }
461
462    /// Verifica se o PubSub foi cancelado
463    pub fn is_cancelled(&self) -> bool {
464        self.cancellation_token.is_cancelled()
465    }
466
467    /// Remove um tópico específico do cache
468    #[instrument(level = "debug", skip(self))]
469    pub async fn remove_topic(&self, topic_name: &str) -> bool {
470        let mut topics_guard = self.topics.lock().await;
471        topics_guard.remove(topic_name).is_some()
472    }
473
474    /// Remove todos os tópicos cancelados do cache
475    #[instrument(level = "debug", skip(self))]
476    pub async fn cleanup_cancelled_topics(&self) -> usize {
477        let mut topics_guard = self.topics.lock().await;
478        let mut cancelled_topics = Vec::new();
479
480        // Identifica tópicos cancelados
481        for (name, topic) in topics_guard.iter() {
482            if topic.is_cancelled() {
483                cancelled_topics.push(name.clone());
484            }
485        }
486
487        // Remove tópicos cancelados
488        for topic_name in &cancelled_topics {
489            topics_guard.remove(topic_name);
490        }
491
492        cancelled_topics.len()
493    }
494
495    /// Retorna estatísticas dos tópicos ativos
496    #[instrument(level = "debug", skip(self))]
497    pub async fn topic_stats(&self) -> (usize, usize) {
498        let topics_guard = self.topics.lock().await;
499        let total_topics = topics_guard.len();
500        let mut active_topics = 0;
501
502        for topic in topics_guard.values() {
503            if !topic.is_cancelled() {
504                active_topics += 1;
505            }
506        }
507
508        (total_topics, active_topics)
509    }
510}