floodsub/
layer.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
22use crate::topic::Topic;
23use crate::FloodsubConfig;
24use cuckoofilter::{CuckooError, CuckooFilter};
25use fnv::FnvHashSet;
26use tet_libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
27use tet_libp2p_swarm::{
28    NetworkBehaviour,
29    NetworkBehaviourAction,
30    PollParameters,
31    ProtocolsHandler,
32    OneShotHandler,
33    NotifyHandler,
34    DialPeerCondition,
35};
36use log::warn;
37use rand;
38use smallvec::SmallVec;
39use std::{collections::VecDeque, iter};
40use std::collections::hash_map::{DefaultHasher, HashMap};
41use std::task::{Context, Poll};
42
/// Network behaviour that handles the floodsub protocol.
///
/// The behaviour keeps a queue of pending actions (`events`) that is drained one item at a
/// time by `poll`, tracks the topics of connected peers, and remembers already-seen messages
/// so that duplicates are not processed twice.
pub struct Floodsub {
    /// Events that need to be yielded to the outside when polling.
    ///
    /// New actions are pushed at the back and `poll` pops from the front, so actions are
    /// yielded in the order they were queued.
    events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,

    /// Configuration of the behaviour. This file reads `local_peer_id` (used as the source of
    /// published messages) and `subscribe_local_messages` from it.
    config: FloodsubConfig,

    /// Partial view: peers registered through `add_node_to_partial_view`.
    ///
    /// These peers are dialed when added, re-dialed after a disconnect, and receive our
    /// current subscriptions as soon as they connect.
    target_peers: FnvHashSet<PeerId>,

    /// List of peers the network is connected to, and the topics that they're subscribed to.
    // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
    //       opened substreams
    connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,

    /// List of topics we're subscribed to. Necessary to filter out messages that we receive
    /// erroneously.
    subscribed_topics: SmallVec<[Topic; 16]>,

