rust_ipfs/p2p/
gossipsub.rs

1use futures::channel::mpsc::{self as channel};
2use futures::stream::{FusedStream, Stream};
3use libp2p::gossipsub::PublishError;
4use std::collections::HashMap;
5use std::fmt;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tracing::debug;
9
10use libp2p::core::transport::PortUse;
11use libp2p::core::{Endpoint, Multiaddr};
12use libp2p::identity::PeerId;
13
14use libp2p::gossipsub::{
15    Behaviour as Gossipsub, Event as GossipsubEvent, IdentTopic as Topic,
16    Message as GossipsubMessage, MessageId, TopicHash,
17};
18use libp2p::swarm::{
19    ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, THandlerInEvent, ToSwarm,
20};
21
/// Currently a thin wrapper around Gossipsub.
///
/// Allows a single subscription per topic. Messages received on a subscribed topic are handed
/// to the matching [`SubscriptionStream`] through a bounded channel. Peer and topic bookkeeping
/// is left to the wrapped [`Gossipsub`] behaviour, which is reachable through `Deref`/`DerefMut`.
pub struct GossipsubStream {
    // Tracks the topic subscriptions: one bounded sender per subscribed topic, keyed by the
    // topic hash.
    streams: HashMap<TopicHash, futures::channel::mpsc::Sender<GossipsubMessage>>,

    // Gossipsub protocol
    gossipsub: Gossipsub,

