1use futures::channel::mpsc::{self as channel};
2use futures::stream::{FusedStream, Stream};
3use libp2p::gossipsub::PublishError;
4use std::collections::HashMap;
5use std::fmt;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tracing::debug;
9
10use libp2p::core::transport::PortUse;
11use libp2p::core::{Endpoint, Multiaddr};
12use libp2p::identity::PeerId;
13
14use libp2p::gossipsub::{
15 Behaviour as Gossipsub, Event as GossipsubEvent, IdentTopic as Topic,
16 Message as GossipsubMessage, MessageId, TopicHash,
17};
18use libp2p::swarm::{
19 ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, THandlerInEvent, ToSwarm,
20};
21
22pub struct GossipsubStream {
26 streams: HashMap<TopicHash, futures::channel::mpsc::Sender<GossipsubMessage>>,
28
29 gossipsub: Gossipsub,
31
32 unsubscriptions: (
35 channel::UnboundedSender<TopicHash>,
36 channel::UnboundedReceiver<TopicHash>,
37 ),
38}
39
40impl core::ops::Deref for GossipsubStream {
41 type Target = Gossipsub;
42 fn deref(&self) -> &Self::Target {
43 &self.gossipsub
44 }
45}
46
47impl core::ops::DerefMut for GossipsubStream {
48 fn deref_mut(&mut self) -> &mut Gossipsub {
49 &mut self.gossipsub
50 }
51}
52
53pub struct SubscriptionStream {
55 on_drop: Option<channel::UnboundedSender<TopicHash>>,
56 topic: Option<TopicHash>,
57 inner: futures::channel::mpsc::Receiver<GossipsubMessage>,
58}
59
60impl Drop for SubscriptionStream {
61 fn drop(&mut self) {
62 if let Some(sender) = self.on_drop.take() {
63 if let Some(topic) = self.topic.take() {
64 let _ = sender.unbounded_send(topic);
65 }
66 }
67 }
68}
69
70impl fmt::Debug for SubscriptionStream {
71 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
72 if let Some(topic) = self.topic.as_ref() {
73 write!(
74 fmt,
75 "SubscriptionStream {{ topic: {:?}, is_terminated: {} }}",
76 topic,
77 self.is_terminated()
78 )
79 } else {
80 write!(
81 fmt,
82 "SubscriptionStream {{ is_terminated: {} }}",
83 self.is_terminated()
84 )
85 }
86 }
87}
88
89impl Stream for SubscriptionStream {
90 type Item = GossipsubMessage;
91
92 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
93 use futures::stream::StreamExt;
94 let inner = &mut self.as_mut().inner;
95 match inner.poll_next_unpin(ctx) {
96 Poll::Ready(None) => {
97 self.on_drop.take();
100 Poll::Ready(None)
101 }
102 other => other,
103 }
104 }
105}
106
107impl FusedStream for SubscriptionStream {
108 fn is_terminated(&self) -> bool {
109 self.on_drop.is_none()
110 }
111}
112
113impl From<Gossipsub> for GossipsubStream {
114 fn from(gossipsub: Gossipsub) -> Self {
115 let (tx, rx) = channel::unbounded();
116 GossipsubStream {
117 streams: HashMap::new(),
118 gossipsub,
119 unsubscriptions: (tx, rx),
120 }
121 }
122}
123
124impl GossipsubStream {
125 pub fn subscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<SubscriptionStream> {
129 let topic = Topic::new(topic);
130
131 if self.streams.contains_key(&topic.hash()) {
132 anyhow::bail!("Already subscribed to topic")
133 }
134
135 if !self.gossipsub.subscribe(&topic)? {
136 anyhow::bail!("Already subscribed to topic")
137 }
138
139 let (tx, rx) = futures::channel::mpsc::channel(15000);
140 self.streams.insert(topic.hash(), tx);
141 Ok(SubscriptionStream {
142 on_drop: Some(self.unsubscriptions.0.clone()),
143 topic: Some(topic.hash()),
144 inner: rx,
145 })
146 }
147
148 pub fn unsubscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<bool> {
153 let topic = Topic::new(topic);
154
155 if !self.streams.contains_key(&topic.hash()) {
156 anyhow::bail!("Unable to unsubscribe from topic.")
157 }
158
159 self.streams
160 .remove(&topic.hash())
161 .expect("subscribed to topic");
162
163 self.gossipsub
164 .unsubscribe(&topic)
165 .map_err(anyhow::Error::from)
166 }
167
168 pub fn publish(
170 &mut self,
171 topic: impl Into<String>,
172 data: impl Into<Vec<u8>>,
173 ) -> Result<MessageId, PublishError> {
174 self.gossipsub.publish(Topic::new(topic), data)
175 }
176
177 pub fn known_peers(&self) -> Vec<PeerId> {
179 self.all_peers().map(|(peer, _)| *peer).collect()
180 }
181
182 pub fn subscribed_peers(&self, topic: impl Into<String>) -> Vec<PeerId> {
184 let topic = Topic::new(topic);
185 self.all_peers()
186 .filter(|(_, list)| list.contains(&&topic.hash()))
187 .map(|(peer_id, _)| *peer_id)
188 .collect()
189 }
190}
191
192impl NetworkBehaviour for GossipsubStream {
193 type ConnectionHandler = <Gossipsub as NetworkBehaviour>::ConnectionHandler;
194 type ToSwarm = GossipsubEvent;
195
196 fn handle_pending_outbound_connection(
197 &mut self,
198 connection_id: ConnectionId,
199 maybe_peer: Option<PeerId>,
200 addresses: &[Multiaddr],
201 effective_role: Endpoint,
202 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
203 self.gossipsub.handle_pending_outbound_connection(
204 connection_id,
205 maybe_peer,
206 addresses,
207 effective_role,
208 )
209 }
210
211 fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
212 self.gossipsub.on_swarm_event(event)
213 }
214
215 fn on_connection_handler_event(
216 &mut self,
217 peer_id: PeerId,
218 connection_id: libp2p::swarm::ConnectionId,
219 event: libp2p::swarm::THandlerOutEvent<Self>,
220 ) {
221 self.gossipsub
222 .on_connection_handler_event(peer_id, connection_id, event)
223 }
224
225 fn handle_established_inbound_connection(
226 &mut self,
227 connection_id: ConnectionId,
228 peer: PeerId,
229 local_addr: &Multiaddr,
230 remote_addr: &Multiaddr,
231 ) -> Result<THandler<Self>, ConnectionDenied> {
232 self.gossipsub.handle_established_inbound_connection(
233 connection_id,
234 peer,
235 local_addr,
236 remote_addr,
237 )
238 }
239
240 fn handle_established_outbound_connection(
241 &mut self,
242 connection_id: ConnectionId,
243 peer: PeerId,
244 addr: &Multiaddr,
245 role_override: Endpoint,
246 port_use: PortUse,
247 ) -> Result<THandler<Self>, ConnectionDenied> {
248 self.gossipsub.handle_established_outbound_connection(
249 connection_id,
250 peer,
251 addr,
252 role_override,
253 port_use,
254 )
255 }
256
257 fn poll(
258 &mut self,
259 ctx: &mut Context,
260 ) -> Poll<ToSwarm<libp2p::gossipsub::Event, THandlerInEvent<Self>>> {
261 use futures::stream::StreamExt;
262 use std::collections::hash_map::Entry;
263
264 loop {
265 match self.unsubscriptions.1.poll_next_unpin(ctx) {
266 Poll::Ready(Some(dropped)) => {
267 if let Some(mut sender) = self.streams.remove(&dropped) {
268 sender.close_channel();
269 debug!("unsubscribing via drop from {:?}", dropped);
270 assert!(
271 self.gossipsub
272 .unsubscribe(&Topic::new(dropped.to_string()))
273 .unwrap_or_default(),
274 "Failed to unsubscribe a dropped subscription"
275 );
276 }
277 }
278 Poll::Ready(None) => unreachable!("we own the sender"),
279 Poll::Pending => break,
280 }
281 }
282
283 loop {
284 match futures::ready!(self.gossipsub.poll(ctx)) {
285 ToSwarm::GenerateEvent(GossipsubEvent::Message { message, .. }) => {
286 let topic = message.topic.clone();
287 if let Entry::Occupied(mut oe) = self.streams.entry(topic) {
288 if let Err(e) = oe.get_mut().try_send(message) {
289 if e.is_full() {
290 continue;
291 }
292 let (topic, _) = oe.remove_entry();
294 debug!("unsubscribing via SendError from {:?}", &topic);
295 assert!(
296 self.gossipsub
297 .unsubscribe(&Topic::new(topic.to_string()))
298 .unwrap_or_default(),
299 "Failed to unsubscribe following SendError"
300 );
301 }
302 }
303 continue;
304 }
305 action => {
306 return Poll::Ready(action);
307 }
308 }
309 }
310 }
311}