    /// Probabilistic set of the messages we have already seen, so that we don't dispatch the
    /// same message twice if we receive it twice on the network.
    ///
    /// The whole `FloodsubMessage` is handed to the filter; presumably its `Hash` impl keys on
    /// `(source ID, seq_no)` — confirm in the `protocol` module.
    received: CuckooFilter<DefaultHasher>,
}
66
67impl Floodsub {
68    /// Creates a `Floodsub` with default configuration.
69    pub fn new(local_peer_id: PeerId) -> Self {
70        Self::from_config(FloodsubConfig::new(local_peer_id))
71    }
72
73    /// Creates a `Floodsub` with the given configuration.
74    pub fn from_config(config: FloodsubConfig) -> Self {
75        Floodsub {
76            events: VecDeque::new(),
77            config,
78            target_peers: FnvHashSet::default(),
79            connected_peers: HashMap::new(),
80            subscribed_topics: SmallVec::new(),
81            received: CuckooFilter::new(),
82        }
83    }
84
85    /// Add a node to the list of nodes to propagate messages to.
86    #[inline]
87    pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
88        // Send our topics to this node if we're already connected to it.
89        if self.connected_peers.contains_key(&peer_id) {
90            for topic in self.subscribed_topics.iter().cloned() {
91                self.events.push_back(NetworkBehaviourAction::NotifyHandler {
92                    peer_id: peer_id.clone(),
93                    handler: NotifyHandler::Any,
94                    event: FloodsubRpc {
95                        messages: Vec::new(),
96                        subscriptions: vec![FloodsubSubscription {
97                            topic,
98                            action: FloodsubSubscriptionAction::Subscribe,
99                        }],
100                    },
101                });
102            }
103        }
104
105        if self.target_peers.insert(peer_id.clone()) {
106            self.events.push_back(NetworkBehaviourAction::DialPeer {
107                peer_id, condition: DialPeerCondition::Disconnected
108            });
109        }
110    }
111
112    /// Remove a node from the list of nodes to propagate messages to.
113    #[inline]
114    pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
115        self.target_peers.remove(peer_id);
116    }
117
118    /// Subscribes to a topic.
119    ///
120    /// Returns true if the subscription worked. Returns false if we were already subscribed.
121    pub fn subscribe(&mut self, topic: Topic) -> bool {
122        if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
123            return false;
124        }
125
126        for peer in self.connected_peers.keys() {
127            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
128                peer_id: peer.clone(),
129                handler: NotifyHandler::Any,
130                event: FloodsubRpc {
131                    messages: Vec::new(),
132                    subscriptions: vec![FloodsubSubscription {
133                        topic: topic.clone(),
134                        action: FloodsubSubscriptionAction::Subscribe,
135                    }],
136                },
137            });
138        }
139
140        self.subscribed_topics.push(topic);
141        true
142    }
143
144    /// Unsubscribes from a topic.
145    ///
146    /// Note that this only requires the topic name.
147    ///
148    /// Returns true if we were subscribed to this topic.
149    pub fn unsubscribe(&mut self, topic: Topic) -> bool {
150        let pos = match self.subscribed_topics.iter().position(|t| *t == topic) {
151            Some(pos) => pos,
152            None => return false
153        };
154
155        self.subscribed_topics.remove(pos);
156
157        for peer in self.connected_peers.keys() {
158            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
159                peer_id: peer.clone(),
160                handler: NotifyHandler::Any,
161                event: FloodsubRpc {
162                    messages: Vec::new(),
163                    subscriptions: vec![FloodsubSubscription {
164                        topic: topic.clone(),
165                        action: FloodsubSubscriptionAction::Unsubscribe,
166                    }],
167                },
168            });
169        }
170
171        true
172    }
173
174    /// Publishes a message to the network, if we're subscribed to the topic only.
175    pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
176        self.publish_many(iter::once(topic), data)
177    }
178
179    /// Publishes a message to the network, even if we're not subscribed to the topic.
180    pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
181        self.publish_many_any(iter::once(topic), data)
182    }
183
184    /// Publishes a message with multiple topics to the network.
185    ///
186    ///
187    /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
188    pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
189        self.publish_many_inner(topic, data, true)
190    }
191
192    /// Publishes a message with multiple topics to the network, even if we're not subscribed to any of the topics.
193    pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
194        self.publish_many_inner(topic, data, false)
195    }
196
197    fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
198        let message = FloodsubMessage {
199            source: self.config.local_peer_id.clone(),
200            data: data.into(),
201            // If the sequence numbers are predictable, then an attacker could flood the network
202            // with packets with the predetermined sequence numbers and absorb our legitimate
203            // messages. We therefore use a random number.
204            sequence_number: rand::random::<[u8; 20]>().to_vec(),
205            topics: topic.into_iter().map(Into::into).collect(),
206        };
207
208        let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
209        if self_subscribed {
210            if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
211                warn!(
212                    "Message was added to 'received' Cuckoofilter but some \
213                     other message was removed as a consequence: {}", e,
214                );
215            }
216            if self.config.subscribe_local_messages {
217                self.events.push_back(
218                    NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
219            }
220        }
221        // Don't publish the message if we have to check subscriptions
222        // and we're not subscribed ourselves to any of the topics.
223        if check_self_subscriptions && !self_subscribed {
224            return
225        }
226
227        // Send to peers we know are subscribed to the topic.
228        for (peer_id, sub_topic) in self.connected_peers.iter() {
229            if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) {
230                continue;
231            }
232
233            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
234                peer_id: peer_id.clone(),
235                handler: NotifyHandler::Any,
236                event: FloodsubRpc {
237                    subscriptions: Vec::new(),
238                    messages: vec![message.clone()],
239                }
240            });
241        }
242    }
243}
244
245impl NetworkBehaviour for Floodsub {
246    type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
247    type OutEvent = FloodsubEvent;
248
249    fn new_handler(&mut self) -> Self::ProtocolsHandler {
250        Default::default()
251    }
252
253    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
254        Vec::new()
255    }
256
257    fn inject_connected(&mut self, id: &PeerId) {
258        // We need to send our subscriptions to the newly-connected node.
259        if self.target_peers.contains(id) {
260            for topic in self.subscribed_topics.iter().cloned() {
261                self.events.push_back(NetworkBehaviourAction::NotifyHandler {
262                    peer_id: id.clone(),
263                    handler: NotifyHandler::Any,
264                    event: FloodsubRpc {
265                        messages: Vec::new(),
266                        subscriptions: vec![FloodsubSubscription {
267                            topic,
268                            action: FloodsubSubscriptionAction::Subscribe,
269                        }],
270                    },
271                });
272            }
273        }
274
275        self.connected_peers.insert(id.clone(), SmallVec::new());
276    }
277
278    fn inject_disconnected(&mut self, id: &PeerId) {
279        let was_in = self.connected_peers.remove(id);
280        debug_assert!(was_in.is_some());
281
282        // We can be disconnected by the remote in case of inactivity for example, so we always
283        // try to reconnect.
284        if self.target_peers.contains(id) {
285            self.events.push_back(NetworkBehaviourAction::DialPeer {
286                peer_id: id.clone(),
287                condition: DialPeerCondition::Disconnected
288            });
289        }
290    }
291
292    fn inject_event(
293        &mut self,
294        propagation_source: PeerId,
295        _connection: ConnectionId,
296        event: InnerMessage,
297    ) {
298        // We ignore successful sends or timeouts.
299        let event = match event {
300            InnerMessage::Rx(event) => event,
301            InnerMessage::Sent => return,
302        };
303
304        // Update connected peers topics
305        for subscription in event.subscriptions {
306            let remote_peer_topics = self.connected_peers
307                .get_mut(&propagation_source)
308                .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
309            match subscription.action {
310                FloodsubSubscriptionAction::Subscribe => {
311                    if !remote_peer_topics.contains(&subscription.topic) {
312                        remote_peer_topics.push(subscription.topic.clone());
313                    }
314                    self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
315                        peer_id: propagation_source.clone(),
316                        topic: subscription.topic,
317                    }));
318                }
319                FloodsubSubscriptionAction::Unsubscribe => {
320                    if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) {
321                        remote_peer_topics.remove(pos);
322                    }
323                    self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
324                        peer_id: propagation_source.clone(),
325                        topic: subscription.topic,
326                    }));
327                }
328            }
329        }
330
331        // List of messages we're going to propagate on the network.
332        let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
333
334        for message in event.messages {
335            // Use `self.received` to skip the messages that we have already received in the past.
336            // Note that this can result in false positives.
337            match self.received.test_and_add(&message) {
338                Ok(true) => {}, // Message  was added.
339                Ok(false) => continue, // Message already existed.
340                Err(e @ CuckooError::NotEnoughSpace) => { // Message added, but some other removed.
341                    warn!(
342                        "Message was added to 'received' Cuckoofilter but some \
343                         other message was removed as a consequence: {}", e,
344                    );
345                }
346            }
347
348            // Add the message to be dispatched to the user.
349            if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
350                let event = FloodsubEvent::Message(message.clone());
351                self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
352            }
353
354            // Propagate the message to everyone else who is subscribed to any of the topics.
355            for (peer_id, subscr_topics) in self.connected_peers.iter() {
356                if peer_id == &propagation_source {
357                    continue;
358                }
359
360                if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
361                    continue;
362                }
363
364                if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
365                    rpcs_to_dispatch[pos].1.messages.push(message.clone());
366                } else {
367                    rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
368                        subscriptions: Vec::new(),
369                        messages: vec![message.clone()],
370                    }));
371                }
372            }
373        }
374
375        for (peer_id, rpc) in rpcs_to_dispatch {
376            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
377                peer_id,
378                handler: NotifyHandler::Any,
379                event: rpc,
380            });
381        }
382    }
383
384    fn poll(
385        &mut self,
386        _: &mut Context<'_>,
387        _: &mut impl PollParameters,
388    ) -> Poll<
389        NetworkBehaviourAction<
390            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
391            Self::OutEvent,
392        >,
393    > {
394        if let Some(event) = self.events.pop_front() {
395            return Poll::Ready(event);
396        }
397
398        Poll::Pending
399    }
400}
401
/// Transmission between the `OneShotHandler` and the `Floodsub` behaviour.
///
/// This is the handler's output event type; `Floodsub::inject_event` consumes it.
pub enum InnerMessage {
    /// We received an RPC from a remote.
    Rx(FloodsubRpc),
    /// We successfully sent an RPC request.
    Sent,
}
409
410impl From<FloodsubRpc> for InnerMessage {
411    #[inline]
412    fn from(rpc: FloodsubRpc) -> InnerMessage {
413        InnerMessage::Rx(rpc)
414    }
415}
416
417impl From<()> for InnerMessage {
418    #[inline]
419    fn from(_: ()) -> InnerMessage {
420        InnerMessage::Sent
421    }
422}
423
/// Event that can happen on the floodsub behaviour.
#[derive(Debug)]
pub enum FloodsubEvent {
    /// A message has been received.
    ///
    /// Also emitted for messages we publish ourselves when we are subscribed to one of their
    /// topics and `subscribe_local_messages` is set in the configuration.
    Message(FloodsubMessage),

    /// A remote subscribed to a topic.
    Subscribed {
        /// Remote that has subscribed.
        peer_id: PeerId,
        /// The topic it has subscribed to.
        topic: Topic,
    },

    /// A remote unsubscribed from a topic.
    Unsubscribed {
        /// Remote that has unsubscribed.
        peer_id: PeerId,
        /// The topic it has unsubscribed from.
        topic: Topic,
    },
}