libp2p_floodsub/
layer.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
22use crate::topic::Topic;
23use crate::FloodsubConfig;
24use cuckoofilter::{CuckooError, CuckooFilter};
25use fnv::FnvHashSet;
26use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
27use libp2p_swarm::{
28    NetworkBehaviour,
29    NetworkBehaviourAction,
30    PollParameters,
31    ProtocolsHandler,
32    OneShotHandler,
33    NotifyHandler,
34    DialPeerCondition,
35};
36use log::warn;
37use smallvec::SmallVec;
38use std::{collections::VecDeque, iter};
39use std::collections::hash_map::{DefaultHasher, HashMap};
40use std::task::{Context, Poll};
41
42/// Network behaviour that handles the floodsub protocol.
43pub struct Floodsub {
44    /// Events that need to be yielded to the outside when polling.
45    events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
46
47    config: FloodsubConfig,
48
49    /// List of peers to send messages to.
50    target_peers: FnvHashSet<PeerId>,
51
52    /// List of peers the network is connected to, and the topics that they're subscribed to.
53    // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
54    //       opened substreams
55    connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
56
57    // List of topics we're subscribed to. Necessary to filter out messages that we receive
58    // erroneously.
59    subscribed_topics: SmallVec<[Topic; 16]>,
60
61    // We keep track of the messages we received (in the format `hash(source ID, seq_no)`) so that
62    // we don't dispatch the same message twice if we receive it twice on the network.
63    received: CuckooFilter<DefaultHasher>,
64}
65
66impl Floodsub {
67    /// Creates a `Floodsub` with default configuration.
68    pub fn new(local_peer_id: PeerId) -> Self {
69        Self::from_config(FloodsubConfig::new(local_peer_id))
70    }
71
72    /// Creates a `Floodsub` with the given configuration.
73    pub fn from_config(config: FloodsubConfig) -> Self {
74        Floodsub {
75            events: VecDeque::new(),
76            config,
77            target_peers: FnvHashSet::default(),
78            connected_peers: HashMap::new(),
79            subscribed_topics: SmallVec::new(),
80            received: CuckooFilter::new(),
81        }
82    }
83
84    /// Add a node to the list of nodes to propagate messages to.
85    #[inline]
86    pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
87        // Send our topics to this node if we're already connected to it.
88        if self.connected_peers.contains_key(&peer_id) {
89            for topic in self.subscribed_topics.iter().cloned() {
90                self.events.push_back(NetworkBehaviourAction::NotifyHandler {
91                    peer_id,
92                    handler: NotifyHandler::Any,
93                    event: FloodsubRpc {
94                        messages: Vec::new(),
95                        subscriptions: vec![FloodsubSubscription {
96                            topic,
97                            action: FloodsubSubscriptionAction::Subscribe,
98                        }],
99                    },
100                });
101            }
102        }
103
104        if self.target_peers.insert(peer_id) {
105            self.events.push_back(NetworkBehaviourAction::DialPeer {
106                peer_id, condition: DialPeerCondition::Disconnected
107            });
108        }
109    }
110
111    /// Remove a node from the list of nodes to propagate messages to.
112    #[inline]
113    pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
114        self.target_peers.remove(peer_id);
115    }
116
117    /// Subscribes to a topic.
118    ///
119    /// Returns true if the subscription worked. Returns false if we were already subscribed.
120    pub fn subscribe(&mut self, topic: Topic) -> bool {
121        if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
122            return false;
123        }
124
125        for peer in self.connected_peers.keys() {
126            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
127                peer_id: *peer,
128                handler: NotifyHandler::Any,
129                event: FloodsubRpc {
130                    messages: Vec::new(),
131                    subscriptions: vec![FloodsubSubscription {
132                        topic: topic.clone(),
133                        action: FloodsubSubscriptionAction::Subscribe,
134                    }],
135                },
136            });
137        }
138
139        self.subscribed_topics.push(topic);
140        true
141    }
142
143    /// Unsubscribes from a topic.
144    ///
145    /// Note that this only requires the topic name.
146    ///
147    /// Returns true if we were subscribed to this topic.
148    pub fn unsubscribe(&mut self, topic: Topic) -> bool {
149        let pos = match self.subscribed_topics.iter().position(|t| *t == topic) {
150            Some(pos) => pos,
151            None => return false
152        };
153
154        self.subscribed_topics.remove(pos);
155
156        for peer in self.connected_peers.keys() {
157            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
158                peer_id: *peer,
159                handler: NotifyHandler::Any,
160                event: FloodsubRpc {
161                    messages: Vec::new(),
162                    subscriptions: vec![FloodsubSubscription {
163                        topic: topic.clone(),
164                        action: FloodsubSubscriptionAction::Unsubscribe,
165                    }],
166                },
167            });
168        }
169
170        true
171    }
172
173    /// Publishes a message to the network, if we're subscribed to the topic only.
174    pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
175        self.publish_many(iter::once(topic), data)
176    }
177
178    /// Publishes a message to the network, even if we're not subscribed to the topic.
179    pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
180        self.publish_many_any(iter::once(topic), data)
181    }
182
183    /// Publishes a message with multiple topics to the network.
184    ///
185    ///
186    /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
187    pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
188        self.publish_many_inner(topic, data, true)
189    }
190
191    /// Publishes a message with multiple topics to the network, even if we're not subscribed to any of the topics.
192    pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
193        self.publish_many_inner(topic, data, false)
194    }
195
196    fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
197        let message = FloodsubMessage {
198            source: self.config.local_peer_id,
199            data: data.into(),
200            // If the sequence numbers are predictable, then an attacker could flood the network
201            // with packets with the predetermined sequence numbers and absorb our legitimate
202            // messages. We therefore use a random number.
203            sequence_number: rand::random::<[u8; 20]>().to_vec(),
204            topics: topic.into_iter().map(Into::into).collect(),
205        };
206
207        let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
208        if self_subscribed {
209            if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
210                warn!(
211                    "Message was added to 'received' Cuckoofilter but some \
212                     other message was removed as a consequence: {}", e,
213                );
214            }
215            if self.config.subscribe_local_messages {
216                self.events.push_back(
217                    NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
218            }
219        }
220        // Don't publish the message if we have to check subscriptions
221        // and we're not subscribed ourselves to any of the topics.
222        if check_self_subscriptions && !self_subscribed {
223            return
224        }
225
226        // Send to peers we know are subscribed to the topic.
227        for (peer_id, sub_topic) in self.connected_peers.iter() {
228            if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) {
229                continue;
230            }
231
232            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
233                peer_id: *peer_id,
234                handler: NotifyHandler::Any,
235                event: FloodsubRpc {
236                    subscriptions: Vec::new(),
237                    messages: vec![message.clone()],
238                }
239            });
240        }
241    }
242}
243
244impl NetworkBehaviour for Floodsub {
245    type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
246    type OutEvent = FloodsubEvent;
247
248    fn new_handler(&mut self) -> Self::ProtocolsHandler {
249        Default::default()
250    }
251
252    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
253        Vec::new()
254    }
255
256    fn inject_connected(&mut self, id: &PeerId) {
257        // We need to send our subscriptions to the newly-connected node.
258        if self.target_peers.contains(id) {
259            for topic in self.subscribed_topics.iter().cloned() {
260                self.events.push_back(NetworkBehaviourAction::NotifyHandler {
261                    peer_id: *id,
262                    handler: NotifyHandler::Any,
263                    event: FloodsubRpc {
264                        messages: Vec::new(),
265                        subscriptions: vec![FloodsubSubscription {
266                            topic,
267                            action: FloodsubSubscriptionAction::Subscribe,
268                        }],
269                    },
270                });
271            }
272        }
273
274        self.connected_peers.insert(*id, SmallVec::new());
275    }
276
277    fn inject_disconnected(&mut self, id: &PeerId) {
278        let was_in = self.connected_peers.remove(id);
279        debug_assert!(was_in.is_some());
280
281        // We can be disconnected by the remote in case of inactivity for example, so we always
282        // try to reconnect.
283        if self.target_peers.contains(id) {
284            self.events.push_back(NetworkBehaviourAction::DialPeer {
285                peer_id: *id,
286                condition: DialPeerCondition::Disconnected
287            });
288        }
289    }
290
291    fn inject_event(
292        &mut self,
293        propagation_source: PeerId,
294        _connection: ConnectionId,
295        event: InnerMessage,
296    ) {
297        // We ignore successful sends or timeouts.
298        let event = match event {
299            InnerMessage::Rx(event) => event,
300            InnerMessage::Sent => return,
301        };
302
303        // Update connected peers topics
304        for subscription in event.subscriptions {
305            let remote_peer_topics = self.connected_peers
306                .get_mut(&propagation_source)
307                .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
308            match subscription.action {
309                FloodsubSubscriptionAction::Subscribe => {
310                    if !remote_peer_topics.contains(&subscription.topic) {
311                        remote_peer_topics.push(subscription.topic.clone());
312                    }
313                    self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
314                        peer_id: propagation_source,
315                        topic: subscription.topic,
316                    }));
317                }
318                FloodsubSubscriptionAction::Unsubscribe => {
319                    if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) {
320                        remote_peer_topics.remove(pos);
321                    }
322                    self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
323                        peer_id: propagation_source,
324                        topic: subscription.topic,
325                    }));
326                }
327            }
328        }
329
330        // List of messages we're going to propagate on the network.
331        let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
332
333        for message in event.messages {
334            // Use `self.received` to skip the messages that we have already received in the past.
335            // Note that this can result in false positives.
336            match self.received.test_and_add(&message) {
337                Ok(true) => {}, // Message  was added.
338                Ok(false) => continue, // Message already existed.
339                Err(e @ CuckooError::NotEnoughSpace) => { // Message added, but some other removed.
340                    warn!(
341                        "Message was added to 'received' Cuckoofilter but some \
342                         other message was removed as a consequence: {}", e,
343                    );
344                }
345            }
346
347            // Add the message to be dispatched to the user.
348            if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
349                let event = FloodsubEvent::Message(message.clone());
350                self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
351            }
352
353            // Propagate the message to everyone else who is subscribed to any of the topics.
354            for (peer_id, subscr_topics) in self.connected_peers.iter() {
355                if peer_id == &propagation_source {
356                    continue;
357                }
358
359                if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
360                    continue;
361                }
362
363                if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
364                    rpcs_to_dispatch[pos].1.messages.push(message.clone());
365                } else {
366                    rpcs_to_dispatch.push((*peer_id, FloodsubRpc {
367                        subscriptions: Vec::new(),
368                        messages: vec![message.clone()],
369                    }));
370                }
371            }
372        }
373
374        for (peer_id, rpc) in rpcs_to_dispatch {
375            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
376                peer_id,
377                handler: NotifyHandler::Any,
378                event: rpc,
379            });
380        }
381    }
382
383    fn poll(
384        &mut self,
385        _: &mut Context<'_>,
386        _: &mut impl PollParameters,
387    ) -> Poll<
388        NetworkBehaviourAction<
389            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
390            Self::OutEvent,
391        >,
392    > {
393        if let Some(event) = self.events.pop_front() {
394            return Poll::Ready(event);
395        }
396
397        Poll::Pending
398    }
399}
400
401/// Transmission between the `OneShotHandler` and the `FloodsubHandler`.
402pub enum InnerMessage {
403    /// We received an RPC from a remote.
404    Rx(FloodsubRpc),
405    /// We successfully sent an RPC request.
406    Sent,
407}
408
409impl From<FloodsubRpc> for InnerMessage {
410    #[inline]
411    fn from(rpc: FloodsubRpc) -> InnerMessage {
412        InnerMessage::Rx(rpc)
413    }
414}
415
416impl From<()> for InnerMessage {
417    #[inline]
418    fn from(_: ()) -> InnerMessage {
419        InnerMessage::Sent
420    }
421}
422
423/// Event that can happen on the floodsub behaviour.
424#[derive(Debug)]
425pub enum FloodsubEvent {
426    /// A message has been received.
427    Message(FloodsubMessage),
428
429    /// A remote subscribed to a topic.
430    Subscribed {
431        /// Remote that has subscribed.
432        peer_id: PeerId,
433        /// The topic it has subscribed to.
434        topic: Topic,
435    },
436
437    /// A remote unsubscribed from a topic.
438    Unsubscribed {
439        /// Remote that has unsubscribed.
440        peer_id: PeerId,
441        /// The topic it has subscribed from.
442        topic: Topic,
443    },
444}