guardian_db/ipfs_core_api/backends/
iroh_pubsub.rs1use crate::error::{GuardianError, Result};
7use crate::ipfs_core_api::backends::iroh::IrohBackend;
8use crate::p2p::manager::SwarmManager;
9use crate::traits::{EventPubSubMessage, PubSubInterface, PubSubTopic};
10use async_trait::async_trait;
11use futures::stream::{Stream, StreamExt};
12use libp2p::{PeerId, gossipsub::TopicHash};
13use std::collections::HashMap;
14use std::pin::Pin;
15use std::sync::Arc;
16use tokio::sync::{RwLock, broadcast};
17use tokio_stream::wrappers::BroadcastStream;
18use tracing::{debug, info, warn};
19
20pub struct IrohPubSub {
22 iroh_backend: Arc<IrohBackend>,
24 topics: Arc<RwLock<HashMap<String, Arc<IrohTopic>>>>,
26}
27
28pub struct IrohTopic {
30 topic_name: String,
32 topic_hash: TopicHash,
34 swarm: Arc<RwLock<Option<SwarmManager>>>,
36 message_sender: Arc<broadcast::Sender<Vec<u8>>>,
38 peers: Arc<RwLock<Vec<PeerId>>>,
40 peer_events_sender: Arc<broadcast::Sender<crate::events::Event>>,
42}
43
44impl IrohPubSub {
45 pub fn new(iroh_backend: Arc<IrohBackend>) -> Self {
47 Self {
48 iroh_backend,
49 topics: Arc::new(RwLock::new(HashMap::new())),
50 }
51 }
52
53 async fn get_or_create_topic(&self, topic: &str) -> Result<Arc<IrohTopic>> {
55 let mut topics = self.topics.write().await;
56
57 if let Some(existing_topic) = topics.get(topic) {
58 return Ok(existing_topic.clone());
59 }
60
61 let topic_hash = TopicHash::from_raw(topic);
63 let (sender, _) = broadcast::channel(1000); let (peer_events_sender, _) = broadcast::channel(1000); let swarm = self.iroh_backend.get_swarm_manager().await?;
68
69 {
71 let swarm_lock = swarm.read().await;
72 if let Some(swarm_manager) = swarm_lock.as_ref() {
73 swarm_manager
74 .subscribe_topic(&topic_hash)
75 .await
76 .map_err(|e| {
77 GuardianError::Other(format!("Erro ao subscrever tópico {}: {}", topic, e))
78 })?;
79 info!("Subscrito com sucesso ao tópico Gossipsub: {}", topic);
80 } else {
81 return Err(GuardianError::Other(
82 "SwarmManager não disponível".to_string(),
83 ));
84 }
85 }
86
87 let iroh_topic = Arc::new(IrohTopic {
88 topic_name: topic.to_string(),
89 topic_hash,
90 swarm,
91 message_sender: Arc::new(sender),
92 peers: Arc::new(RwLock::new(Vec::new())),
93 peer_events_sender: Arc::new(peer_events_sender),
94 });
95
96 topics.insert(topic.to_string(), iroh_topic.clone());
97
98 Ok(iroh_topic)
99 }
100}
101
102#[async_trait]
103impl PubSubInterface for IrohPubSub {
104 type Error = GuardianError;
105
106 async fn topic_subscribe(
107 &mut self,
108 topic: &str,
109 ) -> std::result::Result<Arc<dyn PubSubTopic<Error = GuardianError>>, Self::Error> {
110 debug!("Subscrevendo ao tópico via IrohPubSub: {}", topic);
111
112 let iroh_topic = self.get_or_create_topic(topic).await?;
113
114 Ok(iroh_topic as Arc<dyn PubSubTopic<Error = GuardianError>>)
115 }
116
117 fn as_any(&self) -> &dyn std::any::Any {
118 self
119 }
120}
121
122impl IrohTopic {
123 pub async fn publish(&self, data: &[u8]) -> Result<()> {
125 debug!(
126 "Publicando mensagem no tópico {}: {} bytes",
127 self.topic_name,
128 data.len()
129 );
130
131 let swarm_lock = self.swarm.read().await;
132 if let Some(swarm_manager) = swarm_lock.as_ref() {
133 swarm_manager
134 .publish_message(&self.topic_hash, data)
135 .await
136 .map_err(|e| {
137 GuardianError::Other(format!("Erro ao publicar no Gossipsub: {}", e))
138 })?;
139
140 debug!(
141 "Mensagem publicada com sucesso no tópico {}",
142 self.topic_name
143 );
144 Ok(())
145 } else {
146 Err(GuardianError::Other(
147 "SwarmManager não disponível".to_string(),
148 ))
149 }
150 }
151
152 pub fn subscribe_messages(&self) -> broadcast::Receiver<Vec<u8>> {
154 self.message_sender.subscribe()
155 }
156
157 pub async fn notify_message(&self, data: Vec<u8>) -> Result<()> {
159 match self.message_sender.send(data) {
160 Ok(_) => {
161 debug!(
162 "Mensagem propagada para subscribers do tópico {}",
163 self.topic_name
164 );
165 Ok(())
166 }
167 Err(_) => {
168 warn!("Nenhum subscriber ativo para o tópico {}", self.topic_name);
169 Ok(()) }
171 }
172 }
173
174 pub async fn add_peer(&self, peer_id: PeerId) {
176 let mut peers = self.peers.write().await;
177 if !peers.contains(&peer_id) {
178 peers.push(peer_id);
179 debug!("Peer {} adicionado ao tópico {}", peer_id, self.topic_name);
180
181 let event: crate::events::Event =
183 Arc::new(format!("PeerConnected:{}:{}", peer_id, self.topic_name));
184 let _ = self.peer_events_sender.send(event);
185 }
186 }
187
188 pub async fn remove_peer(&self, peer_id: &PeerId) {
190 let mut peers = self.peers.write().await;
191 let was_present = peers.iter().any(|p| p == peer_id);
192 peers.retain(|p| p != peer_id);
193
194 if was_present {
195 debug!("Peer {} removido do tópico {}", peer_id, self.topic_name);
196
197 let event: crate::events::Event =
199 Arc::new(format!("PeerDisconnected:{}:{}", peer_id, self.topic_name));
200 let _ = self.peer_events_sender.send(event);
201 }
202 }
203
204 pub async fn list_peers(&self) -> Vec<PeerId> {
206 let peers = self.peers.read().await;
207 peers.clone()
208 }
209}
210
211#[async_trait]
212impl PubSubTopic for IrohTopic {
213 type Error = GuardianError;
214
215 async fn publish(&self, message: Vec<u8>) -> std::result::Result<(), Self::Error> {
216 self.publish(&message).await
217 }
218
219 async fn peers(&self) -> std::result::Result<Vec<PeerId>, Self::Error> {
220 Ok(self.list_peers().await)
221 }
222
223 async fn watch_peers(
224 &self,
225 ) -> std::result::Result<Pin<Box<dyn Stream<Item = crate::events::Event> + Send>>, Self::Error>
226 {
227 let receiver = self.peer_events_sender.subscribe();
229
230 let stream = BroadcastStream::new(receiver).filter_map(|result| async {
232 result.ok() });
234
235 Ok(Box::pin(stream))
236 }
237
238 async fn watch_messages(
239 &self,
240 ) -> std::result::Result<Pin<Box<dyn Stream<Item = EventPubSubMessage> + Send>>, Self::Error>
241 {
242 let receiver = self.message_sender.subscribe();
244 let _topic_name = self.topic_name.clone();
245
246 let stream = BroadcastStream::new(receiver).filter_map(move |result| async move {
248 match result {
249 Ok(data) => {
250 Some(EventPubSubMessage { content: data })
252 }
253 Err(_) => None, }
255 });
256
257 Ok(Box::pin(stream))
258 }
259
260 fn topic(&self) -> &str {
261 &self.topic_name
262 }
263}
264
265impl IrohBackend {
266 pub fn create_pubsub_interface(self: Arc<Self>) -> IrohPubSub {
268 IrohPubSub::new(self)
269 }
270}