    // the subscription streams implement Drop and will send out their topic through the
    // sender cloned from here if they are dropped before the stream has ended.
    // The receiving half is drained in `NetworkBehaviour::poll`.
    unsubscriptions: (
        channel::UnboundedSender<TopicHash>,
        channel::UnboundedReceiver<TopicHash>,
    ),
}
39
40impl core::ops::Deref for GossipsubStream {
41    type Target = Gossipsub;
42    fn deref(&self) -> &Self::Target {
43        &self.gossipsub
44    }
45}
46
47impl core::ops::DerefMut for GossipsubStream {
48    fn deref_mut(&mut self) -> &mut Gossipsub {
49        &mut self.gossipsub
50    }
51}
52
/// Stream of pubsub messages for a single topic. Implements [`FusedStream`].
///
/// Dropping the stream before it has ended sends its topic through `on_drop`, which requests
/// an unsubscription from the owning behaviour.
pub struct SubscriptionStream {
    // Sender used to announce the topic on drop; set to `None` once the stream has ended,
    // which is also what `is_terminated` reports.
    on_drop: Option<channel::UnboundedSender<TopicHash>>,
    // Hash of the subscribed topic; taken out when the drop notification is sent.
    topic: Option<TopicHash>,
    // Receiving half of the per-topic message channel.
    inner: futures::channel::mpsc::Receiver<GossipsubMessage>,
}
59
60impl Drop for SubscriptionStream {
61    fn drop(&mut self) {
62        if let Some(sender) = self.on_drop.take() {
63            if let Some(topic) = self.topic.take() {
64                let _ = sender.unbounded_send(topic);
65            }
66        }
67    }
68}
69
70impl fmt::Debug for SubscriptionStream {
71    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
72        if let Some(topic) = self.topic.as_ref() {
73            write!(
74                fmt,
75                "SubscriptionStream {{ topic: {:?}, is_terminated: {} }}",
76                topic,
77                self.is_terminated()
78            )
79        } else {
80            write!(
81                fmt,
82                "SubscriptionStream {{ is_terminated: {} }}",
83                self.is_terminated()
84            )
85        }
86    }
87}
88
89impl Stream for SubscriptionStream {
90    type Item = GossipsubMessage;
91
92    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
93        use futures::stream::StreamExt;
94        let inner = &mut self.as_mut().inner;
95        match inner.poll_next_unpin(ctx) {
96            Poll::Ready(None) => {
97                // no need to unsubscribe on drop as the stream has already ended, likely via
98                // unsubscribe call.
99                self.on_drop.take();
100                Poll::Ready(None)
101            }
102            other => other,
103        }
104    }
105}
106
107impl FusedStream for SubscriptionStream {
108    fn is_terminated(&self) -> bool {
109        self.on_drop.is_none()
110    }
111}
112
113impl From<Gossipsub> for GossipsubStream {
114    fn from(gossipsub: Gossipsub) -> Self {
115        let (tx, rx) = channel::unbounded();
116        GossipsubStream {
117            streams: HashMap::new(),
118            gossipsub,
119            unsubscriptions: (tx, rx),
120        }
121    }
122}
123
124impl GossipsubStream {
125    /// Subscribes to a currently unsubscribed topic.
126    /// Returns a receiver for messages sent to the topic or `None` if subscription existed
127    /// already.
128    pub fn subscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<SubscriptionStream> {
129        let topic = Topic::new(topic);
130
131        if self.streams.contains_key(&topic.hash()) {
132            anyhow::bail!("Already subscribed to topic")
133        }
134
135        if !self.gossipsub.subscribe(&topic)? {
136            anyhow::bail!("Already subscribed to topic")
137        }
138
139        let (tx, rx) = futures::channel::mpsc::channel(15000);
140        self.streams.insert(topic.hash(), tx);
141        Ok(SubscriptionStream {
142            on_drop: Some(self.unsubscriptions.0.clone()),
143            topic: Some(topic.hash()),
144            inner: rx,
145        })
146    }
147
148    /// Unsubscribes from a topic. Unsubscription is usually done through dropping the
149    /// SubscriptionStream.
150    ///
151    /// Returns true if an existing subscription was dropped, false otherwise
152    pub fn unsubscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<bool> {
153        let topic = Topic::new(topic);
154
155        if !self.streams.contains_key(&topic.hash()) {
156            anyhow::bail!("Unable to unsubscribe from topic.")
157        }
158
159        self.streams
160            .remove(&topic.hash())
161            .expect("subscribed to topic");
162
163        self.gossipsub
164            .unsubscribe(&topic)
165            .map_err(anyhow::Error::from)
166    }
167
168    /// Publish to subscribed topic
169    pub fn publish(
170        &mut self,
171        topic: impl Into<String>,
172        data: impl Into<Vec<u8>>,
173    ) -> Result<MessageId, PublishError> {
174        self.gossipsub.publish(Topic::new(topic), data)
175    }
176
177    /// Returns the known peers subscribed to any topic
178    pub fn known_peers(&self) -> Vec<PeerId> {
179        self.all_peers().map(|(peer, _)| *peer).collect()
180    }
181
182    /// Returns the peers known to subscribe to the given topic
183    pub fn subscribed_peers(&self, topic: impl Into<String>) -> Vec<PeerId> {
184        let topic = Topic::new(topic);
185        self.all_peers()
186            .filter(|(_, list)| list.contains(&&topic.hash()))
187            .map(|(peer_id, _)| *peer_id)
188            .collect()
189    }
190}
191
192impl NetworkBehaviour for GossipsubStream {
193    type ConnectionHandler = <Gossipsub as NetworkBehaviour>::ConnectionHandler;
194    type ToSwarm = GossipsubEvent;
195
196    fn handle_pending_outbound_connection(
197        &mut self,
198        connection_id: ConnectionId,
199        maybe_peer: Option<PeerId>,
200        addresses: &[Multiaddr],
201        effective_role: Endpoint,
202    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
203        self.gossipsub.handle_pending_outbound_connection(
204            connection_id,
205            maybe_peer,
206            addresses,
207            effective_role,
208        )
209    }
210
211    fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
212        self.gossipsub.on_swarm_event(event)
213    }
214
215    fn on_connection_handler_event(
216        &mut self,
217        peer_id: PeerId,
218        connection_id: libp2p::swarm::ConnectionId,
219        event: libp2p::swarm::THandlerOutEvent<Self>,
220    ) {
221        self.gossipsub
222            .on_connection_handler_event(peer_id, connection_id, event)
223    }
224
225    fn handle_established_inbound_connection(
226        &mut self,
227        connection_id: ConnectionId,
228        peer: PeerId,
229        local_addr: &Multiaddr,
230        remote_addr: &Multiaddr,
231    ) -> Result<THandler<Self>, ConnectionDenied> {
232        self.gossipsub.handle_established_inbound_connection(
233            connection_id,
234            peer,
235            local_addr,
236            remote_addr,
237        )
238    }
239
240    fn handle_established_outbound_connection(
241        &mut self,
242        connection_id: ConnectionId,
243        peer: PeerId,
244        addr: &Multiaddr,
245        role_override: Endpoint,
246        port_use: PortUse,
247    ) -> Result<THandler<Self>, ConnectionDenied> {
248        self.gossipsub.handle_established_outbound_connection(
249            connection_id,
250            peer,
251            addr,
252            role_override,
253            port_use,
254        )
255    }
256
257    fn poll(
258        &mut self,
259        ctx: &mut Context,
260    ) -> Poll<ToSwarm<libp2p::gossipsub::Event, THandlerInEvent<Self>>> {
261        use futures::stream::StreamExt;
262        use std::collections::hash_map::Entry;
263
264        loop {
265            match self.unsubscriptions.1.poll_next_unpin(ctx) {
266                Poll::Ready(Some(dropped)) => {
267                    if let Some(mut sender) = self.streams.remove(&dropped) {
268                        sender.close_channel();
269                        debug!("unsubscribing via drop from {:?}", dropped);
270                        assert!(
271                            self.gossipsub
272                                .unsubscribe(&Topic::new(dropped.to_string()))
273                                .unwrap_or_default(),
274                            "Failed to unsubscribe a dropped subscription"
275                        );
276                    }
277                }
278                Poll::Ready(None) => unreachable!("we own the sender"),
279                Poll::Pending => break,
280            }
281        }
282
283        loop {
284            match futures::ready!(self.gossipsub.poll(ctx)) {
285                ToSwarm::GenerateEvent(GossipsubEvent::Message { message, .. }) => {
286                    let topic = message.topic.clone();
287                    if let Entry::Occupied(mut oe) = self.streams.entry(topic) {
288                        if let Err(e) = oe.get_mut().try_send(message) {
289                            if e.is_full() {
290                                continue;
291                            }
292                            // receiver has dropped
293                            let (topic, _) = oe.remove_entry();
294                            debug!("unsubscribing via SendError from {:?}", &topic);
295                            assert!(
296                                self.gossipsub
297                                    .unsubscribe(&Topic::new(topic.to_string()))
298                                    .unwrap_or_default(),
299                                "Failed to unsubscribe following SendError"
300                            );
301                        }
302                    }
303                    continue;
304                }
305                action => {
306                    return Poll::Ready(action);
307                }
308            }
309        }
310    }
311}