1use crate::error::{GuardianError, Result};
2use crate::ipfs_core_api::{IpfsClient, PubsubStream};
3use crate::p2p::events::new_event_payload;
4use crate::traits::{
5 DirectChannel, DirectChannelEmitter, DirectChannelFactory, DirectChannelOptions,
6};
7use async_trait::async_trait;
8use futures::stream::StreamExt;
9use libp2p::PeerId;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tokio_util::sync::CancellationToken;
17use tracing::{Span, debug, error, info, instrument, warn};
18
19const PROTOCOL: &str = "ipfs-pubsub-direct-channel/v1";
21
22#[async_trait]
24impl DirectChannel for Channels {
25 type Error = GuardianError;
26
27 #[instrument(level = "debug", skip(self))]
31 async fn connect(&mut self, target: PeerId) -> std::result::Result<(), Self::Error> {
32 let id = self.get_channel_id(&target);
33 let mut subs = self.subs.write().await;
34
35 if let std::collections::hash_map::Entry::Vacant(e) = subs.entry(target) {
37 info!(peer = %target, topic = %id, "Iniciando conexão e inscrição no canal.");
38
39 let child_token = self.token.child_token();
43
44 let stream = self.ipfs_client.pubsub_subscribe(&id).await?;
46
47 e.insert(child_token.clone());
49
50 let self_clone = self.clone();
52
53 tokio::spawn(async move {
55 self_clone.monitor_topic(stream, target, child_token).await;
57
58 let mut subs = self_clone.subs.write().await;
61 subs.remove(&target);
62 debug!("Monitor para {} encerrado e removido.", target);
63 });
64 }
65
66 drop(subs);
68
69 if let Err(e) = self.ipfs_client.swarm_connect(&target).await {
72 warn!(peer = %target, "Não foi possível conectar diretamente ao peer (aviso): {}", e);
73 }
74
75 self.wait_for_peers(target, &id).await
77 }
78
79 #[instrument(level = "debug", skip(self, head))]
82 async fn send(&mut self, p: PeerId, head: Vec<u8>) -> std::result::Result<(), Self::Error> {
83 let id = {
86 let subs = self.subs.read().await;
87 if subs.contains_key(&p) {
88 self.get_channel_id(&p)
91 } else {
92 self.get_channel_id(&p)
93 }
94 };
95
96 self.ipfs_client.pubsub_publish(&id, &head).await?;
98
99 Ok(())
100 }
101
102 #[instrument(level = "debug", skip(self))]
105 async fn close(&mut self) -> std::result::Result<(), Self::Error> {
106 info!("Encerrando todos os canais e tarefas de monitoramento...");
107
108 self.token.cancel();
113
114 self.subs.write().await.clear();
116
117 self.emitter.close().await?;
119
120 Ok(())
121 }
122
123 #[instrument(level = "debug", skip(self))]
126 async fn close_shared(&self) -> std::result::Result<(), Self::Error> {
127 info!("Encerrando todos os canais (referência compartilhada)...");
128
129 self.token.cancel();
131
132 self.subs.write().await.clear();
134
135 self.emitter.close().await?;
137
138 Ok(())
139 }
140
141 fn as_any(&self) -> &dyn std::any::Any {
142 self
143 }
144}
145
146#[derive(Clone)]
148pub struct Channels {
149 subs: Arc<RwLock<HashMap<PeerId, CancellationToken>>>,
150 self_id: PeerId,
151 emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError> + Send + Sync>,
152 ipfs_client: Arc<IpfsClient>,
153 span: Span,
154 token: CancellationToken,
156}
157
158impl Channels {
159 pub fn span(&self) -> &Span {
161 &self.span
162 }
163
164 #[instrument(level = "debug", skip(self))]
165 pub async fn connect(&self, target: PeerId) -> Result<()> {
166 let _entered = self.span.enter();
167 let id = self.get_channel_id(&target);
168 let mut subs = self.subs.write().await;
169
170 if let std::collections::hash_map::Entry::Vacant(e) = subs.entry(target) {
171 debug!(topic = %id, "inscrevendo-se no tópico");
172
173 let stream = self.ipfs_client.pubsub_subscribe(&id).await?;
176
177 let cancel_token = CancellationToken::new();
178
179 e.insert(cancel_token.clone());
180
181 let self_clone = self.clone();
183 tokio::spawn(async move {
184 self_clone.monitor_topic(stream, target, cancel_token).await;
185
186 let mut subs = self_clone.subs.write().await;
188 subs.remove(&target);
189 });
190 }
191 drop(subs);
193
194 if let Err(e) = self.ipfs_client.swarm_connect(&target).await {
195 warn!(peer = %target, "não foi possível conectar ao peer remoto: {}", e);
196 }
197
198 self.wait_for_peers(target, &id).await
199 }
200
201 #[instrument(level = "debug", skip(self, head))]
202 pub async fn send(&self, p: PeerId, head: &[u8]) -> Result<()> {
203 let _entered = self.span.enter();
204 let id = {
205 let _subs = self.subs.read().await;
206 self.get_channel_id(&p)
207 };
208
209 self.ipfs_client
210 .pubsub_publish(&id, head)
211 .await
212 .map_err(|e| {
213 GuardianError::Other(format!(
214 "falha ao publicar dados no pubsub via nossa API IPFS: {}",
215 e
216 ))
217 })?;
218
219 Ok(())
220 }
221
222 #[instrument(level = "debug", skip(self))]
223 async fn wait_for_peers(&self, other_peer: PeerId, channel_id: &str) -> Result<()> {
224 let mut interval = tokio::time::interval(Duration::from_secs(1));
225 let timeout = tokio::time::sleep(Duration::from_secs(30));
227 tokio::pin!(timeout);
228
229 loop {
230 tokio::select! {
231 _ = &mut timeout => {
232 return Err(GuardianError::Network(format!("timeout esperando pelo peer {} no canal {}", other_peer, channel_id)));
233 }
234 _ = interval.tick() => {
235 match self.ipfs_client.pubsub_peers(channel_id).await {
236 Ok(peers) => {
237 if peers.iter().any(|p| p == &other_peer) {
238 debug!("peer {} encontrado no pubsub", other_peer);
239 return Ok(());
240 }
241 debug!("Peer não encontrado, tentando novamente...");
242 }
243 Err(e) => {
244 error!("falha ao obter peers do pubsub: {}", e);
245 return Err(e);
247 }
248 }
249 }
250 }
251 }
252 }
253
254 #[instrument(level = "debug", skip(self))]
258 fn get_channel_id(&self, p: &PeerId) -> String {
259 let mut channel_id_peers = [self.self_id.to_string(), p.to_string()];
260 channel_id_peers.sort();
261 format!("/{}/{}", PROTOCOL, channel_id_peers.join("/"))
262 }
263
264 #[instrument(level = "debug", skip(self, stream, token))]
265 async fn monitor_topic(
266 &self,
267 mut stream: PubsubStream,
268 p: PeerId,
269 token: CancellationToken, ) {
271 loop {
272 tokio::select! {
273 biased;
276 _ = token.cancelled() => {
277 debug!(remote = %p, "fechando monitor do tópico por cancelamento");
278 break;
279 },
280
281 maybe_msg = stream.next() => {
283 match maybe_msg {
284 Some(Ok(msg)) => {
285 if msg.from == self.self_id {
287 continue;
288 }
289
290 let event = new_event_payload(msg.data, p);
292 if let Err(e) = self.emitter.emit(event).await {
293 warn!("não foi possível emitir payload do evento: {}", e);
294 }
295 },
296 Some(Err(e)) => {
297 error!("erro no stream do pubsub: {}", e);
298 break;
299 },
300 None => {
302 debug!(remote = %p, "stream do pubsub finalizado");
303 break;
304 }
305 }
306 }
307 }
308 }
309 }
310}
311
312#[instrument(level = "debug", skip(ipfs_client))]
313pub async fn new_channel_factory(ipfs_client: Arc<IpfsClient>) -> Result<DirectChannelFactory> {
314 let self_id = ipfs_client
315 .id()
316 .await
317 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
318 .id;
319
320 info!("ID do nó local: {}", self_id);
321
322 let factory = move |emitter: Arc<dyn DirectChannelEmitter<Error = GuardianError>>,
323 _opts: Option<DirectChannelOptions>| {
324 let ipfs_client = ipfs_client.clone();
325 let self_id = self_id;
326
327 Box::pin(async move {
328 let span = tracing::info_span!("direct_channel", self_id = %self_id);
330
331 let ch = Arc::new(Channels {
332 emitter,
333 subs: Arc::new(RwLock::new(HashMap::new())),
334 self_id,
335 ipfs_client,
336 span,
337 token: CancellationToken::new(),
338 });
339
340 Ok(ch as Arc<dyn DirectChannel<Error = GuardianError>>)
341 })
342 as Pin<
343 Box<
344 dyn Future<
345 Output = std::result::Result<
346 Arc<dyn DirectChannel<Error = GuardianError>>,
347 Box<dyn std::error::Error + Send + Sync>,
348 >,
349 > + Send,
350 >,
351 >
352 };
353
354 Ok(Arc::new(factory))
355}