Skip to main content

connexa/handle/
gossipsub.rs

1use crate::handle::Connexa;
2use crate::types::{GossipsubCommand, GossipsubEvent};
3use bytes::Bytes;
4use futures::StreamExt;
5use futures::channel::oneshot;
6use futures::stream::BoxStream;
7use libp2p::PeerId;
8use libp2p::gossipsub::{Hasher, IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
9
10#[derive(Copy, Clone)]
11pub struct ConnexaGossipsub<'a, T> {
12    connexa: &'a Connexa<T>,
13}
14
15impl<'a, T> ConnexaGossipsub<'a, T>
16where
17    T: Send + Sync + 'static,
18{
19    pub(crate) fn new(connexa: &'a Connexa<T>) -> Self {
20        Self { connexa }
21    }
22
23    /// Subscribes to a specified topic in the gossipsub network.
24    pub async fn subscribe(&self, topic: impl IntoTopic) -> std::io::Result<()> {
25        let topic = topic.into_topic();
26        let (tx, rx) = oneshot::channel();
27
28        self.connexa
29            .to_task
30            .clone()
31            .send(GossipsubCommand::Subscribe { topic, resp: tx }.into())
32            .await?;
33
34        rx.await.map_err(std::io::Error::other)?
35    }
36
37    /// Creates a listener for a specified gossipsub topic.
38    pub async fn listener(
39        &self,
40        topic: impl IntoTopic,
41    ) -> std::io::Result<BoxStream<'static, GossipsubEvent>> {
42        let topic = topic.into_topic();
43        let (tx, rx) = oneshot::channel();
44
45        self.connexa
46            .to_task
47            .clone()
48            .send(GossipsubCommand::GossipsubListener { topic, resp: tx }.into())
49            .await?;
50
51        rx.await
52            .map_err(std::io::Error::other)?
53            .map(|rx| rx.boxed())
54    }
55
56    /// Unsubscribes from a specified gossipsub topic.
57    pub async fn unsubscribe(&self, topic: impl IntoTopic) -> std::io::Result<()> {
58        let topic = topic.into_topic();
59        let (tx, rx) = oneshot::channel();
60
61        self.connexa
62            .to_task
63            .clone()
64            .send(GossipsubCommand::Unsubscribe { topic, resp: tx }.into())
65            .await?;
66
67        rx.await.map_err(std::io::Error::other)?
68    }
69
70    /// Retrieves a list of peers that are subscribed to a specified topic.
71    pub async fn peers(&self, topic: impl IntoTopic) -> std::io::Result<Vec<PeerId>> {
72        let topic = topic.into_topic();
73        let (tx, rx) = oneshot::channel();
74
75        self.connexa
76            .to_task
77            .clone()
78            .send(GossipsubCommand::Peers { topic, resp: tx }.into())
79            .await?;
80
81        rx.await.map_err(std::io::Error::other)?
82    }
83
84    /// Publishes a message to a specified gossipsub topic.
85    pub async fn publish(
86        &self,
87        topic: impl IntoTopic,
88        message: impl Into<Bytes>,
89    ) -> std::io::Result<()> {
90        let topic = topic.into_topic();
91        let data = message.into();
92        let (tx, rx) = oneshot::channel();
93
94        self.connexa
95            .to_task
96            .clone()
97            .send(
98                GossipsubCommand::Publish {
99                    topic,
100                    data,
101                    resp: tx,
102                }
103                .into(),
104            )
105            .await?;
106
107        rx.await.map_err(std::io::Error::other)?
108    }
109
110    /// Reports validation results to the gossipsub system for a received message
111    pub async fn report_message(
112        &self,
113        peer_id: PeerId,
114        message_id: MessageId,
115        message_acceptance: MessageAcceptance,
116    ) -> std::io::Result<bool> {
117        let (tx, rx) = oneshot::channel();
118
119        self.connexa
120            .to_task
121            .clone()
122            .send(
123                GossipsubCommand::ReportMessage {
124                    peer_id,
125                    message_id,
126                    accept: message_acceptance,
127                    resp: tx,
128                }
129                .into(),
130            )
131            .await?;
132
133        rx.await.map_err(std::io::Error::other)?
134    }
135}
136
137pub trait IntoTopic {
138    fn into_topic(self) -> TopicHash;
139}
140
141impl<H: Hasher> IntoTopic for Topic<H> {
142    fn into_topic(self) -> TopicHash {
143        self.hash()
144    }
145}
146
147impl<H: Hasher> IntoTopic for &Topic<H> {
148    fn into_topic(self) -> TopicHash {
149        self.hash()
150    }
151}
152
153impl IntoTopic for TopicHash {
154    fn into_topic(self) -> TopicHash {
155        self
156    }
157}
158
159impl IntoTopic for &TopicHash {
160    fn into_topic(self) -> TopicHash {
161        self.clone()
162    }
163}
164
165impl IntoTopic for String {
166    fn into_topic(self) -> TopicHash {
167        IdentTopic::new(self).hash()
168    }
169}
170
171impl IntoTopic for &String {
172    fn into_topic(self) -> TopicHash {
173        IdentTopic::new(self).hash()
174    }
175}
176
177impl IntoTopic for &str {
178    fn into_topic(self) -> TopicHash {
179        IdentTopic::new(self).hash()
180    }
181}
182
183impl IntoTopic for Vec<u8> {
184    fn into_topic(self) -> TopicHash {
185        let topic = String::from_utf8_lossy(&self);
186        IdentTopic::new(topic).hash()
187    }
188}
189
190impl IntoTopic for &[u8] {
191    fn into_topic(self) -> TopicHash {
192        let topic = String::from_utf8_lossy(self);
193        IdentTopic::new(topic).hash()
194    }
195}
196
197impl IntoTopic for Bytes {
198    fn into_topic(self) -> TopicHash {
199        let topic = String::from_utf8_lossy(&self);
200        IdentTopic::new(topic).hash()
201    }
202}
203
204impl IntoTopic for &Bytes {
205    fn into_topic(self) -> TopicHash {
206        let topic = String::from_utf8_lossy(self);
207        IdentTopic::new(topic).hash()
208    }
209}
210
211impl IntoTopic for Vec<String> {
212    fn into_topic(self) -> TopicHash {
213        let topic = self.join("/");
214        IntoTopic::into_topic(topic)
215    }
216}
217
218impl IntoTopic for &[String] {
219    fn into_topic(self) -> TopicHash {
220        let topic = self.join("/");
221        IntoTopic::into_topic(topic)
222    }
223}
224
225impl IntoTopic for &[&str] {
226    fn into_topic(self) -> TopicHash {
227        let topic = self.join("/");
228        IntoTopic::into_topic(topic)
229    }
230}
231
232impl IntoTopic for Vec<&str> {
233    fn into_topic(self) -> TopicHash {
234        let topic = self.join("/");
235        IntoTopic::into_topic(topic)
236    }
237}
238
239impl<F> IntoTopic for F
240where
241    F: FnOnce() -> TopicHash,
242{
243    fn into_topic(self) -> TopicHash {
244        self()
245    }
246}