1use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
22use crate::topic::Topic;
23use crate::FloodsubConfig;
24use cuckoofilter::{CuckooError, CuckooFilter};
25use fnv::FnvHashSet;
26use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
27use libp2p_swarm::{
28 NetworkBehaviour,
29 NetworkBehaviourAction,
30 PollParameters,
31 ProtocolsHandler,
32 OneShotHandler,
33 NotifyHandler,
34 DialPeerCondition,
35};
36use log::warn;
37use smallvec::SmallVec;
38use std::{collections::VecDeque, iter};
39use std::collections::hash_map::{DefaultHasher, HashMap};
40use std::task::{Context, Poll};
41
/// Network behaviour implementing floodsub-style publish/subscribe.
///
/// Subscription changes and messages are turned into `NetworkBehaviourAction`s
/// that are queued in `events` and handed out one at a time from `poll`.
pub struct Floodsub {
    /// Actions waiting to be returned from `poll`, in FIFO order.
    events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,

    /// Behaviour configuration; `local_peer_id` is used as the source of
    /// published messages and `subscribe_local_messages` controls whether our
    /// own publications are also reported locally.
    config: FloodsubConfig,

    /// Peers registered through `add_node_to_partial_view`. They are dialed
    /// when added, re-dialed after a disconnect, and receive our current
    /// subscriptions when a connection to them is reported.
    target_peers: FnvHashSet<PeerId>,

    /// Currently connected peers, each mapped to the topics that peer has
    /// announced a subscription to.
    connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,

    /// Topics the local node is subscribed to.
    subscribed_topics: SmallVec<[Topic; 16]>,

