1use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
22use crate::topic::Topic;
23use crate::FloodsubConfig;
24use cuckoofilter::{CuckooError, CuckooFilter};
25use fnv::FnvHashSet;
26use tet_libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
27use tet_libp2p_swarm::{
28 NetworkBehaviour,
29 NetworkBehaviourAction,
30 PollParameters,
31 ProtocolsHandler,
32 OneShotHandler,
33 NotifyHandler,
34 DialPeerCondition,
35};
36use log::warn;
37use rand;
38use smallvec::SmallVec;
39use std::{collections::VecDeque, iter};
40use std::collections::hash_map::{DefaultHasher, HashMap};
41use std::task::{Context, Poll};
42
43pub struct Floodsub {
45 events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
47
48 config: FloodsubConfig,
49
50 target_peers: FnvHashSet<PeerId>,
52
53 connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
57
58 subscribed_topics: SmallVec<[Topic; 16]>,
61
62 received: CuckooFilter<DefaultHasher>,
65}
66
67impl Floodsub {
68 pub fn new(local_peer_id: PeerId) -> Self {
70 Self::from_config(FloodsubConfig::new(local_peer_id))
71 }
72
73 pub fn from_config(config: FloodsubConfig) -> Self {
75 Floodsub {
76 events: VecDeque::new(),
77 config,
78 target_peers: FnvHashSet::default(),
79 connected_peers: HashMap::new(),
80 subscribed_topics: SmallVec::new(),
81 received: CuckooFilter::new(),
82 }
83 }
84
85 #[inline]
87 pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
88 if self.connected_peers.contains_key(&peer_id) {
90 for topic in self.subscribed_topics.iter().cloned() {
91 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
92 peer_id: peer_id.clone(),
93 handler: NotifyHandler::Any,
94 event: FloodsubRpc {
95 messages: Vec::new(),
96 subscriptions: vec![FloodsubSubscription {
97 topic,
98 action: FloodsubSubscriptionAction::Subscribe,
99 }],
100 },
101 });
102 }
103 }
104
105 if self.target_peers.insert(peer_id.clone()) {
106 self.events.push_back(NetworkBehaviourAction::DialPeer {
107 peer_id, condition: DialPeerCondition::Disconnected
108 });
109 }
110 }
111
112 #[inline]
114 pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
115 self.target_peers.remove(peer_id);
116 }
117
118 pub fn subscribe(&mut self, topic: Topic) -> bool {
122 if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
123 return false;
124 }
125
126 for peer in self.connected_peers.keys() {
127 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
128 peer_id: peer.clone(),
129 handler: NotifyHandler::Any,
130 event: FloodsubRpc {
131 messages: Vec::new(),
132 subscriptions: vec![FloodsubSubscription {
133 topic: topic.clone(),
134 action: FloodsubSubscriptionAction::Subscribe,
135 }],
136 },
137 });
138 }
139
140 self.subscribed_topics.push(topic);
141 true
142 }
143
144 pub fn unsubscribe(&mut self, topic: Topic) -> bool {
150 let pos = match self.subscribed_topics.iter().position(|t| *t == topic) {
151 Some(pos) => pos,
152 None => return false
153 };
154
155 self.subscribed_topics.remove(pos);
156
157 for peer in self.connected_peers.keys() {
158 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
159 peer_id: peer.clone(),
160 handler: NotifyHandler::Any,
161 event: FloodsubRpc {
162 messages: Vec::new(),
163 subscriptions: vec![FloodsubSubscription {
164 topic: topic.clone(),
165 action: FloodsubSubscriptionAction::Unsubscribe,
166 }],
167 },
168 });
169 }
170
171 true
172 }
173
174 pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
176 self.publish_many(iter::once(topic), data)
177 }
178
179 pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
181 self.publish_many_any(iter::once(topic), data)
182 }
183
184 pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
189 self.publish_many_inner(topic, data, true)
190 }
191
192 pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
194 self.publish_many_inner(topic, data, false)
195 }
196
197 fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
198 let message = FloodsubMessage {
199 source: self.config.local_peer_id.clone(),
200 data: data.into(),
201 sequence_number: rand::random::<[u8; 20]>().to_vec(),
205 topics: topic.into_iter().map(Into::into).collect(),
206 };
207
208 let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
209 if self_subscribed {
210 if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
211 warn!(
212 "Message was added to 'received' Cuckoofilter but some \
213 other message was removed as a consequence: {}", e,
214 );
215 }
216 if self.config.subscribe_local_messages {
217 self.events.push_back(
218 NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
219 }
220 }
221 if check_self_subscriptions && !self_subscribed {
224 return
225 }
226
227 for (peer_id, sub_topic) in self.connected_peers.iter() {
229 if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) {
230 continue;
231 }
232
233 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
234 peer_id: peer_id.clone(),
235 handler: NotifyHandler::Any,
236 event: FloodsubRpc {
237 subscriptions: Vec::new(),
238 messages: vec![message.clone()],
239 }
240 });
241 }
242 }
243}
244
245impl NetworkBehaviour for Floodsub {
246 type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
247 type OutEvent = FloodsubEvent;
248
249 fn new_handler(&mut self) -> Self::ProtocolsHandler {
250 Default::default()
251 }
252
253 fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
254 Vec::new()
255 }
256
257 fn inject_connected(&mut self, id: &PeerId) {
258 if self.target_peers.contains(id) {
260 for topic in self.subscribed_topics.iter().cloned() {
261 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
262 peer_id: id.clone(),
263 handler: NotifyHandler::Any,
264 event: FloodsubRpc {
265 messages: Vec::new(),
266 subscriptions: vec![FloodsubSubscription {
267 topic,
268 action: FloodsubSubscriptionAction::Subscribe,
269 }],
270 },
271 });
272 }
273 }
274
275 self.connected_peers.insert(id.clone(), SmallVec::new());
276 }
277
278 fn inject_disconnected(&mut self, id: &PeerId) {
279 let was_in = self.connected_peers.remove(id);
280 debug_assert!(was_in.is_some());
281
282 if self.target_peers.contains(id) {
285 self.events.push_back(NetworkBehaviourAction::DialPeer {
286 peer_id: id.clone(),
287 condition: DialPeerCondition::Disconnected
288 });
289 }
290 }
291
292 fn inject_event(
293 &mut self,
294 propagation_source: PeerId,
295 _connection: ConnectionId,
296 event: InnerMessage,
297 ) {
298 let event = match event {
300 InnerMessage::Rx(event) => event,
301 InnerMessage::Sent => return,
302 };
303
304 for subscription in event.subscriptions {
306 let remote_peer_topics = self.connected_peers
307 .get_mut(&propagation_source)
308 .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
309 match subscription.action {
310 FloodsubSubscriptionAction::Subscribe => {
311 if !remote_peer_topics.contains(&subscription.topic) {
312 remote_peer_topics.push(subscription.topic.clone());
313 }
314 self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
315 peer_id: propagation_source.clone(),
316 topic: subscription.topic,
317 }));
318 }
319 FloodsubSubscriptionAction::Unsubscribe => {
320 if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) {
321 remote_peer_topics.remove(pos);
322 }
323 self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
324 peer_id: propagation_source.clone(),
325 topic: subscription.topic,
326 }));
327 }
328 }
329 }
330
331 let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
333
334 for message in event.messages {
335 match self.received.test_and_add(&message) {
338 Ok(true) => {}, Ok(false) => continue, Err(e @ CuckooError::NotEnoughSpace) => { warn!(
342 "Message was added to 'received' Cuckoofilter but some \
343 other message was removed as a consequence: {}", e,
344 );
345 }
346 }
347
348 if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
350 let event = FloodsubEvent::Message(message.clone());
351 self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
352 }
353
354 for (peer_id, subscr_topics) in self.connected_peers.iter() {
356 if peer_id == &propagation_source {
357 continue;
358 }
359
360 if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
361 continue;
362 }
363
364 if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
365 rpcs_to_dispatch[pos].1.messages.push(message.clone());
366 } else {
367 rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
368 subscriptions: Vec::new(),
369 messages: vec![message.clone()],
370 }));
371 }
372 }
373 }
374
375 for (peer_id, rpc) in rpcs_to_dispatch {
376 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
377 peer_id,
378 handler: NotifyHandler::Any,
379 event: rpc,
380 });
381 }
382 }
383
384 fn poll(
385 &mut self,
386 _: &mut Context<'_>,
387 _: &mut impl PollParameters,
388 ) -> Poll<
389 NetworkBehaviourAction<
390 <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
391 Self::OutEvent,
392 >,
393 > {
394 if let Some(event) = self.events.pop_front() {
395 return Poll::Ready(event);
396 }
397
398 Poll::Pending
399 }
400}
401
402pub enum InnerMessage {
404 Rx(FloodsubRpc),
406 Sent,
408}
409
410impl From<FloodsubRpc> for InnerMessage {
411 #[inline]
412 fn from(rpc: FloodsubRpc) -> InnerMessage {
413 InnerMessage::Rx(rpc)
414 }
415}
416
417impl From<()> for InnerMessage {
418 #[inline]
419 fn from(_: ()) -> InnerMessage {
420 InnerMessage::Sent
421 }
422}
423
424#[derive(Debug)]
426pub enum FloodsubEvent {
427 Message(FloodsubMessage),
429
430 Subscribed {
432 peer_id: PeerId,
434 topic: Topic,
436 },
437
438 Unsubscribed {
440 peer_id: PeerId,
442 topic: Topic,
444 },
445}