guardian_db/p2p/pubsub/
raw.rs1use crate::error::{GuardianError, Result};
2use crate::events;
3use crate::ipfs_core_api::IpfsClient;
4use crate::p2p::events::new_event_message;
5use crate::traits::{EventPubSubMessage, PubSubInterface, PubSubTopic};
6use async_trait::async_trait;
7use futures::stream::{Stream, StreamExt};
8use libp2p::PeerId;
9use std::collections::HashMap;
10use std::pin::Pin;
11use std::sync::Arc;
12use tokio::sync::{RwLock, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info, warn};
16
17pub struct PsTopic {
19 topic_name: String,
20 ipfs_client: Arc<IpfsClient>,
21 cancellation_token: CancellationToken,
23}
24
25pub struct RawPubSub {
27 ipfs_client: Arc<IpfsClient>,
28 id: PeerId,
29 topics: RwLock<HashMap<String, Arc<PsTopic>>>,
31}
32
33#[async_trait]
34impl PubSubTopic for PsTopic {
35 type Error = GuardianError;
36
37 async fn publish(&self, message: Vec<u8>) -> std::result::Result<(), Self::Error> {
39 info!(
40 "Publicando mensagem no tópico '{}': {} bytes",
41 self.topic_name,
42 message.len()
43 );
44
45 self.ipfs_client
46 .pubsub_publish(&self.topic_name, &message)
47 .await?;
48
49 debug!(
50 "Mensagem publicada com sucesso no tópico '{}'",
51 self.topic_name
52 );
53 Ok(())
54 }
55
56 async fn peers(&self) -> std::result::Result<Vec<PeerId>, Self::Error> {
58 debug!("Listando peers do tópico: {}", self.topic_name);
59
60 let peers = self.ipfs_client.pubsub_peers(&self.topic_name).await?;
61
62 debug!(
63 "Encontrados {} peers para tópico '{}'",
64 peers.len(),
65 self.topic_name
66 );
67 Ok(peers)
68 }
69
70 async fn watch_peers(
72 &self,
73 ) -> std::result::Result<Pin<Box<dyn Stream<Item = events::Event> + Send>>, Self::Error> {
74 let (tx, rx) = mpsc::channel(32);
75 let ipfs_client = self.ipfs_client.clone();
76 let topic_name = self.topic_name.clone();
77 let token = self.cancellation_token.clone();
78
79 tokio::spawn(async move {
80 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
81 let mut last_peers: Vec<PeerId> = Vec::new();
82
83 loop {
84 tokio::select! {
85 _ = token.cancelled() => {
86 debug!("Monitoramento de peers cancelado para tópico: {}", topic_name);
87 break;
88 }
89 _ = interval.tick() => {
90 match ipfs_client.pubsub_peers(&topic_name).await {
91 Ok(current_peers) => {
92 for peer in ¤t_peers {
94 if !last_peers.contains(peer) {
95 let join_event: events::Event = Arc::new(crate::traits::EventPubSub::Join {
97 peer: *peer,
98 topic: topic_name.clone()
99 });
100 if tx.send(join_event).await.is_err() {
101 break;
102 }
103 }
104 }
105
106 for peer in &last_peers {
107 if !current_peers.contains(peer) {
108 let leave_event: events::Event = Arc::new(crate::traits::EventPubSub::Leave {
110 peer: *peer,
111 topic: topic_name.clone()
112 });
113 if tx.send(leave_event).await.is_err() {
114 break;
115 }
116 }
117 }
118
119 last_peers = current_peers;
120 }
121 Err(e) => {
122 error!("Erro ao obter peers do tópico '{}': {}", topic_name, e);
123 }
124 }
125 }
126 }
127 }
128 });
129
130 let stream = ReceiverStream::new(rx);
131 Ok(Box::pin(stream))
132 }
133
134 async fn watch_messages(
136 &self,
137 ) -> std::result::Result<Pin<Box<dyn Stream<Item = EventPubSubMessage> + Send>>, Self::Error>
138 {
139 let (tx, rx) = mpsc::channel(128);
140 let ipfs_client = self.ipfs_client.clone();
141 let topic_name = self.topic_name.clone();
142 let token = self.cancellation_token.clone();
143
144 tokio::spawn(async move {
146 match ipfs_client.pubsub_subscribe(&topic_name).await {
147 Ok(mut stream) => {
148 debug!("Stream de mensagens iniciado para tópico: {}", topic_name);
149
150 loop {
151 tokio::select! {
152 _ = token.cancelled() => {
153 debug!("Monitoramento de mensagens cancelado para tópico: {}", topic_name);
154 break;
155 }
156 msg_result = stream.next() => {
157 match msg_result {
158 Some(Ok(msg)) => {
159 let event = new_event_message(msg.data);
160 if tx.send(event).await.is_err() {
161 debug!("Receptor de mensagens desconectado para tópico: {}", topic_name);
162 break;
163 }
164 }
165 Some(Err(e)) => {
166 error!("Erro no stream de mensagens para tópico '{}': {}", topic_name, e);
167 break;
168 }
169 None => {
170 debug!("Stream de mensagens finalizado para tópico: {}", topic_name);
171 break;
172 }
173 }
174 }
175 }
176 }
177 }
178 Err(e) => {
179 error!(
180 "Erro ao criar stream de mensagens para tópico '{}': {}",
181 topic_name, e
182 );
183 }
184 }
185 });
186
187 let stream = ReceiverStream::new(rx);
188 Ok(Box::pin(stream))
189 }
190
191 fn topic(&self) -> &str {
193 &self.topic_name
194 }
195}
196
197impl RawPubSub {
198 pub async fn new(ipfs_client: Arc<IpfsClient>) -> Result<Arc<Self>> {
200 let node_info = ipfs_client.id().await?;
201 let id = node_info.id;
202
203 info!("Inicializando RawPubSub com node ID: {}", id);
204
205 Ok(Arc::new(RawPubSub {
206 ipfs_client,
207 id,
208 topics: RwLock::new(HashMap::new()),
209 }))
210 }
211
212 pub async fn new_with_config(ipfs_client: Arc<IpfsClient>) -> Result<Arc<Self>> {
214 Self::new(ipfs_client).await
215 }
216
217 pub async fn get_topic_stats(&self) -> HashMap<String, usize> {
219 let topics = self.topics.read().await;
220 let mut stats = HashMap::new();
221
222 for (topic_name, topic) in topics.iter() {
223 match topic.peers().await {
224 Ok(peers) => {
225 stats.insert(topic_name.clone(), peers.len());
226 }
227 Err(e) => {
228 warn!("Erro ao obter peers do tópico '{}': {}", topic_name, e);
229 stats.insert(topic_name.clone(), 0);
230 }
231 }
232 }
233
234 stats
235 }
236
237 pub async fn list_topics(&self) -> Vec<String> {
239 let topics = self.topics.read().await;
240 topics.keys().cloned().collect()
241 }
242
243 pub fn local_peer_id(&self) -> PeerId {
245 self.id
246 }
247
248 pub async fn topic_unsubscribe(&self, topic_name: &str) -> Result<()> {
250 let mut topics = self.topics.write().await;
251
252 if let Some(topic) = topics.remove(topic_name) {
253 topic.cancellation_token.cancel();
254 info!("Tópico '{}' removido com sucesso", topic_name);
255 } else {
256 warn!("Tentativa de remover tópico inexistente: {}", topic_name);
257 }
258
259 Ok(())
260 }
261}
262
263#[async_trait]
265impl PubSubInterface for RawPubSub {
266 type Error = GuardianError;
267
268 async fn topic_subscribe(
270 &mut self,
271 topic_name: &str,
272 ) -> std::result::Result<Arc<dyn PubSubTopic<Error = GuardianError>>, Self::Error> {
273 let mut topics = self.topics.write().await;
274
275 if let Some(existing_topic) = topics.get(topic_name) {
277 info!("Reutilizando tópico existente: {}", topic_name);
278 return Ok(existing_topic.clone() as Arc<dyn PubSubTopic<Error = GuardianError>>);
279 }
280
281 info!("Criando nova subscrição para tópico: {}", topic_name);
282
283 let new_topic = Arc::new(PsTopic {
285 topic_name: topic_name.to_string(),
286 ipfs_client: self.ipfs_client.clone(),
287 cancellation_token: CancellationToken::new(),
288 });
289
290 let _subscription_stream = self
292 .ipfs_client
293 .pubsub_subscribe(topic_name)
294 .await
295 .map_err(|e| GuardianError::Ipfs(format!("Erro ao subscrever tópico IPFS: {}", e)))?;
296
297 topics.insert(topic_name.to_string(), new_topic.clone());
299
300 info!("Tópico '{}' criado e subscrito com sucesso", topic_name);
301
302 Ok(new_topic as Arc<dyn PubSubTopic<Error = GuardianError>>)
303 }
304
305 fn as_any(&self) -> &dyn std::any::Any {
306 self
307 }
308}