connexa/handle/
gossipsub.rs1use crate::handle::Connexa;
2use crate::types::{GossipsubCommand, GossipsubEvent};
3use bytes::Bytes;
4use futures::StreamExt;
5use futures::channel::oneshot;
6use futures::stream::BoxStream;
7use libp2p::PeerId;
8use libp2p::gossipsub::{Hasher, IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
9
10#[derive(Copy, Clone)]
11pub struct ConnexaGossipsub<'a, T> {
12 connexa: &'a Connexa<T>,
13}
14
15impl<'a, T> ConnexaGossipsub<'a, T>
16where
17 T: Send + Sync + 'static,
18{
19 pub(crate) fn new(connexa: &'a Connexa<T>) -> Self {
20 Self { connexa }
21 }
22
23 pub async fn subscribe(&self, topic: impl IntoTopic) -> std::io::Result<()> {
25 let topic = topic.into_topic();
26 let (tx, rx) = oneshot::channel();
27
28 self.connexa
29 .to_task
30 .clone()
31 .send(GossipsubCommand::Subscribe { topic, resp: tx }.into())
32 .await?;
33
34 rx.await.map_err(std::io::Error::other)?
35 }
36
37 pub async fn listener(
39 &self,
40 topic: impl IntoTopic,
41 ) -> std::io::Result<BoxStream<'static, GossipsubEvent>> {
42 let topic = topic.into_topic();
43 let (tx, rx) = oneshot::channel();
44
45 self.connexa
46 .to_task
47 .clone()
48 .send(GossipsubCommand::GossipsubListener { topic, resp: tx }.into())
49 .await?;
50
51 rx.await
52 .map_err(std::io::Error::other)?
53 .map(|rx| rx.boxed())
54 }
55
56 pub async fn unsubscribe(&self, topic: impl IntoTopic) -> std::io::Result<()> {
58 let topic = topic.into_topic();
59 let (tx, rx) = oneshot::channel();
60
61 self.connexa
62 .to_task
63 .clone()
64 .send(GossipsubCommand::Unsubscribe { topic, resp: tx }.into())
65 .await?;
66
67 rx.await.map_err(std::io::Error::other)?
68 }
69
70 pub async fn peers(&self, topic: impl IntoTopic) -> std::io::Result<Vec<PeerId>> {
72 let topic = topic.into_topic();
73 let (tx, rx) = oneshot::channel();
74
75 self.connexa
76 .to_task
77 .clone()
78 .send(GossipsubCommand::Peers { topic, resp: tx }.into())
79 .await?;
80
81 rx.await.map_err(std::io::Error::other)?
82 }
83
84 pub async fn publish(
86 &self,
87 topic: impl IntoTopic,
88 message: impl Into<Bytes>,
89 ) -> std::io::Result<()> {
90 let topic = topic.into_topic();
91 let data = message.into();
92 let (tx, rx) = oneshot::channel();
93
94 self.connexa
95 .to_task
96 .clone()
97 .send(
98 GossipsubCommand::Publish {
99 topic,
100 data,
101 resp: tx,
102 }
103 .into(),
104 )
105 .await?;
106
107 rx.await.map_err(std::io::Error::other)?
108 }
109
110 pub async fn report_message(
112 &self,
113 peer_id: PeerId,
114 message_id: MessageId,
115 message_acceptance: MessageAcceptance,
116 ) -> std::io::Result<bool> {
117 let (tx, rx) = oneshot::channel();
118
119 self.connexa
120 .to_task
121 .clone()
122 .send(
123 GossipsubCommand::ReportMessage {
124 peer_id,
125 message_id,
126 accept: message_acceptance,
127 resp: tx,
128 }
129 .into(),
130 )
131 .await?;
132
133 rx.await.map_err(std::io::Error::other)?
134 }
135}
136
137pub trait IntoTopic {
138 fn into_topic(self) -> TopicHash;
139}
140
141impl<H: Hasher> IntoTopic for Topic<H> {
142 fn into_topic(self) -> TopicHash {
143 self.hash()
144 }
145}
146
147impl<H: Hasher> IntoTopic for &Topic<H> {
148 fn into_topic(self) -> TopicHash {
149 self.hash()
150 }
151}
152
153impl IntoTopic for TopicHash {
154 fn into_topic(self) -> TopicHash {
155 self
156 }
157}
158
159impl IntoTopic for &TopicHash {
160 fn into_topic(self) -> TopicHash {
161 self.clone()
162 }
163}
164
165impl IntoTopic for String {
166 fn into_topic(self) -> TopicHash {
167 IdentTopic::new(self).hash()
168 }
169}
170
171impl IntoTopic for &String {
172 fn into_topic(self) -> TopicHash {
173 IdentTopic::new(self).hash()
174 }
175}
176
177impl IntoTopic for &str {
178 fn into_topic(self) -> TopicHash {
179 IdentTopic::new(self).hash()
180 }
181}
182
183impl IntoTopic for Vec<u8> {
184 fn into_topic(self) -> TopicHash {
185 let topic = String::from_utf8_lossy(&self);
186 IdentTopic::new(topic).hash()
187 }
188}
189
190impl IntoTopic for &[u8] {
191 fn into_topic(self) -> TopicHash {
192 let topic = String::from_utf8_lossy(self);
193 IdentTopic::new(topic).hash()
194 }
195}
196
197impl IntoTopic for Bytes {
198 fn into_topic(self) -> TopicHash {
199 let topic = String::from_utf8_lossy(&self);
200 IdentTopic::new(topic).hash()
201 }
202}
203
204impl IntoTopic for &Bytes {
205 fn into_topic(self) -> TopicHash {
206 let topic = String::from_utf8_lossy(self);
207 IdentTopic::new(topic).hash()
208 }
209}
210
211impl IntoTopic for Vec<String> {
212 fn into_topic(self) -> TopicHash {
213 let topic = self.join("/");
214 IntoTopic::into_topic(topic)
215 }
216}
217
218impl IntoTopic for &[String] {
219 fn into_topic(self) -> TopicHash {
220 let topic = self.join("/");
221 IntoTopic::into_topic(topic)
222 }
223}
224
225impl IntoTopic for &[&str] {
226 fn into_topic(self) -> TopicHash {
227 let topic = self.join("/");
228 IntoTopic::into_topic(topic)
229 }
230}
231
232impl IntoTopic for Vec<&str> {
233 fn into_topic(self) -> TopicHash {
234 let topic = self.join("/");
235 IntoTopic::into_topic(topic)
236 }
237}
238
239impl<F> IntoTopic for F
240where
241 F: FnOnce() -> TopicHash,
242{
243 fn into_topic(self) -> TopicHash {
244 self()
245 }
246}