1use crate::{utils::interval, LOG_TARGET};
11use either::Either;
12
13use fnv::FnvHashMap;
14use futures::prelude::*;
15use libp2p::{
16 core::{transport::PortUse, ConnectedPoint, Endpoint},
17 identify::{
18 Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
19 Info as IdentifyInfo,
20 },
21 identity::PublicKey,
22 multiaddr::Protocol,
23 ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent},
24 swarm::{
25 behaviour::{
26 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
27 ListenFailure,
28 },
29 ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
30 NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
31 },
32 Multiaddr, PeerId,
33};
34use log::{debug, error, trace, warn};
35use parking_lot::Mutex;
36use schnellru::{ByLength, LruMap};
37use smallvec::SmallVec;
38
39use std::{
40 collections::{hash_map::Entry, HashSet, VecDeque},
41 iter,
42 pin::Pin,
43 sync::Arc,
44 task::{Context, Poll},
45 time::{Duration, Instant},
46};
47
48const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
50const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
52const MAX_EXTERNAL_ADDRESSES: u32 = 32;
54const MIN_ADDRESS_CONFIRMATIONS: usize = 3;
57
58pub struct PeerInfoBehaviour {
60 ping: Ping,
62 identify: Identify,
64 nodes_info: FnvHashMap<PeerId, NodeInfo>,
66 garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
68 local_peer_id: PeerId,
70 public_addresses: Vec<Multiaddr>,
72 listen_addresses: HashSet<Multiaddr>,
74 address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
76 external_addresses: ExternalAddresses,
79 pending_actions: VecDeque<ToSwarm<PeerInfoEvent, THandlerInEvent<PeerInfoBehaviour>>>,
81}
82
83#[derive(Debug)]
85struct NodeInfo {
86 info_expire: Option<Instant>,
89 endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
91 client_version: Option<String>,
93 latest_ping: Option<Duration>,
95}
96
97impl NodeInfo {
98 fn new(endpoint: ConnectedPoint) -> Self {
99 let mut endpoints = SmallVec::new();
100 endpoints.push(endpoint);
101 Self { info_expire: None, endpoints, client_version: None, latest_ping: None }
102 }
103}
104
105#[derive(Debug, Clone, Default)]
107pub struct ExternalAddresses {
108 addresses: Arc<Mutex<HashSet<Multiaddr>>>,
109}
110
111impl ExternalAddresses {
112 pub fn add(&mut self, addr: Multiaddr) -> bool {
114 self.addresses.lock().insert(addr)
115 }
116
117 pub fn remove(&mut self, addr: &Multiaddr) -> bool {
119 self.addresses.lock().remove(addr)
120 }
121}
122
123impl PeerInfoBehaviour {
124 pub fn new(
126 user_agent: String,
127 local_public_key: PublicKey,
128 external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
129 public_addresses: Vec<Multiaddr>,
130 ) -> Self {
131 let identify = {
132 let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key.clone())
133 .with_agent_version(user_agent)
134 .with_cache_size(0);
136 Identify::new(cfg)
137 };
138
139 Self {
140 ping: Ping::new(PingConfig::new()),
141 identify,
142 nodes_info: FnvHashMap::default(),
143 garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
144 local_peer_id: local_public_key.to_peer_id(),
145 public_addresses,
146 listen_addresses: HashSet::new(),
147 address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
148 external_addresses: ExternalAddresses { addresses: external_addresses },
149 pending_actions: Default::default(),
150 }
151 }
152
153 pub fn node(&self, peer_id: &PeerId) -> Option<Node<'_>> {
159 self.nodes_info.get(peer_id).map(Node)
160 }
161
162 fn handle_ping_report(
165 &mut self,
166 peer_id: &PeerId,
167 ping_time: Duration,
168 connection: ConnectionId,
169 ) {
170 trace!(target: LOG_TARGET, "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time);
171 if let Some(entry) = self.nodes_info.get_mut(peer_id) {
172 entry.latest_ping = Some(ping_time);
173 } else {
174 error!(target: LOG_TARGET,
175 "Received ping from node we're not connected to {:?} via {:?}", peer_id, connection);
176 }
177 }
178
179 fn with_local_peer_id(&self, address: Multiaddr) -> Result<Multiaddr, Multiaddr> {
182 if let Some(Protocol::P2p(peer_id)) = address.iter().last() {
183 if peer_id == self.local_peer_id {
184 Ok(address)
185 } else {
186 Err(address)
187 }
188 } else {
189 Ok(address.with(Protocol::P2p(self.local_peer_id)))
190 }
191 }
192
193 fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
196 trace!(target: LOG_TARGET, "Identified {:?} => {:?}", peer_id, info);
197 if let Some(entry) = self.nodes_info.get_mut(peer_id) {
198 entry.client_version = Some(info.agent_version.clone());
199 } else {
200 error!(target: LOG_TARGET,
201 "Received identify message from node we're not connected to {peer_id:?}");
202 }
203 match self.with_local_peer_id(info.observed_addr.clone()) {
205 Ok(observed_addr) => {
206 let (is_new, expired) = self.is_new_external_address(&observed_addr, *peer_id);
207 if is_new && self.external_addresses.add(observed_addr.clone()) {
208 trace!(
209 target: LOG_TARGET,
210 "Observed address reported by Identify confirmed as external {}",
211 observed_addr,
212 );
213 self.pending_actions.push_back(ToSwarm::ExternalAddrConfirmed(observed_addr));
214 }
215 if let Some(expired) = expired {
216 trace!(target: LOG_TARGET, "Removing replaced external address: {expired}");
217 self.external_addresses.remove(&expired);
218 self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(expired));
219 }
220 },
221 Err(addr) => {
222 warn!(
223 target: LOG_TARGET,
224 "Identify reported observed address for a peer that is not us: {addr}",
225 );
226 },
227 }
228 }
229
230 fn is_same_address(left: &Multiaddr, right: &Multiaddr) -> bool {
233 let mut left = left.iter();
234 let mut right = right.iter();
235
236 loop {
237 match (left.next(), right.next()) {
238 (None, None) => return true,
239 (None, Some(Protocol::P2p(_))) => return true,
240 (Some(Protocol::P2p(_)), None) => return true,
241 (left, right) if left != right => return false,
242 _ => {},
243 }
244 }
245 }
246
247 fn is_new_external_address(
251 &mut self,
252 address: &Multiaddr,
253 peer_id: PeerId,
254 ) -> (bool, Option<Multiaddr>) {
255 trace!(target: LOG_TARGET, "Verify new external address: {address}");
256
257 if self
262 .listen_addresses
263 .iter()
264 .chain(self.public_addresses.iter())
265 .any(|known_address| PeerInfoBehaviour::is_same_address(&known_address, &address))
266 {
267 return (true, None);
268 }
269
270 match self.address_confirmations.get(address) {
271 Some(confirmations) => {
272 confirmations.insert(peer_id);
273
274 if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
275 return (true, None);
276 }
277 },
278 None => {
279 let oldest = (self.address_confirmations.len()
280 >= self.address_confirmations.limiter().max_length() as usize)
281 .then(|| {
282 self.address_confirmations.pop_oldest().map(|(address, peers)| {
283 if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
284 return Some(address);
285 } else {
286 None
287 }
288 })
289 })
290 .flatten()
291 .flatten();
292
293 self.address_confirmations
294 .insert(address.clone(), iter::once(peer_id).collect());
295
296 return (false, oldest);
297 },
298 }
299
300 (false, None)
301 }
302}
303
304pub struct Node<'a>(&'a NodeInfo);
306
307impl<'a> Node<'a> {
308 pub fn endpoint(&self) -> Option<&'a ConnectedPoint> {
312 self.0.endpoints.get(0)
313 }
314
315 pub fn client_version(&self) -> Option<&'a str> {
317 self.0.client_version.as_deref()
318 }
319
320 pub fn latest_ping(&self) -> Option<Duration> {
323 self.0.latest_ping
324 }
325}
326
327#[derive(Debug)]
329pub enum PeerInfoEvent {
330 Identified {
333 peer_id: PeerId,
335 info: IdentifyInfo,
337 },
338}
339
340impl NetworkBehaviour for PeerInfoBehaviour {
341 type ConnectionHandler = ConnectionHandlerSelect<
342 <Ping as NetworkBehaviour>::ConnectionHandler,
343 <Identify as NetworkBehaviour>::ConnectionHandler,
344 >;
345 type ToSwarm = PeerInfoEvent;
346
347 fn handle_pending_inbound_connection(
348 &mut self,
349 connection_id: ConnectionId,
350 local_addr: &Multiaddr,
351 remote_addr: &Multiaddr,
352 ) -> Result<(), ConnectionDenied> {
353 self.ping
354 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
355 self.identify
356 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
357 }
358
359 fn handle_pending_outbound_connection(
360 &mut self,
361 _connection_id: ConnectionId,
362 _maybe_peer: Option<PeerId>,
363 _addresses: &[Multiaddr],
364 _effective_role: Endpoint,
365 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
366 Ok(Vec::new())
369 }
370
371 fn handle_established_inbound_connection(
372 &mut self,
373 connection_id: ConnectionId,
374 peer: PeerId,
375 local_addr: &Multiaddr,
376 remote_addr: &Multiaddr,
377 ) -> Result<THandler<Self>, ConnectionDenied> {
378 let ping_handler = self.ping.handle_established_inbound_connection(
379 connection_id,
380 peer,
381 local_addr,
382 remote_addr,
383 )?;
384 let identify_handler = self.identify.handle_established_inbound_connection(
385 connection_id,
386 peer,
387 local_addr,
388 remote_addr,
389 )?;
390 Ok(ping_handler.select(identify_handler))
391 }
392
393 fn handle_established_outbound_connection(
394 &mut self,
395 connection_id: ConnectionId,
396 peer: PeerId,
397 addr: &Multiaddr,
398 role_override: Endpoint,
399 port_use: PortUse,
400 ) -> Result<THandler<Self>, ConnectionDenied> {
401 let ping_handler = self.ping.handle_established_outbound_connection(
402 connection_id,
403 peer,
404 addr,
405 role_override,
406 port_use,
407 )?;
408 let identify_handler = self.identify.handle_established_outbound_connection(
409 connection_id,
410 peer,
411 addr,
412 role_override,
413 port_use,
414 )?;
415 Ok(ping_handler.select(identify_handler))
416 }
417
418 fn on_swarm_event(&mut self, event: FromSwarm) {
419 match event {
420 FromSwarm::ConnectionEstablished(
421 e @ ConnectionEstablished { peer_id, endpoint, .. },
422 ) => {
423 self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
424 self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
425
426 match self.nodes_info.entry(peer_id) {
427 Entry::Vacant(e) => {
428 e.insert(NodeInfo::new(endpoint.clone()));
429 },
430 Entry::Occupied(e) => {
431 let e = e.into_mut();
432 if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
433 {
434 e.client_version = None;
435 e.latest_ping = None;
436 }
437 e.info_expire = None;
438 e.endpoints.push(endpoint.clone());
439 },
440 }
441 },
442 FromSwarm::ConnectionClosed(ConnectionClosed {
443 peer_id,
444 connection_id,
445 endpoint,
446 cause,
447 remaining_established,
448 }) => {
449 self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
450 peer_id,
451 connection_id,
452 endpoint,
453 cause,
454 remaining_established,
455 }));
456 self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
457 peer_id,
458 connection_id,
459 endpoint,
460 cause,
461 remaining_established,
462 }));
463
464 if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
465 if remaining_established == 0 {
466 entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
467 }
468 entry.endpoints.retain(|ep| ep != endpoint)
469 } else {
470 error!(target: LOG_TARGET,
471 "Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
472 }
473 },
474 FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => {
475 self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
476 peer_id,
477 error,
478 connection_id,
479 }));
480 self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
481 peer_id,
482 error,
483 connection_id,
484 }));
485 },
486 FromSwarm::ListenerClosed(e) => {
487 self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
488 self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
489 },
490 FromSwarm::ListenFailure(ListenFailure {
491 local_addr,
492 send_back_addr,
493 error,
494 connection_id,
495 peer_id,
496 }) => {
497 self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
498 local_addr,
499 send_back_addr,
500 error,
501 connection_id,
502 peer_id,
503 }));
504 self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
505 local_addr,
506 send_back_addr,
507 error,
508 connection_id,
509 peer_id,
510 }));
511 },
512 FromSwarm::ListenerError(e) => {
513 self.ping.on_swarm_event(FromSwarm::ListenerError(e));
514 self.identify.on_swarm_event(FromSwarm::ListenerError(e));
515 },
516 FromSwarm::ExternalAddrExpired(e) => {
517 self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
518 self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
519 },
520 FromSwarm::NewListener(e) => {
521 self.ping.on_swarm_event(FromSwarm::NewListener(e));
522 self.identify.on_swarm_event(FromSwarm::NewListener(e));
523 },
524 FromSwarm::NewListenAddr(e) => {
525 self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
526 self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
527 self.listen_addresses.insert(e.addr.clone());
528 },
529 FromSwarm::ExpiredListenAddr(e) => {
530 self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
531 self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
532 self.listen_addresses.remove(e.addr);
533 match self.with_local_peer_id(e.addr.clone()) {
535 Ok(addr) => {
536 self.external_addresses.remove(&addr);
537 self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(addr));
538 },
539 Err(addr) => {
540 warn!(
541 target: LOG_TARGET,
542 "Listen address expired with peer ID that is not us: {addr}",
543 );
544 },
545 }
546 },
547 FromSwarm::NewExternalAddrCandidate(e) => {
548 self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
549 self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
550 },
551 FromSwarm::ExternalAddrConfirmed(e) => {
552 self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
553 self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
554 },
555 FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
556 self.ping.on_swarm_event(FromSwarm::AddressChange(e));
557 self.identify.on_swarm_event(FromSwarm::AddressChange(e));
558
559 if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
560 if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
561 *endpoint = new.clone();
562 } else {
563 error!(target: LOG_TARGET,
564 "Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
565 }
566 } else {
567 error!(target: LOG_TARGET,
568 "Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
569 }
570 },
571 FromSwarm::NewExternalAddrOfPeer(e) => {
572 self.ping.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
573 self.identify.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
574 },
575 event => {
576 debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
577 self.ping.on_swarm_event(event);
578 self.identify.on_swarm_event(event);
579 },
580 }
581 }
582
583 fn on_connection_handler_event(
584 &mut self,
585 peer_id: PeerId,
586 connection_id: ConnectionId,
587 event: THandlerOutEvent<Self>,
588 ) {
589 match event {
590 Either::Left(event) => {
591 self.ping.on_connection_handler_event(peer_id, connection_id, event)
592 },
593 Either::Right(event) => {
594 self.identify.on_connection_handler_event(peer_id, connection_id, event)
595 },
596 }
597 }
598
599 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
600 if let Some(event) = self.pending_actions.pop_front() {
601 return Poll::Ready(event);
602 }
603
604 loop {
605 match self.ping.poll(cx) {
606 Poll::Pending => break,
607 Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
608 if let PingEvent { peer, result: Ok(rtt), connection } = ev {
609 self.handle_ping_report(&peer, rtt, connection)
610 }
611 },
612 Poll::Ready(event) => {
613 return Poll::Ready(event.map_in(Either::Left).map_out(|_| {
614 unreachable!("`GenerateEvent` is handled in a branch above; qed")
615 }));
616 },
617 }
618 }
619
620 loop {
621 match self.identify.poll(cx) {
622 Poll::Pending => break,
623 Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
624 IdentifyEvent::Received { peer_id, info, .. } => {
625 self.handle_identify_report(&peer_id, &info);
626 let event = PeerInfoEvent::Identified { peer_id, info };
627 return Poll::Ready(ToSwarm::GenerateEvent(event));
628 },
629 IdentifyEvent::Error { connection_id, peer_id, error } => {
630 debug!(
631 target: LOG_TARGET,
632 "Identification with peer {peer_id:?}({connection_id}) failed => {error}"
633 );
634 },
635 IdentifyEvent::Pushed { .. } => {},
636 IdentifyEvent::Sent { .. } => {},
637 },
638 Poll::Ready(event) => {
639 return Poll::Ready(event.map_in(Either::Right).map_out(|_| {
640 unreachable!("`GenerateEvent` is handled in a branch above; qed")
641 }));
642 },
643 }
644 }
645
646 while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
647 self.nodes_info.retain(|_, node| {
648 node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
649 });
650 }
651
652 Poll::Pending
653 }
654}