    /// Filter of messages that were already seen; used to skip duplicates
    /// when handling incoming RPCs.
    received: CuckooFilter<DefaultHasher>,
}
65
66impl Floodsub {
67 pub fn new(local_peer_id: PeerId) -> Self {
69 Self::from_config(FloodsubConfig::new(local_peer_id))
70 }
71
72 pub fn from_config(config: FloodsubConfig) -> Self {
74 Floodsub {
75 events: VecDeque::new(),
76 config,
77 target_peers: FnvHashSet::default(),
78 connected_peers: HashMap::new(),
79 subscribed_topics: SmallVec::new(),
80 received: CuckooFilter::new(),
81 }
82 }
83
84 #[inline]
86 pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
87 if self.connected_peers.contains_key(&peer_id) {
89 for topic in self.subscribed_topics.iter().cloned() {
90 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
91 peer_id,
92 handler: NotifyHandler::Any,
93 event: FloodsubRpc {
94 messages: Vec::new(),
95 subscriptions: vec![FloodsubSubscription {
96 topic,
97 action: FloodsubSubscriptionAction::Subscribe,
98 }],
99 },
100 });
101 }
102 }
103
104 if self.target_peers.insert(peer_id) {
105 self.events.push_back(NetworkBehaviourAction::DialPeer {
106 peer_id, condition: DialPeerCondition::Disconnected
107 });
108 }
109 }
110
111 #[inline]
113 pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
114 self.target_peers.remove(peer_id);
115 }
116
117 pub fn subscribe(&mut self, topic: Topic) -> bool {
121 if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
122 return false;
123 }
124
125 for peer in self.connected_peers.keys() {
126 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
127 peer_id: *peer,
128 handler: NotifyHandler::Any,
129 event: FloodsubRpc {
130 messages: Vec::new(),
131 subscriptions: vec![FloodsubSubscription {
132 topic: topic.clone(),
133 action: FloodsubSubscriptionAction::Subscribe,
134 }],
135 },
136 });
137 }
138
139 self.subscribed_topics.push(topic);
140 true
141 }
142
143 pub fn unsubscribe(&mut self, topic: Topic) -> bool {
149 let pos = match self.subscribed_topics.iter().position(|t| *t == topic) {
150 Some(pos) => pos,
151 None => return false
152 };
153
154 self.subscribed_topics.remove(pos);
155
156 for peer in self.connected_peers.keys() {
157 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
158 peer_id: *peer,
159 handler: NotifyHandler::Any,
160 event: FloodsubRpc {
161 messages: Vec::new(),
162 subscriptions: vec![FloodsubSubscription {
163 topic: topic.clone(),
164 action: FloodsubSubscriptionAction::Unsubscribe,
165 }],
166 },
167 });
168 }
169
170 true
171 }
172
173 pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
175 self.publish_many(iter::once(topic), data)
176 }
177
178 pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
180 self.publish_many_any(iter::once(topic), data)
181 }
182
183 pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
188 self.publish_many_inner(topic, data, true)
189 }
190
191 pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
193 self.publish_many_inner(topic, data, false)
194 }
195
196 fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
197 let message = FloodsubMessage {
198 source: self.config.local_peer_id,
199 data: data.into(),
200 sequence_number: rand::random::<[u8; 20]>().to_vec(),
204 topics: topic.into_iter().map(Into::into).collect(),
205 };
206
207 let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
208 if self_subscribed {
209 if let Err(e @ CuckooError::NotEnoughSpace) = self.received.add(&message) {
210 warn!(
211 "Message was added to 'received' Cuckoofilter but some \
212 other message was removed as a consequence: {}", e,
213 );
214 }
215 if self.config.subscribe_local_messages {
216 self.events.push_back(
217 NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
218 }
219 }
220 if check_self_subscriptions && !self_subscribed {
223 return
224 }
225
226 for (peer_id, sub_topic) in self.connected_peers.iter() {
228 if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) {
229 continue;
230 }
231
232 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
233 peer_id: *peer_id,
234 handler: NotifyHandler::Any,
235 event: FloodsubRpc {
236 subscriptions: Vec::new(),
237 messages: vec![message.clone()],
238 }
239 });
240 }
241 }
242}
243
244impl NetworkBehaviour for Floodsub {
245 type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
246 type OutEvent = FloodsubEvent;
247
248 fn new_handler(&mut self) -> Self::ProtocolsHandler {
249 Default::default()
250 }
251
252 fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
253 Vec::new()
254 }
255
256 fn inject_connected(&mut self, id: &PeerId) {
257 if self.target_peers.contains(id) {
259 for topic in self.subscribed_topics.iter().cloned() {
260 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
261 peer_id: *id,
262 handler: NotifyHandler::Any,
263 event: FloodsubRpc {
264 messages: Vec::new(),
265 subscriptions: vec![FloodsubSubscription {
266 topic,
267 action: FloodsubSubscriptionAction::Subscribe,
268 }],
269 },
270 });
271 }
272 }
273
274 self.connected_peers.insert(*id, SmallVec::new());
275 }
276
277 fn inject_disconnected(&mut self, id: &PeerId) {
278 let was_in = self.connected_peers.remove(id);
279 debug_assert!(was_in.is_some());
280
281 if self.target_peers.contains(id) {
284 self.events.push_back(NetworkBehaviourAction::DialPeer {
285 peer_id: *id,
286 condition: DialPeerCondition::Disconnected
287 });
288 }
289 }
290
291 fn inject_event(
292 &mut self,
293 propagation_source: PeerId,
294 _connection: ConnectionId,
295 event: InnerMessage,
296 ) {
297 let event = match event {
299 InnerMessage::Rx(event) => event,
300 InnerMessage::Sent => return,
301 };
302
303 for subscription in event.subscriptions {
305 let remote_peer_topics = self.connected_peers
306 .get_mut(&propagation_source)
307 .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
308 match subscription.action {
309 FloodsubSubscriptionAction::Subscribe => {
310 if !remote_peer_topics.contains(&subscription.topic) {
311 remote_peer_topics.push(subscription.topic.clone());
312 }
313 self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
314 peer_id: propagation_source,
315 topic: subscription.topic,
316 }));
317 }
318 FloodsubSubscriptionAction::Unsubscribe => {
319 if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) {
320 remote_peer_topics.remove(pos);
321 }
322 self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
323 peer_id: propagation_source,
324 topic: subscription.topic,
325 }));
326 }
327 }
328 }
329
330 let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
332
333 for message in event.messages {
334 match self.received.test_and_add(&message) {
337 Ok(true) => {}, Ok(false) => continue, Err(e @ CuckooError::NotEnoughSpace) => { warn!(
341 "Message was added to 'received' Cuckoofilter but some \
342 other message was removed as a consequence: {}", e,
343 );
344 }
345 }
346
347 if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
349 let event = FloodsubEvent::Message(message.clone());
350 self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
351 }
352
353 for (peer_id, subscr_topics) in self.connected_peers.iter() {
355 if peer_id == &propagation_source {
356 continue;
357 }
358
359 if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
360 continue;
361 }
362
363 if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
364 rpcs_to_dispatch[pos].1.messages.push(message.clone());
365 } else {
366 rpcs_to_dispatch.push((*peer_id, FloodsubRpc {
367 subscriptions: Vec::new(),
368 messages: vec![message.clone()],
369 }));
370 }
371 }
372 }
373
374 for (peer_id, rpc) in rpcs_to_dispatch {
375 self.events.push_back(NetworkBehaviourAction::NotifyHandler {
376 peer_id,
377 handler: NotifyHandler::Any,
378 event: rpc,
379 });
380 }
381 }
382
383 fn poll(
384 &mut self,
385 _: &mut Context<'_>,
386 _: &mut impl PollParameters,
387 ) -> Poll<
388 NetworkBehaviourAction<
389 <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
390 Self::OutEvent,
391 >,
392 > {
393 if let Some(event) = self.events.pop_front() {
394 return Poll::Ready(event);
395 }
396
397 Poll::Pending
398 }
399}
400
/// Event type the connection handler reports back to `Floodsub`
/// (the third type parameter of its `OneShotHandler`).
pub enum InnerMessage {
    /// An RPC received from a remote peer; built from a `FloodsubRpc` via `From`.
    Rx(FloodsubRpc),
    /// Built from `()` via `From`; `inject_event` ignores it.
    /// NOTE(review): presumably signals that an outgoing RPC was sent — confirm
    /// against `OneShotHandler`.
    Sent,
}
408
409impl From<FloodsubRpc> for InnerMessage {
410 #[inline]
411 fn from(rpc: FloodsubRpc) -> InnerMessage {
412 InnerMessage::Rx(rpc)
413 }
414}
415
416impl From<()> for InnerMessage {
417 #[inline]
418 fn from(_: ()) -> InnerMessage {
419 InnerMessage::Sent
420 }
421}
422
/// Events that `Floodsub` reports to the outside through `poll`.
#[derive(Debug)]
pub enum FloodsubEvent {
    /// A message on a topic we are subscribed to. Emitted for messages
    /// received from peers and, when `subscribe_local_messages` is enabled,
    /// for messages we published ourselves.
    Message(FloodsubMessage),

    /// A connected peer announced a subscription to a topic.
    Subscribed {
        /// The peer that sent the subscription.
        peer_id: PeerId,
        /// The topic it subscribed to.
        topic: Topic,
    },

    /// A connected peer announced that it unsubscribed from a topic.
    Unsubscribed {
        /// The peer that sent the unsubscription.
        peer_id: PeerId,
        /// The topic it unsubscribed from.
        topic: Topic,
    },
}