1use crate::error::GuardianError;
2use crate::events;
3use crate::ipfs_core_api::client::IpfsClient;
4use crate::p2p::events as event;
5use crate::traits::{EventPubSubMessage, PubSubInterface, PubSubTopic, TracerWrapper};
6use futures::Stream;
7use libp2p::PeerId;
8use opentelemetry::trace::noop::NoopTracer;
9use std::collections::{HashMap, HashSet};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::{Mutex, RwLock, mpsc};
14use tokio_stream::{StreamExt, wrappers::ReceiverStream};
15use tokio_util::sync::CancellationToken;
16use tracing::{Span, error, instrument, warn};
17
18pub mod direct_channel;
19pub mod one_on_one_channel;
20pub mod raw;
21
/// Protocol identifier string of the direct channel, version `1.2.0`.
pub const PROTOCOL: &str = "/guardian-db/direct-channel/1.2.0";

/// 4 MiB. Judging by the name, the size cap for a delimited read — confirm at
/// the call sites, none of which are visible here.
#[allow(dead_code)]
pub const DELIMITED_READ_MAX_SIZE: usize = 4 * 1024 * 1024;

/// Thirty seconds. Judging by the name, the connection timeout — confirm at the
/// call sites.
#[allow(dead_code)]
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);

/// Thirty seconds. Judging by the name, the period between heartbeats — confirm
/// at the call sites.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
28pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024; pub struct CoreApiPubSub {
32 pub api: IpfsClient,
33 pub span: Span,
34 pub id: PeerId,
35 pub poll_interval: Duration,
36 pub tracer: Arc<TracerWrapper>,
37 topics: Mutex<HashMap<String, Arc<PsTopic>>>,
38 cancellation_token: CancellationToken,
40}
41
42#[async_trait::async_trait]
43impl PubSubInterface for CoreApiPubSub {
44 type Error = GuardianError;
45
46 #[instrument(level = "debug", skip(self))]
47 async fn topic_subscribe(
48 &mut self,
49 topic: &str,
50 ) -> Result<Arc<dyn crate::traits::PubSubTopic<Error = GuardianError>>, Self::Error> {
51 let mut topics_guard = self.topics.lock().await;
53
54 if let Some(t) = topics_guard.get(topic) {
56 return Ok(t.clone() as Arc<dyn crate::traits::PubSubTopic<Error = GuardianError>>);
57 }
58
59 let new_topic = self.create_topic(topic).await;
61
62 topics_guard.insert(topic.to_string(), new_topic.clone());
64
65 Ok(new_topic as Arc<dyn crate::traits::PubSubTopic<Error = GuardianError>>)
66 }
67
68 fn as_any(&self) -> &dyn std::any::Any {
69 self
70 }
71}
72
73pub struct PsTopic {
77 topic: String,
78 ps: Arc<CoreApiPubSub>,
79 members: RwLock<Vec<PeerId>>,
80 cancellation_token: CancellationToken,
82}
83
84impl PsTopic {
85 #[instrument(level = "debug", skip(self, message))]
87 pub async fn publish(&self, message: &[u8]) -> crate::error::Result<()> {
88 if self.cancellation_token.is_cancelled() {
90 return Err(crate::error::GuardianError::Store(
91 "Cannot publish to cancelled topic".to_string(),
92 ));
93 }
94
95 if message.is_empty() {
97 return Err(crate::error::GuardianError::Store(
98 "Cannot publish empty message".to_string(),
99 ));
100 }
101
102 self.ps.api.pubsub_publish(&self.topic, message).await?;
103 Ok(())
104 }
105
106 #[instrument(level = "debug", skip(self))]
107 pub async fn peers(&self) -> crate::error::Result<Vec<PeerId>> {
108 let members_guard = self.members.read().await;
110
111 Ok(members_guard.clone())
114 }
115
116 #[instrument(level = "debug", skip(self))]
118 pub async fn peers_diff(&self) -> crate::error::Result<(Vec<PeerId>, Vec<PeerId>)> {
119 let all_current_peers_vec = self.ps.api.pubsub_peers(&self.topic).await?;
121 let current_peers_set: HashSet<PeerId> = all_current_peers_vec.iter().cloned().collect();
122
123 let (joining, leaving) = {
125 let mut members_guard = self.members.write().await;
126 let old_members_set: HashSet<PeerId> = members_guard.iter().cloned().collect();
127
128 let joining: Vec<PeerId> = current_peers_set
130 .difference(&old_members_set)
131 .cloned()
132 .collect();
133 let leaving: Vec<PeerId> = old_members_set
134 .difference(¤t_peers_set)
135 .cloned()
136 .collect();
137
138 *members_guard = all_current_peers_vec;
140
141 (joining, leaving)
142 };
143
144 Ok((joining, leaving))
145 }
146
147 #[instrument(level = "debug", skip(self))]
151 pub async fn watch_peers_channel(
152 self: &Arc<Self>,
153 ) -> crate::error::Result<mpsc::Receiver<Arc<dyn std::any::Any + Send + Sync>>> {
154 let (tx, rx) = mpsc::channel(32);
155
156 let topic_clone = self.clone();
158 let cancellation_token = self.cancellation_token.clone();
159
160 tokio::spawn(async move {
161 loop {
162 if cancellation_token.is_cancelled() {
164 break;
165 }
166
167 let peers_diff_result = topic_clone.peers_diff().await;
169
170 let (joining, leaving) = match peers_diff_result {
171 Ok((j, l)) => (j, l),
172 Err(e) => {
173 error!("Erro ao verificar a diferença de peers: {:?}", e);
177 return;
178 }
179 };
180
181 for pid in joining {
182 let event = event::new_event_peer_join(pid, topic_clone.topic().to_string());
183 let event_any: Arc<dyn std::any::Any + Send + Sync> = Arc::new(event);
185 if tx.send(event_any).await.is_err() {
186 return;
188 }
189 }
190
191 for pid in leaving {
192 let event = event::new_event_peer_leave(pid, topic_clone.topic().to_string());
193 let event_any: Arc<dyn std::any::Any + Send + Sync> = Arc::new(event);
195 if tx.send(event_any).await.is_err() {
196 return;
197 }
198 }
199
200 tokio::select! {
202 _ = tokio::time::sleep(topic_clone.ps.poll_interval) => {},
203 _ = cancellation_token.cancelled() => {
204 break;
205 }
206 }
207 }
208 });
209
210 Ok(rx)
211 }
212
213 #[instrument(level = "debug", skip(self))]
214 pub async fn watch_messages(&self) -> crate::error::Result<mpsc::Receiver<EventPubSubMessage>> {
215 let mut subscription = self.ps.api.pubsub_subscribe(&self.topic).await?;
216
217 let (tx, rx) = mpsc::channel(128);
218 let self_peer_id = self.ps.id;
219 let cancellation_token = self.cancellation_token.clone();
220 let topic_name = self.topic.clone();
221
222 tokio::spawn(async move {
223 loop {
224 if cancellation_token.is_cancelled() {
226 break;
227 }
228
229 tokio::select! {
231 msg_result = subscription.next() => {
232 match msg_result {
233 Some(Ok(msg)) => {
234 if msg.from == self_peer_id {
236 continue;
237 }
238
239 let event = event::new_event_message(msg.data);
240 if tx.send(event).await.is_err() {
241 break;
243 }
244 }
245 Some(Err(e)) => {
246 warn!("Error in pubsub stream for topic {}: {:?}", topic_name, e);
248 continue;
249 }
250 None => {
251 break;
253 }
254 }
255 }
256 _ = cancellation_token.cancelled() => {
257 break;
259 }
260 }
261 }
262 });
263
264 Ok(rx)
265 }
266
267 #[instrument(level = "debug", skip(self))]
270 pub fn topic(&self) -> &str {
271 &self.topic
272 }
273
274 #[instrument(level = "debug", skip(self))]
276 pub fn cancel(&self) {
277 self.cancellation_token.cancel();
278 }
279
280 #[instrument(level = "debug", skip(self))]
282 pub fn is_cancelled(&self) -> bool {
283 self.cancellation_token.is_cancelled()
284 }
285
286 #[instrument(level = "debug", skip(self))]
288 pub async fn clear_members(&self) {
289 let mut members_guard = self.members.write().await;
290 members_guard.clear();
291 }
292}
293
294#[async_trait::async_trait]
295impl PubSubTopic for PsTopic {
296 type Error = GuardianError;
297
298 #[instrument(level = "debug", skip(self, message))]
299 async fn publish(&self, message: Vec<u8>) -> crate::error::Result<()> {
300 PsTopic::publish(self, &message).await
301 }
302
303 #[instrument(level = "debug", skip(self))]
304 async fn peers(&self) -> crate::error::Result<Vec<PeerId>> {
305 self.peers().await
306 }
307
308 #[instrument(level = "debug", skip(self))]
309 async fn watch_peers(
310 &self,
311 ) -> crate::error::Result<Pin<Box<dyn Stream<Item = events::Event> + Send>>> {
312 let (tx, rx) = mpsc::channel(32);
314
315 let topic_clone = Arc::new(PsTopic {
317 topic: self.topic.clone(),
318 ps: self.ps.clone(),
319 members: RwLock::new(self.members.read().await.clone()),
320 cancellation_token: self.cancellation_token.clone(),
321 });
322
323 tokio::spawn(async move {
324 loop {
325 if topic_clone.cancellation_token.is_cancelled() {
327 break;
328 }
329
330 let peers_diff_result = topic_clone.peers_diff().await;
332
333 let (joining, leaving) = match peers_diff_result {
334 Ok((j, l)) => (j, l),
335 Err(e) => {
336 error!("Erro ao verificar a diferença de peers: {:?}", e);
338 return;
339 }
340 };
341
342 for pid in joining {
343 let event = event::new_event_peer_join(pid, topic_clone.topic().to_string());
344 let event_any: events::Event = Arc::new(event);
346 if tx.send(event_any).await.is_err() {
347 return;
349 }
350 }
351
352 for pid in leaving {
353 let event = event::new_event_peer_leave(pid, topic_clone.topic().to_string());
354 let event_any: events::Event = Arc::new(event);
356 if tx.send(event_any).await.is_err() {
357 return;
358 }
359 }
360
361 tokio::select! {
363 _ = tokio::time::sleep(topic_clone.ps.poll_interval) => {},
364 _ = topic_clone.cancellation_token.cancelled() => {
365 break;
366 }
367 }
368 }
369 });
370
371 let stream = ReceiverStream::new(rx);
372 Ok(Box::pin(stream))
373 }
374
375 #[instrument(level = "debug", skip(self))]
376 async fn watch_messages(
377 &self,
378 ) -> crate::error::Result<Pin<Box<dyn Stream<Item = EventPubSubMessage> + Send>>> {
379 let receiver = self.watch_messages().await?;
380 let stream = ReceiverStream::new(receiver);
381 Ok(Box::pin(stream))
382 }
383
384 #[instrument(level = "debug", skip(self))]
385 fn topic(&self) -> &str {
386 &self.topic
387 }
388}
389
390impl CoreApiPubSub {
391 #[instrument(level = "debug", skip(self))]
393 async fn create_topic(&self, topic: &str) -> Arc<PsTopic> {
394 Arc::new(PsTopic {
395 topic: topic.to_string(),
396 ps: unsafe {
397 Arc::from_raw(self as *const Self)
399 },
400 members: Default::default(),
401 cancellation_token: self.cancellation_token.child_token(),
402 })
403 }
404
405 #[instrument(level = "debug", skip(self))]
408 pub async fn topic_subscribe_internal(
409 self: &Arc<Self>,
410 topic: &str,
411 ) -> crate::error::Result<Arc<PsTopic>> {
412 let mut topics_guard = self.topics.lock().await;
413
414 if let Some(t) = topics_guard.get(topic) {
416 return Ok(t.clone());
417 }
418
419 let new_topic = Arc::new(PsTopic {
421 topic: topic.to_string(),
422 ps: self.clone(), members: Default::default(),
424 cancellation_token: self.cancellation_token.child_token(),
425 });
426
427 topics_guard.insert(topic.to_string(), new_topic.clone());
429
430 Ok(new_topic)
431 }
432
433 #[instrument(level = "debug", skip(api, span, tracer))]
436 pub fn new(
437 api: IpfsClient,
438 id: PeerId,
439 poll_interval: Duration,
440 span: Option<Span>,
441 tracer: Option<Arc<TracerWrapper>>,
442 ) -> Arc<Self> {
443 let default_tracer = Arc::new(TracerWrapper::Noop(NoopTracer::new()));
445
446 Arc::new(Self {
447 topics: Mutex::new(HashMap::new()),
448 api,
449 id,
450 poll_interval,
451 span: span.unwrap_or_else(tracing::Span::current),
452 tracer: tracer.unwrap_or(default_tracer),
453 cancellation_token: CancellationToken::new(),
454 })
455 }
456
457 pub fn cancel(&self) {
459 self.cancellation_token.cancel();
460 }
461
462 pub fn is_cancelled(&self) -> bool {
464 self.cancellation_token.is_cancelled()
465 }
466
467 #[instrument(level = "debug", skip(self))]
469 pub async fn remove_topic(&self, topic_name: &str) -> bool {
470 let mut topics_guard = self.topics.lock().await;
471 topics_guard.remove(topic_name).is_some()
472 }
473
474 #[instrument(level = "debug", skip(self))]
476 pub async fn cleanup_cancelled_topics(&self) -> usize {
477 let mut topics_guard = self.topics.lock().await;
478 let mut cancelled_topics = Vec::new();
479
480 for (name, topic) in topics_guard.iter() {
482 if topic.is_cancelled() {
483 cancelled_topics.push(name.clone());
484 }
485 }
486
487 for topic_name in &cancelled_topics {
489 topics_guard.remove(topic_name);
490 }
491
492 cancelled_topics.len()
493 }
494
495 #[instrument(level = "debug", skip(self))]
497 pub async fn topic_stats(&self) -> (usize, usize) {
498 let topics_guard = self.topics.lock().await;
499 let total_topics = topics_guard.len();
500 let mut active_topics = 0;
501
502 for topic in topics_guard.values() {
503 if !topic.is_cancelled() {
504 active_topics += 1;
505 }
506 }
507
508 (total_topics, active_topics)
509 }
510}