guardian_db/p2p/pubsub/
one_on_one_channel.rs

1use crate::error::{GuardianError, Result};
2use crate::ipfs_core_api::{IpfsClient, PubsubStream};
3use crate::p2p::events::new_event_payload;
4use crate::traits::{
5    DirectChannel, DirectChannelEmitter, DirectChannelFactory, DirectChannelOptions,
6};
7use async_trait::async_trait;
8use futures::stream::StreamExt;
9use libp2p::PeerId;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tokio_util::sync::CancellationToken;
17use tracing::{Span, debug, error, info, instrument, warn};
18
19// Constantes do protocolo
20const PROTOCOL: &str = "ipfs-pubsub-direct-channel/v1";
21
22// Implemetação da trait `DirectChannel` para a struct `Channels`.
23#[async_trait]
24impl DirectChannel for Channels {
25    type Error = GuardianError;
26
27    /// Inicia a conexão e o monitoramento de um peer específico. Se uma conexão
28    /// com o peer já não existir, cria um novo tópico no pubsub, inscreve-se nele,
29    /// e inicia uma tarefa em segundo plano (`monitor_topic`) para escutar por mensagens.
30    #[instrument(level = "debug", skip(self))]
31    async fn connect(&mut self, target: PeerId) -> std::result::Result<(), Self::Error> {
32        let id = self.get_channel_id(&target);
33        let mut subs = self.subs.write().await;
34
35        // Só executa a lógica se não estivermos já inscritos no canal com este peer.
36        if let std::collections::hash_map::Entry::Vacant(e) = subs.entry(target) {
37            info!(peer = %target, topic = %id, "Iniciando conexão e inscrição no canal.");
38
39            // Cria um "token filho" para esta conexão específica.
40            // Quando o token principal em `self.token` for cancelado (no `close`),
41            // este token filho será cancelado automaticamente em cascata.
42            let child_token = self.token.child_token();
43
44            // Inscreve-se no tópico do pubsub através da nossa API IPFS 100% Rust.
45            let stream = self.ipfs_client.pubsub_subscribe(&id).await?;
46
47            // Armazena o token da nova subscrição no mapa.
48            e.insert(child_token.clone());
49
50            // Clona as referências necessárias para a nova tarefa assíncrona.
51            let self_clone = self.clone();
52
53            // Inicia a tarefa em segundo plano que irá monitorar o tópico.
54            tokio::spawn(async move {
55                // Passa o token filho para a tarefa de monitoramento.
56                self_clone.monitor_topic(stream, target, child_token).await;
57
58                // Após o término do monitoramento (seja por cancelamento ou fim do stream),
59                // remove o peer do mapa de subscrições ativas para limpeza.
60                let mut subs = self_clone.subs.write().await;
61                subs.remove(&target);
62                debug!("Monitor para {} encerrado e removido.", target);
63            });
64        }
65
66        // Libera o lock de escrita antes de continuar.
67        drop(subs);
68
69        // Tenta se conectar diretamente ao peer na camada de swarm do IPFS.
70        // A falha aqui é apenas um aviso, pois o pubsub pode funcionar mesmo sem uma conexão direta.
71        if let Err(e) = self.ipfs_client.swarm_connect(&target).await {
72            warn!(peer = %target, "Não foi possível conectar diretamente ao peer (aviso): {}", e);
73        }
74
75        // Aguarda até que o peer seja visível no tópico do pubsub.
76        self.wait_for_peers(target, &id).await
77    }
78
79    /// Publica uma mensagem (slice de bytes) no canal de comunicação
80    /// estabelecido com o peer `p`.
81    #[instrument(level = "debug", skip(self, head))]
82    async fn send(&mut self, p: PeerId, head: Vec<u8>) -> std::result::Result<(), Self::Error> {
83        // Determina o ID do canal. Se já tivermos uma subscrição ativa,
84        // reutiliza o ID do canal armazenado. Caso contrário, calcula-o.
85        let id = {
86            let subs = self.subs.read().await;
87            if subs.contains_key(&p) {
88                // Lógica para obter o ID do canal associado ao token, se necessário.
89                // Neste caso, é mais simples recalcular.
90                self.get_channel_id(&p)
91            } else {
92                self.get_channel_id(&p)
93            }
94        };
95
96        // Publica os dados no tópico do pubsub via nossa API IPFS 100% Rust.
97        self.ipfs_client.pubsub_publish(&id, &head).await?;
98
99        Ok(())
100    }
101
102    /// Encerra todas as conexões e tarefas de monitoramento ativas,
103    /// limpando todos os recursos associados ao `Channels`.
104    #[instrument(level = "debug", skip(self))]
105    async fn close(&mut self) -> std::result::Result<(), Self::Error> {
106        info!("Encerrando todos os canais e tarefas de monitoramento...");
107
108        // Com um único chamado, cancelamos o token principal.
109        // Esta ação se propaga e cancela TODOS os tokens filhos que foram
110        // passados para as tarefas `monitor_topic`, sinalizando que elas
111        // devem parar de forma limpa e cooperativa.
112        self.token.cancel();
113
114        // Limpa o mapa de subscrições.
115        self.subs.write().await.clear();
116
117        // Fecha o emissor de eventos.
118        self.emitter.close().await?;
119
120        Ok(())
121    }
122
123    /// Versão de close() que funciona com referência compartilhada (&self).
124    /// Permite fechar o canal quando usado dentro de Arc<>.
125    #[instrument(level = "debug", skip(self))]
126    async fn close_shared(&self) -> std::result::Result<(), Self::Error> {
127        info!("Encerrando todos os canais (referência compartilhada)...");
128
129        // Cancela o token principal para parar todas as tarefas de monitoramento
130        self.token.cancel();
131
132        // Limpa o mapa de subscrições
133        self.subs.write().await.clear();
134
135        // Fecha o emissor de eventos
136        self.emitter.close().await?;
137
138        Ok(())
139    }
140
141    fn as_any(&self) -> &dyn std::any::Any {
142        self
143    }
144}
145
146// A struct principal que gerencia os canais
147#[derive(Clone)]
148pub struct Channels {
149    subs: Arc<RwLock<HashMap<PeerId, CancellationToken>>>,
150    self_id: PeerId,
151    emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError> + Send + Sync>,
152    ipfs_client: Arc<IpfsClient>,
153    span: Span,
154    // O token principal que controla o tempo de vida de toda a instância de Channels
155    token: CancellationToken,
156}
157
158impl Channels {
159    /// Retorna uma referência ao span de tracing para instrumentação
160    pub fn span(&self) -> &Span {
161        &self.span
162    }
163
164    #[instrument(level = "debug", skip(self))]
165    pub async fn connect(&self, target: PeerId) -> Result<()> {
166        let _entered = self.span.enter();
167        let id = self.get_channel_id(&target);
168        let mut subs = self.subs.write().await;
169
170        if let std::collections::hash_map::Entry::Vacant(e) = subs.entry(target) {
171            debug!(topic = %id, "inscrevendo-se no tópico");
172
173            // A chamada é realizada via API IPFS.
174            // A sua gestão será feita dentro de `monitor_topic`.
175            let stream = self.ipfs_client.pubsub_subscribe(&id).await?;
176
177            let cancel_token = CancellationToken::new();
178
179            e.insert(cancel_token.clone());
180
181            // Spawna a task para monitorar o tópico
182            let self_clone = self.clone();
183            tokio::spawn(async move {
184                self_clone.monitor_topic(stream, target, cancel_token).await;
185
186                // Quando monitor_topic termina, remove o peer do cache
187                let mut subs = self_clone.subs.write().await;
188                subs.remove(&target);
189            });
190        }
191        // Libera o lock de escrita antes das chamadas de rede
192        drop(subs);
193
194        if let Err(e) = self.ipfs_client.swarm_connect(&target).await {
195            warn!(peer = %target, "não foi possível conectar ao peer remoto: {}", e);
196        }
197
198        self.wait_for_peers(target, &id).await
199    }
200
201    #[instrument(level = "debug", skip(self, head))]
202    pub async fn send(&self, p: PeerId, head: &[u8]) -> Result<()> {
203        let _entered = self.span.enter();
204        let id = {
205            let _subs = self.subs.read().await;
206            self.get_channel_id(&p)
207        };
208
209        self.ipfs_client
210            .pubsub_publish(&id, head)
211            .await
212            .map_err(|e| {
213                GuardianError::Other(format!(
214                    "falha ao publicar dados no pubsub via nossa API IPFS: {}",
215                    e
216                ))
217            })?;
218
219        Ok(())
220    }
221
222    #[instrument(level = "debug", skip(self))]
223    async fn wait_for_peers(&self, other_peer: PeerId, channel_id: &str) -> Result<()> {
224        let mut interval = tokio::time::interval(Duration::from_secs(1));
225        // Adiciona um timeout para não ficar em loop infinito
226        let timeout = tokio::time::sleep(Duration::from_secs(30));
227        tokio::pin!(timeout);
228
229        loop {
230            tokio::select! {
231                _ = &mut timeout => {
232                     return Err(GuardianError::Network(format!("timeout esperando pelo peer {} no canal {}", other_peer, channel_id)));
233                }
234                _ = interval.tick() => {
235                    match self.ipfs_client.pubsub_peers(channel_id).await {
236                        Ok(peers) => {
237                            if peers.iter().any(|p| p == &other_peer) {
238                                debug!("peer {} encontrado no pubsub", other_peer);
239                                return Ok(());
240                            }
241                            debug!("Peer não encontrado, tentando novamente...");
242                        }
243                        Err(e) => {
244                            error!("falha ao obter peers do pubsub: {}", e);
245                            // Retorna o erro em caso de falha na chamada da nossa API IPFS
246                            return Err(e);
247                        }
248                    }
249                }
250            }
251        }
252    }
253
254    // Função auxiliar para geração de identificadores únicos de canal.
255    // Implementa lógica pura e determinística para criar IDs de canais one-on-one.
256    // Garante que o mesmo ID seja gerado independente da ordem dos peers.
257    #[instrument(level = "debug", skip(self))]
258    fn get_channel_id(&self, p: &PeerId) -> String {
259        let mut channel_id_peers = [self.self_id.to_string(), p.to_string()];
260        channel_id_peers.sort();
261        format!("/{}/{}", PROTOCOL, channel_id_peers.join("/"))
262    }
263
264    #[instrument(level = "debug", skip(self, stream, token))]
265    async fn monitor_topic(
266        &self,
267        mut stream: PubsubStream,
268        p: PeerId,
269        token: CancellationToken, // Recebe o token (filho)
270    ) {
271        loop {
272            tokio::select! {
273                // Aguarda pelo sinal de cancelamento no token.
274                // `biased;` pode ser usado para sempre checar o cancelamento primeiro.
275                biased;
276                _ = token.cancelled() => {
277                    debug!(remote = %p, "fechando monitor do tópico por cancelamento");
278                    break;
279                },
280
281                // Processa a próxima mensagem do stream
282                maybe_msg = stream.next() => {
283                    match maybe_msg {
284                        Some(Ok(msg)) => {
285                            // Garante que a mensagem venha de um peer diferente do self
286                            if msg.from == self.self_id {
287                                continue;
288                            }
289
290                            // Emite o payload do evento - mantém dados como Vec<u8>
291                            let event = new_event_payload(msg.data, p);
292                            if let Err(e) = self.emitter.emit(event).await {
293                                warn!("não foi possível emitir payload do evento: {}", e);
294                            }
295                        },
296                        Some(Err(e)) => {
297                            error!("erro no stream do pubsub: {}", e);
298                            break;
299                        },
300                        // Stream finalizado
301                        None => {
302                             debug!(remote = %p, "stream do pubsub finalizado");
303                             break;
304                        }
305                    }
306                }
307            }
308        }
309    }
310}
311
312#[instrument(level = "debug", skip(ipfs_client))]
313pub async fn new_channel_factory(ipfs_client: Arc<IpfsClient>) -> Result<DirectChannelFactory> {
314    let self_id = ipfs_client
315        .id()
316        .await
317        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
318        .id;
319
320    info!("ID do nó local: {}", self_id);
321
322    let factory = move |emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
323                        _opts: Option<DirectChannelOptions>| {
324        let ipfs_client = ipfs_client.clone();
325        let self_id = self_id;
326
327        Box::pin(async move {
328            // Criar um span para o canal direto
329            let span = tracing::info_span!("direct_channel", self_id = %self_id);
330
331            let ch = Arc::new(Channels {
332                emitter,
333                subs: Arc::new(RwLock::new(HashMap::new())),
334                self_id,
335                ipfs_client,
336                span,
337                token: CancellationToken::new(),
338            });
339
340            Ok(ch as Arc<dyn DirectChannel<Error = GuardianError>>)
341        })
342            as Pin<
343                Box<
344                    dyn Future<
345                            Output = std::result::Result<
346                                Arc<dyn DirectChannel<Error = GuardianError>>,
347                                Box<dyn std::error::Error + Send + Sync>,
348                            >,
349                        > + Send,
350                >,
351            >
352    };
353
354    Ok(Arc::new(factory))
355}