Skip to main content

soil_network/
peer_info.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
//! [`PeerInfoBehaviour`] is an implementation of `NetworkBehaviour` that holds information about
//! peers in cache.
9
10use crate::{utils::interval, LOG_TARGET};
11use either::Either;
12
13use fnv::FnvHashMap;
14use futures::prelude::*;
15use libp2p::{
16	core::{transport::PortUse, ConnectedPoint, Endpoint},
17	identify::{
18		Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
19		Info as IdentifyInfo,
20	},
21	identity::PublicKey,
22	multiaddr::Protocol,
23	ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent},
24	swarm::{
25		behaviour::{
26			AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
27			ListenFailure,
28		},
29		ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
30		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
31	},
32	Multiaddr, PeerId,
33};
34use log::{debug, error, trace, warn};
35use parking_lot::Mutex;
36use schnellru::{ByLength, LruMap};
37use smallvec::SmallVec;
38
39use std::{
40	collections::{hash_map::Entry, HashSet, VecDeque},
41	iter,
42	pin::Pin,
43	sync::Arc,
44	task::{Context, Poll},
45	time::{Duration, Instant},
46};
47
/// Time after we disconnect from a node before we purge its information from the cache.
///
/// Added to the current instant when the last connection to a peer closes; the entry is removed
/// by a garbage collection pass that runs after that deadline.
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
/// Interval at which we perform garbage collection on the node info.
const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
/// The maximum number of tracked external addresses we allow.
///
/// Used as the length limit of the LRU map that records address confirmations.
const MAX_EXTERNAL_ADDRESSES: u32 = 32;
/// Number of times observed address is received from different peers before it is confirmed as
/// external.
const MIN_ADDRESS_CONFIRMATIONS: usize = 3;
57
/// Implementation of `NetworkBehaviour` that holds information about peers in cache.
///
/// Wraps the `Ping` and `Identify` behaviours, records what they report per peer, and tracks
/// which observed addresses can be treated as external addresses of the local node.
pub struct PeerInfoBehaviour {
	/// Periodically ping nodes, and close the connection if it's unresponsive.
	ping: Ping,
	/// Periodically identifies the remote and responds to incoming requests.
	identify: Identify,
	/// Information that we know about all nodes.
	nodes_info: FnvHashMap<PeerId, NodeInfo>,
	/// Interval at which we perform garbage collection in `nodes_info`.
	garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
	/// PeerId of the local node.
	local_peer_id: PeerId,
	/// Public addresses supplied by the operator. Never expire.
	public_addresses: Vec<Multiaddr>,
	/// Listen addresses. External addresses matching listen addresses never expire.
	listen_addresses: HashSet<Multiaddr>,
	/// External address confirmations: for every candidate address, the set of peers that
	/// reported it as the observed address.
	address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
	/// Record keeping of external addresses. Data is queried by the `NetworkService`.
	/// The addresses contain the `/p2p/...` part with local peer ID.
	external_addresses: ExternalAddresses,
	/// Pending events to emit to [`Swarm`](libp2p::swarm::Swarm). Drained one per `poll` call
	/// before the inner behaviours are polled.
	pending_actions: VecDeque<ToSwarm<PeerInfoEvent, THandlerInEvent<PeerInfoBehaviour>>>,
}
82
83/// Information about a node we're connected to.
84#[derive(Debug)]
85struct NodeInfo {
86	/// When we will remove the entry about this node from the list, or `None` if we're connected
87	/// to the node.
88	info_expire: Option<Instant>,
89	/// Non-empty list of connected endpoints, one per connection.
90	endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
91	/// Version reported by the remote, or `None` if unknown.
92	client_version: Option<String>,
93	/// Latest ping time with this node.
94	latest_ping: Option<Duration>,
95}
96
97impl NodeInfo {
98	fn new(endpoint: ConnectedPoint) -> Self {
99		let mut endpoints = SmallVec::new();
100		endpoints.push(endpoint);
101		Self { info_expire: None, endpoints, client_version: None, latest_ping: None }
102	}
103}
104
105/// Utility struct for tracking external addresses. The data is shared with the `NetworkService`.
106#[derive(Debug, Clone, Default)]
107pub struct ExternalAddresses {
108	addresses: Arc<Mutex<HashSet<Multiaddr>>>,
109}
110
111impl ExternalAddresses {
112	/// Add an external address.
113	pub fn add(&mut self, addr: Multiaddr) -> bool {
114		self.addresses.lock().insert(addr)
115	}
116
117	/// Remove an external address.
118	pub fn remove(&mut self, addr: &Multiaddr) -> bool {
119		self.addresses.lock().remove(addr)
120	}
121}
122
123impl PeerInfoBehaviour {
124	/// Builds a new `PeerInfoBehaviour`.
125	pub fn new(
126		user_agent: String,
127		local_public_key: PublicKey,
128		external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
129		public_addresses: Vec<Multiaddr>,
130	) -> Self {
131		let identify = {
132			let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key.clone())
133				.with_agent_version(user_agent)
134				// We don't need any peer information cached.
135				.with_cache_size(0);
136			Identify::new(cfg)
137		};
138
139		Self {
140			ping: Ping::new(PingConfig::new()),
141			identify,
142			nodes_info: FnvHashMap::default(),
143			garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
144			local_peer_id: local_public_key.to_peer_id(),
145			public_addresses,
146			listen_addresses: HashSet::new(),
147			address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
148			external_addresses: ExternalAddresses { addresses: external_addresses },
149			pending_actions: Default::default(),
150		}
151	}
152
153	/// Borrows `self` and returns a struct giving access to the information about a node.
154	///
155	/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
156	/// we're connected to, meaning that if `None` is returned then we're not connected to that
157	/// node.
158	pub fn node(&self, peer_id: &PeerId) -> Option<Node<'_>> {
159		self.nodes_info.get(peer_id).map(Node)
160	}
161
162	/// Inserts a ping time in the cache. Has no effect if we don't have any entry for that node,
163	/// which shouldn't happen.
164	fn handle_ping_report(
165		&mut self,
166		peer_id: &PeerId,
167		ping_time: Duration,
168		connection: ConnectionId,
169	) {
170		trace!(target: LOG_TARGET, "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time);
171		if let Some(entry) = self.nodes_info.get_mut(peer_id) {
172			entry.latest_ping = Some(ping_time);
173		} else {
174			error!(target: LOG_TARGET,
175				"Received ping from node we're not connected to {:?} via {:?}", peer_id, connection);
176		}
177	}
178
179	/// Ensure address has the `/p2p/...` part with local peer id. Returns `Err` if the address
180	/// already contains a different peer id.
181	fn with_local_peer_id(&self, address: Multiaddr) -> Result<Multiaddr, Multiaddr> {
182		if let Some(Protocol::P2p(peer_id)) = address.iter().last() {
183			if peer_id == self.local_peer_id {
184				Ok(address)
185			} else {
186				Err(address)
187			}
188		} else {
189			Ok(address.with(Protocol::P2p(self.local_peer_id)))
190		}
191	}
192
193	/// Inserts an identify record in the cache & discovers external addresses when multiple
194	/// peers report the same address as observed.
195	fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
196		trace!(target: LOG_TARGET, "Identified {:?} => {:?}", peer_id, info);
197		if let Some(entry) = self.nodes_info.get_mut(peer_id) {
198			entry.client_version = Some(info.agent_version.clone());
199		} else {
200			error!(target: LOG_TARGET,
201				"Received identify message from node we're not connected to {peer_id:?}");
202		}
203		// Discover external addresses.
204		match self.with_local_peer_id(info.observed_addr.clone()) {
205			Ok(observed_addr) => {
206				let (is_new, expired) = self.is_new_external_address(&observed_addr, *peer_id);
207				if is_new && self.external_addresses.add(observed_addr.clone()) {
208					trace!(
209						target: LOG_TARGET,
210						"Observed address reported by Identify confirmed as external {}",
211						observed_addr,
212					);
213					self.pending_actions.push_back(ToSwarm::ExternalAddrConfirmed(observed_addr));
214				}
215				if let Some(expired) = expired {
216					trace!(target: LOG_TARGET, "Removing replaced external address: {expired}");
217					self.external_addresses.remove(&expired);
218					self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(expired));
219				}
220			},
221			Err(addr) => {
222				warn!(
223					target: LOG_TARGET,
224					"Identify reported observed address for a peer that is not us: {addr}",
225				);
226			},
227		}
228	}
229
230	/// Check if addresses are equal taking into account they can contain or not contain
231	/// the `/p2p/...` part.
232	fn is_same_address(left: &Multiaddr, right: &Multiaddr) -> bool {
233		let mut left = left.iter();
234		let mut right = right.iter();
235
236		loop {
237			match (left.next(), right.next()) {
238				(None, None) => return true,
239				(None, Some(Protocol::P2p(_))) => return true,
240				(Some(Protocol::P2p(_)), None) => return true,
241				(left, right) if left != right => return false,
242				_ => {},
243			}
244		}
245	}
246
247	/// Check if `address` can be considered a new external address.
248	///
249	/// If this address replaces an older address, the expired address is returned.
250	fn is_new_external_address(
251		&mut self,
252		address: &Multiaddr,
253		peer_id: PeerId,
254	) -> (bool, Option<Multiaddr>) {
255		trace!(target: LOG_TARGET, "Verify new external address: {address}");
256
257		// Public and listen addresses don't count towards discovered external addresses
258		// and are always confirmed.
259		// Because they are not kept in the LRU, they are never replaced by discovered
260		// external addresses.
261		if self
262			.listen_addresses
263			.iter()
264			.chain(self.public_addresses.iter())
265			.any(|known_address| PeerInfoBehaviour::is_same_address(&known_address, &address))
266		{
267			return (true, None);
268		}
269
270		match self.address_confirmations.get(address) {
271			Some(confirmations) => {
272				confirmations.insert(peer_id);
273
274				if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
275					return (true, None);
276				}
277			},
278			None => {
279				let oldest = (self.address_confirmations.len()
280					>= self.address_confirmations.limiter().max_length() as usize)
281					.then(|| {
282						self.address_confirmations.pop_oldest().map(|(address, peers)| {
283							if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
284								return Some(address);
285							} else {
286								None
287							}
288						})
289					})
290					.flatten()
291					.flatten();
292
293				self.address_confirmations
294					.insert(address.clone(), iter::once(peer_id).collect());
295
296				return (false, oldest);
297			},
298		}
299
300		(false, None)
301	}
302}
303
304/// Gives access to the information about a node.
305pub struct Node<'a>(&'a NodeInfo);
306
307impl<'a> Node<'a> {
308	/// Returns the endpoint of an established connection to the peer.
309	///
310	/// Returns `None` if we are disconnected from the node.
311	pub fn endpoint(&self) -> Option<&'a ConnectedPoint> {
312		self.0.endpoints.get(0)
313	}
314
315	/// Returns the latest version information we know of.
316	pub fn client_version(&self) -> Option<&'a str> {
317		self.0.client_version.as_deref()
318	}
319
320	/// Returns the latest ping time we know of for this node. `None` if we never successfully
321	/// pinged this node.
322	pub fn latest_ping(&self) -> Option<Duration> {
323		self.0.latest_ping
324	}
325}
326
/// Event that can be emitted by the behaviour.
#[derive(Debug)]
pub enum PeerInfoEvent {
	/// We have obtained identity information from a peer, including the addresses it is listening
	/// on.
	///
	/// Generated when the inner `Identify` behaviour reports received identify information.
	Identified {
		/// Id of the peer that has been identified.
		peer_id: PeerId,
		/// Information about the peer.
		info: IdentifyInfo,
	},
}
339
340impl NetworkBehaviour for PeerInfoBehaviour {
341	type ConnectionHandler = ConnectionHandlerSelect<
342		<Ping as NetworkBehaviour>::ConnectionHandler,
343		<Identify as NetworkBehaviour>::ConnectionHandler,
344	>;
345	type ToSwarm = PeerInfoEvent;
346
347	fn handle_pending_inbound_connection(
348		&mut self,
349		connection_id: ConnectionId,
350		local_addr: &Multiaddr,
351		remote_addr: &Multiaddr,
352	) -> Result<(), ConnectionDenied> {
353		self.ping
354			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
355		self.identify
356			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
357	}
358
359	fn handle_pending_outbound_connection(
360		&mut self,
361		_connection_id: ConnectionId,
362		_maybe_peer: Option<PeerId>,
363		_addresses: &[Multiaddr],
364		_effective_role: Endpoint,
365	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
366		// Only `Discovery::handle_pending_outbound_connection` must be returning addresses to
367		// ensure that we don't return unwanted addresses.
368		Ok(Vec::new())
369	}
370
371	fn handle_established_inbound_connection(
372		&mut self,
373		connection_id: ConnectionId,
374		peer: PeerId,
375		local_addr: &Multiaddr,
376		remote_addr: &Multiaddr,
377	) -> Result<THandler<Self>, ConnectionDenied> {
378		let ping_handler = self.ping.handle_established_inbound_connection(
379			connection_id,
380			peer,
381			local_addr,
382			remote_addr,
383		)?;
384		let identify_handler = self.identify.handle_established_inbound_connection(
385			connection_id,
386			peer,
387			local_addr,
388			remote_addr,
389		)?;
390		Ok(ping_handler.select(identify_handler))
391	}
392
393	fn handle_established_outbound_connection(
394		&mut self,
395		connection_id: ConnectionId,
396		peer: PeerId,
397		addr: &Multiaddr,
398		role_override: Endpoint,
399		port_use: PortUse,
400	) -> Result<THandler<Self>, ConnectionDenied> {
401		let ping_handler = self.ping.handle_established_outbound_connection(
402			connection_id,
403			peer,
404			addr,
405			role_override,
406			port_use,
407		)?;
408		let identify_handler = self.identify.handle_established_outbound_connection(
409			connection_id,
410			peer,
411			addr,
412			role_override,
413			port_use,
414		)?;
415		Ok(ping_handler.select(identify_handler))
416	}
417
418	fn on_swarm_event(&mut self, event: FromSwarm) {
419		match event {
420			FromSwarm::ConnectionEstablished(
421				e @ ConnectionEstablished { peer_id, endpoint, .. },
422			) => {
423				self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
424				self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
425
426				match self.nodes_info.entry(peer_id) {
427					Entry::Vacant(e) => {
428						e.insert(NodeInfo::new(endpoint.clone()));
429					},
430					Entry::Occupied(e) => {
431						let e = e.into_mut();
432						if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
433						{
434							e.client_version = None;
435							e.latest_ping = None;
436						}
437						e.info_expire = None;
438						e.endpoints.push(endpoint.clone());
439					},
440				}
441			},
442			FromSwarm::ConnectionClosed(ConnectionClosed {
443				peer_id,
444				connection_id,
445				endpoint,
446				cause,
447				remaining_established,
448			}) => {
449				self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
450					peer_id,
451					connection_id,
452					endpoint,
453					cause,
454					remaining_established,
455				}));
456				self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
457					peer_id,
458					connection_id,
459					endpoint,
460					cause,
461					remaining_established,
462				}));
463
464				if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
465					if remaining_established == 0 {
466						entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
467					}
468					entry.endpoints.retain(|ep| ep != endpoint)
469				} else {
470					error!(target: LOG_TARGET,
471						"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
472				}
473			},
474			FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => {
475				self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
476					peer_id,
477					error,
478					connection_id,
479				}));
480				self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
481					peer_id,
482					error,
483					connection_id,
484				}));
485			},
486			FromSwarm::ListenerClosed(e) => {
487				self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
488				self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
489			},
490			FromSwarm::ListenFailure(ListenFailure {
491				local_addr,
492				send_back_addr,
493				error,
494				connection_id,
495				peer_id,
496			}) => {
497				self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
498					local_addr,
499					send_back_addr,
500					error,
501					connection_id,
502					peer_id,
503				}));
504				self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
505					local_addr,
506					send_back_addr,
507					error,
508					connection_id,
509					peer_id,
510				}));
511			},
512			FromSwarm::ListenerError(e) => {
513				self.ping.on_swarm_event(FromSwarm::ListenerError(e));
514				self.identify.on_swarm_event(FromSwarm::ListenerError(e));
515			},
516			FromSwarm::ExternalAddrExpired(e) => {
517				self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
518				self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
519			},
520			FromSwarm::NewListener(e) => {
521				self.ping.on_swarm_event(FromSwarm::NewListener(e));
522				self.identify.on_swarm_event(FromSwarm::NewListener(e));
523			},
524			FromSwarm::NewListenAddr(e) => {
525				self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
526				self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
527				self.listen_addresses.insert(e.addr.clone());
528			},
529			FromSwarm::ExpiredListenAddr(e) => {
530				self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
531				self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
532				self.listen_addresses.remove(e.addr);
533				// Remove matching external address.
534				match self.with_local_peer_id(e.addr.clone()) {
535					Ok(addr) => {
536						self.external_addresses.remove(&addr);
537						self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(addr));
538					},
539					Err(addr) => {
540						warn!(
541							target: LOG_TARGET,
542							"Listen address expired with peer ID that is not us: {addr}",
543						);
544					},
545				}
546			},
547			FromSwarm::NewExternalAddrCandidate(e) => {
548				self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
549				self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
550			},
551			FromSwarm::ExternalAddrConfirmed(e) => {
552				self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
553				self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
554			},
555			FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
556				self.ping.on_swarm_event(FromSwarm::AddressChange(e));
557				self.identify.on_swarm_event(FromSwarm::AddressChange(e));
558
559				if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
560					if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
561						*endpoint = new.clone();
562					} else {
563						error!(target: LOG_TARGET,
564							"Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
565					}
566				} else {
567					error!(target: LOG_TARGET,
568						"Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
569				}
570			},
571			FromSwarm::NewExternalAddrOfPeer(e) => {
572				self.ping.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
573				self.identify.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
574			},
575			event => {
576				debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
577				self.ping.on_swarm_event(event);
578				self.identify.on_swarm_event(event);
579			},
580		}
581	}
582
583	fn on_connection_handler_event(
584		&mut self,
585		peer_id: PeerId,
586		connection_id: ConnectionId,
587		event: THandlerOutEvent<Self>,
588	) {
589		match event {
590			Either::Left(event) => {
591				self.ping.on_connection_handler_event(peer_id, connection_id, event)
592			},
593			Either::Right(event) => {
594				self.identify.on_connection_handler_event(peer_id, connection_id, event)
595			},
596		}
597	}
598
599	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
600		if let Some(event) = self.pending_actions.pop_front() {
601			return Poll::Ready(event);
602		}
603
604		loop {
605			match self.ping.poll(cx) {
606				Poll::Pending => break,
607				Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
608					if let PingEvent { peer, result: Ok(rtt), connection } = ev {
609						self.handle_ping_report(&peer, rtt, connection)
610					}
611				},
612				Poll::Ready(event) => {
613					return Poll::Ready(event.map_in(Either::Left).map_out(|_| {
614						unreachable!("`GenerateEvent` is handled in a branch above; qed")
615					}));
616				},
617			}
618		}
619
620		loop {
621			match self.identify.poll(cx) {
622				Poll::Pending => break,
623				Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
624					IdentifyEvent::Received { peer_id, info, .. } => {
625						self.handle_identify_report(&peer_id, &info);
626						let event = PeerInfoEvent::Identified { peer_id, info };
627						return Poll::Ready(ToSwarm::GenerateEvent(event));
628					},
629					IdentifyEvent::Error { connection_id, peer_id, error } => {
630						debug!(
631							target: LOG_TARGET,
632							"Identification with peer {peer_id:?}({connection_id}) failed => {error}"
633						);
634					},
635					IdentifyEvent::Pushed { .. } => {},
636					IdentifyEvent::Sent { .. } => {},
637				},
638				Poll::Ready(event) => {
639					return Poll::Ready(event.map_in(Either::Right).map_out(|_| {
640						unreachable!("`GenerateEvent` is handled in a branch above; qed")
641					}));
642				},
643			}
644		}
645
646		while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
647			self.nodes_info.retain(|_, node| {
648				node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
649			});
650		}
651
652		Poll::Pending
653	}
654}