zebra_network/address_book.rs
1//! The `AddressBook` manages information about what peers exist, when they were
2//! seen, and what services they provide.
3
4use std::{
5 cmp::Reverse,
6 collections::HashMap,
7 net::{IpAddr, SocketAddr},
8 sync::{Arc, Mutex},
9 time::Instant,
10};
11
12use chrono::Utc;
13use indexmap::IndexMap;
14use ordered_map::OrderedMap;
15use tokio::sync::watch;
16use tracing::Span;
17
18use zebra_chain::{parameters::Network, serialization::DateTime32};
19
20use crate::{
21 constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
22 meta_addr::MetaAddrChange,
23 protocol::external::{canonical_peer_addr, canonical_socket_addr},
24 types::MetaAddr,
25 AddressBookPeers, PeerAddrState, PeerSocketAddr,
26};
27
28#[cfg(test)]
29mod tests;
30
31/// A database of peer listener addresses, their advertised services, and
32/// information on when they were last seen.
33///
34/// # Security
35///
36/// Address book state must be based on outbound connections to peers.
37///
38/// If the address book is updated incorrectly:
39/// - malicious peers can interfere with other peers' `AddressBook` state,
40/// or
41/// - Zebra can advertise unreachable addresses to its own peers.
42///
43/// ## Adding Addresses
44///
45/// The address book should only contain Zcash listener port addresses from peers
46/// on the configured network. These addresses can come from:
47/// - DNS seeders
48/// - addresses gossiped by other peers
49/// - the canonical address (`Version.address_from`) provided by each peer,
50/// particularly peers on inbound connections.
51///
52/// The remote addresses of inbound connections must not be added to the address
53/// book, because they contain ephemeral outbound ports, not listener ports.
54///
55/// Isolated connections must not add addresses or update the address book.
56///
57/// ## Updating Address State
58///
59/// Updates to address state must be based on outbound connections to peers.
60///
61/// Updates must not be based on:
62/// - the remote addresses of inbound connections, or
63/// - the canonical address of any connection.
64#[derive(Debug)]
65pub struct AddressBook {
66 /// Peer listener addresses, suitable for outbound connections,
67 /// in connection attempt order.
68 ///
69 /// Some peers in this list might have open outbound or inbound connections.
70 ///
71 /// We reverse the comparison order, because the standard library
72 /// ([`BTreeMap`](std::collections::BTreeMap)) sorts in ascending order, but
73 /// [`OrderedMap`] sorts in descending order.
74 by_addr: OrderedMap<PeerSocketAddr, MetaAddr, Reverse<MetaAddr>>,
75
76 /// The address with a last_connection_state of [`PeerAddrState::Responded`] and
77 /// the most recent `last_response` time by IP.
78 ///
79 /// This is used to avoid initiating outbound connections past [`Config::max_connections_per_ip`](crate::config::Config), and
80 /// currently only supports a `max_connections_per_ip` of 1, and must be `None` when used with a greater `max_connections_per_ip`.
81 // TODO: Replace with `by_ip: HashMap<IpAddr, BTreeMap<DateTime32, MetaAddr>>` to support configured `max_connections_per_ip` greater than 1
82 most_recent_by_ip: Option<HashMap<IpAddr, MetaAddr>>,
83
84 /// A list of banned addresses, with the time they were banned.
85 bans_by_ip: Arc<IndexMap<IpAddr, Instant>>,
86
87 /// The local listener address.
88 local_listener: SocketAddr,
89
90 /// The configured Zcash network.
91 network: Network,
92
93 /// The maximum number of addresses in the address book.
94 ///
95 /// Always set to [`MAX_ADDRS_IN_ADDRESS_BOOK`](constants::MAX_ADDRS_IN_ADDRESS_BOOK),
96 /// in release builds. Lower values are used during testing.
97 addr_limit: usize,
98
99 /// The span for operations on this address book.
100 span: Span,
101
102 /// A channel used to send the latest address book metrics.
103 address_metrics_tx: watch::Sender<AddressMetrics>,
104
105 /// The last time we logged a message about the address metrics.
106 last_address_log: Option<Instant>,
107}
108
/// A snapshot of how many addresses in an [`AddressBook`] are in each state.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct AddressMetrics {
    /// How many addresses are in the `Responded` state.
    pub responded: usize,

    /// How many addresses are in the `NeverAttemptedGossiped` state.
    pub never_attempted_gossiped: usize,

    /// How many addresses are in the `Failed` state.
    pub failed: usize,

    /// How many addresses are in the `AttemptPending` state.
    pub attempt_pending: usize,

    /// How many `Responded` addresses are still within the liveness limit.
    pub recently_live: usize,

    /// How many `Responded` addresses have fallen outside the liveness limit.
    pub recently_stopped_responding: usize,

    /// The total number of addresses in the address book, whatever their state.
    pub num_addresses: usize,

    /// The largest number of addresses the address book will hold.
    pub address_limit: usize,
}
136
137#[allow(clippy::len_without_is_empty)]
138impl AddressBook {
139 /// Construct an [`AddressBook`] with the given `local_listener` on `network`.
140 ///
141 /// Uses the supplied [`tracing::Span`] for address book operations.
142 pub fn new(
143 local_listener: SocketAddr,
144 network: &Network,
145 max_connections_per_ip: usize,
146 span: Span,
147 ) -> AddressBook {
148 let constructor_span = span.clone();
149 let _guard = constructor_span.enter();
150
151 let instant_now = Instant::now();
152 let chrono_now = Utc::now();
153
154 // The default value is correct for an empty address book,
155 // and it gets replaced by `update_metrics` anyway.
156 let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());
157
158 // Avoid initiating outbound handshakes when max_connections_per_ip is 1.
159 let should_limit_outbound_conns_per_ip = max_connections_per_ip == 1;
160 let mut new_book = AddressBook {
161 by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
162 local_listener: canonical_socket_addr(local_listener),
163 network: network.clone(),
164 addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
165 span,
166 address_metrics_tx,
167 last_address_log: None,
168 most_recent_by_ip: should_limit_outbound_conns_per_ip.then(HashMap::new),
169 bans_by_ip: Default::default(),
170 };
171
172 new_book.update_metrics(instant_now, chrono_now);
173 new_book
174 }
175
/// Create an [`AddressBook`] from `local_listener`, `network`, `addr_limit`,
/// a [`tracing::Span`], and an initial set of addresses.
///
/// Both this method and [`AddressBook::update`] enforce `addr_limit`.
///
/// When several [`MetaAddr`]s share the same address, one arbitrary entry
/// ends up in the address book and the others are dropped.
///
/// Because this constructor can break address book invariants,
/// it should only be used in tests.
///
/// # Panics
///
/// If `addr_limit` is zero.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_addrs(
    local_listener: SocketAddr,
    network: &Network,
    max_connections_per_ip: usize,
    addr_limit: usize,
    span: Span,
    addrs: impl IntoIterator<Item = MetaAddr>,
) -> AddressBook {
    let constructor_span = span.clone();
    let _guard = constructor_span.enter();

    let instant_now = Instant::now();
    let chrono_now = Utc::now();

    // The maximum number of addresses should be always greater than 0
    assert!(addr_limit > 0);

    let mut book = AddressBook::new(local_listener, network, max_connections_per_ip, span);
    book.addr_limit = addr_limit;

    for mut candidate in addrs {
        // Store canonical addresses only, and skip anything we can't connect to.
        candidate.addr = canonical_peer_addr(candidate.addr);
        if !candidate.address_is_valid_for_outbound(network) {
            continue;
        }

        // A later duplicate replaces an earlier entry for the same address.
        book.by_addr.insert(candidate.addr, candidate);

        // Track the address in `most_recent_by_ip` if it has responded.
        if book.should_update_most_recent_by_ip(candidate) {
            book.most_recent_by_ip
                .as_mut()
                .expect("should be some when should_update_most_recent_by_ip is true")
                .insert(candidate.addr.ip(), candidate);
        }

        // Stop consuming addresses once the book is full.
        if book.by_addr.len() >= addr_limit {
            break;
        }
    }

    book.update_metrics(instant_now, chrono_now);
    book
}
237
238 /// Return a watch channel for the address book metrics.
239 ///
240 /// The metrics in the watch channel are only updated when the address book updates,
241 /// so they can be significantly outdated if Zebra is disconnected or hung.
242 ///
243 /// The current metrics value is marked as seen.
244 /// So `Receiver::changed` will only return after the next address book update.
245 pub fn address_metrics_watcher(&self) -> watch::Receiver<AddressMetrics> {
246 self.address_metrics_tx.subscribe()
247 }
248
/// Replace the local listener address with `addr`, storing it exactly as given.
///
/// Only for use in tests.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn set_local_listener(&mut self, addr: SocketAddr) {
    self.local_listener = addr;
}
254
255 /// Get the local listener address.
256 ///
257 /// This address contains minimal state, but it is not sanitized.
258 pub fn local_listener_meta_addr(&self, now: chrono::DateTime<Utc>) -> MetaAddr {
259 let now: DateTime32 = now.try_into().expect("will succeed until 2038");
260
261 MetaAddr::new_local_listener_change(self.local_listener)
262 .local_listener_into_new_meta_addr(now)
263 }
264
265 /// Get the local listener [`SocketAddr`].
266 pub fn local_listener_socket_addr(&self) -> SocketAddr {
267 self.local_listener
268 }
269
270 /// Get the active addresses in `self` in random order with sanitized timestamps,
271 /// including our local listener address.
272 ///
273 /// Limited to the number of peer addresses Zebra should give out per `GetAddr` request.
274 pub fn fresh_get_addr_response(&self) -> Vec<MetaAddr> {
275 let now = Utc::now();
276 let mut peers = self.sanitized(now);
277 let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR);
278 peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit));
279
280 peers
281 }
282
283 /// Get the active addresses in `self` in random order with sanitized timestamps,
284 /// including our local listener address.
285 pub(crate) fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
286 use rand::seq::SliceRandom;
287 let _guard = self.span.enter();
288
289 let mut peers = self.by_addr.clone();
290
291 // Unconditionally add our local listener address to the advertised peers,
292 // to replace any self-connection failures. The address book and change
293 // constructors make sure that the SocketAddr is canonical.
294 let local_listener = self.local_listener_meta_addr(now);
295 peers.insert(local_listener.addr, local_listener);
296
297 // Then sanitize and shuffle
298 let mut peers: Vec<MetaAddr> = peers
299 .descending_values()
300 .filter_map(|meta_addr| meta_addr.sanitize(&self.network))
301 // # Security
302 //
303 // Remove peers that:
304 // - last responded more than three hours ago, or
305 // - haven't responded yet but were reported last seen more than three hours ago
306 //
307 // This prevents Zebra from gossiping nodes that are likely unreachable. Gossiping such
308 // nodes impacts the network health, because connection attempts end up being wasted on
309 // peers that are less likely to respond.
310 .filter(|addr| addr.is_active_for_gossip(now))
311 .collect();
312
313 peers.shuffle(&mut rand::thread_rng());
314
315 peers
316 }
317
318 /// Get the active addresses in `self`, in preferred caching order,
319 /// excluding our local listener address.
320 pub fn cacheable(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
321 let _guard = self.span.enter();
322
323 let peers = self.by_addr.clone();
324
325 // Get peers in preferred order, then keep the recently active ones
326 peers
327 .descending_values()
328 // # Security
329 //
330 // Remove peers that:
331 // - last responded more than three hours ago, or
332 // - haven't responded yet but were reported last seen more than three hours ago
333 //
334 // This prevents Zebra from caching nodes that are likely unreachable,
335 // which improves startup time and reliability.
336 .filter(|addr| addr.is_active_for_gossip(now))
337 .cloned()
338 .collect()
339 }
340
341 /// Look up `addr` in the address book, and return its [`MetaAddr`].
342 ///
343 /// Converts `addr` to a canonical address before looking it up.
344 pub fn get(&mut self, addr: PeerSocketAddr) -> Option<MetaAddr> {
345 let addr = canonical_peer_addr(*addr);
346
347 // Unfortunately, `OrderedMap` doesn't implement `get`.
348 let meta_addr = self.by_addr.remove(&addr);
349
350 if let Some(meta_addr) = meta_addr {
351 self.by_addr.insert(addr, meta_addr);
352 }
353
354 meta_addr
355 }
356
357 /// Returns true if `updated` needs to be applied to the recent outbound peer connection IP cache.
358 ///
359 /// Checks if there are no existing entries in the address book with this IP,
360 /// or if `updated` has a more recent `last_response` requiring the outbound connector to wait
361 /// longer before initiating handshakes with peers at this IP.
362 ///
363 /// This code only needs to check a single cache entry, rather than the entire address book,
364 /// because other code maintains these invariants:
365 /// - `last_response` times for an entry can only increase.
366 /// - this is the only field checked by `has_connection_recently_responded()`
367 ///
368 /// See [`AddressBook::is_ready_for_connection_attempt_with_ip`] for more details.
369 fn should_update_most_recent_by_ip(&self, updated: MetaAddr) -> bool {
370 let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
371 return false;
372 };
373
374 if let Some(previous) = most_recent_by_ip.get(&updated.addr.ip()) {
375 updated.last_connection_state == PeerAddrState::Responded
376 && updated.last_response() > previous.last_response()
377 } else {
378 updated.last_connection_state == PeerAddrState::Responded
379 }
380 }
381
382 /// Returns true if `addr` is the latest entry for its IP, which is stored in `most_recent_by_ip`.
383 /// The entry is checked for an exact match to the IP and port of `addr`.
384 fn should_remove_most_recent_by_ip(&self, addr: PeerSocketAddr) -> bool {
385 let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
386 return false;
387 };
388
389 if let Some(previous) = most_recent_by_ip.get(&addr.ip()) {
390 previous.addr == addr
391 } else {
392 false
393 }
394 }
395
396 /// Apply `change` to the address book, returning the updated `MetaAddr`,
397 /// if the change was valid.
398 ///
399 /// # Correctness
400 ///
401 /// All changes should go through `update`, so that the address book
402 /// only contains valid outbound addresses.
403 ///
404 /// Change addresses must be canonical `PeerSocketAddr`s. This makes sure that
405 /// each address book entry has a unique IP address.
406 ///
407 /// # Security
408 ///
409 /// This function must apply every attempted, responded, and failed change
410 /// to the address book. This prevents rapid reconnections to the same peer.
411 ///
412 /// As an exception, this function can ignore all changes for specific
413 /// [`PeerSocketAddr`]s. Ignored addresses will never be used to connect to
414 /// peers.
415 #[allow(clippy::unwrap_in_result)]
416 pub fn update(&mut self, change: MetaAddrChange) -> Option<MetaAddr> {
417 if self.bans_by_ip.contains_key(&change.addr().ip()) {
418 tracing::warn!(
419 ?change,
420 "attempted to add a banned peer addr to address book"
421 );
422 return None;
423 }
424
425 let previous = self.get(change.addr());
426
427 let _guard = self.span.enter();
428
429 let instant_now = Instant::now();
430 let chrono_now = Utc::now();
431
432 let updated = change.apply_to_meta_addr(previous, instant_now, chrono_now);
433
434 trace!(
435 ?change,
436 ?updated,
437 ?previous,
438 total_peers = self.by_addr.len(),
439 recent_peers = self.recently_live_peers(chrono_now).len(),
440 "calculated updated address book entry",
441 );
442
443 if let Some(updated) = updated {
444 if updated.misbehavior() >= constants::MAX_PEER_MISBEHAVIOR_SCORE {
445 // Ban and skip outbound connections with excessively misbehaving peers.
446 let banned_ip = updated.addr.ip();
447 let bans_by_ip = Arc::make_mut(&mut self.bans_by_ip);
448
449 bans_by_ip.insert(banned_ip, Instant::now());
450 if bans_by_ip.len() > constants::MAX_BANNED_IPS {
451 // Remove the oldest banned IP from the address book.
452 bans_by_ip.shift_remove_index(0);
453 }
454
455 // `most_recent_by_ip` is only populated when
456 // `max_connections_per_ip == 1`. The ban path runs for any
457 // configured value, so we must guard the optional cache rather
458 // than unwrap it.
459 if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
460 most_recent_by_ip.remove(&banned_ip);
461 }
462
463 let banned_addrs: Vec<_> = self
464 .by_addr
465 .descending_keys()
466 .skip_while(|addr| addr.ip() != banned_ip)
467 .take_while(|addr| addr.ip() == banned_ip)
468 .cloned()
469 .collect();
470
471 for addr in banned_addrs {
472 self.by_addr.remove(&addr);
473 }
474
475 warn!(
476 ?updated,
477 total_peers = self.by_addr.len(),
478 recent_peers = self.recently_live_peers(chrono_now).len(),
479 "banned ip and removed banned peer addresses from address book",
480 );
481
482 return None;
483 }
484
485 // Ignore invalid outbound addresses.
486 // (Inbound connections can be monitored via Zebra's metrics.)
487 if !updated.address_is_valid_for_outbound(&self.network) {
488 return None;
489 }
490
491 // Ignore invalid outbound services and other info,
492 // but only if the peer has never been attempted.
493 //
494 // Otherwise, if we got the info directly from the peer,
495 // store it in the address book, so we know not to reconnect.
496 if !updated.last_known_info_is_valid_for_outbound(&self.network)
497 && updated.last_connection_state.is_never_attempted()
498 {
499 return None;
500 }
501
502 self.by_addr.insert(updated.addr, updated);
503
504 // Add the address to `most_recent_by_ip` if it sent the most recent
505 // response Zebra has received from this IP.
506 if self.should_update_most_recent_by_ip(updated) {
507 self.most_recent_by_ip
508 .as_mut()
509 .expect("should be some when should_update_most_recent_by_ip is true")
510 .insert(updated.addr.ip(), updated);
511 }
512
513 debug!(
514 ?change,
515 ?updated,
516 ?previous,
517 total_peers = self.by_addr.len(),
518 recent_peers = self.recently_live_peers(chrono_now).len(),
519 "updated address book entry",
520 );
521
522 // Security: Limit the number of peers in the address book.
523 //
524 // We only delete outdated peers when we have too many peers.
525 // If we deleted them as soon as they became too old,
526 // then other peers could re-insert them into the address book.
527 // And we would start connecting to those outdated peers again,
528 // ignoring the age limit in [`MetaAddr::is_probably_reachable`].
529 while self.by_addr.len() > self.addr_limit {
530 let surplus_peer = self
531 .peers()
532 .next_back()
533 .expect("just checked there is at least one peer");
534
535 self.by_addr.remove(&surplus_peer.addr);
536
537 // Check if this surplus peer's addr matches that in `most_recent_by_ip`
538 // for this the surplus peer's ip to remove it there as well.
539 if self.should_remove_most_recent_by_ip(surplus_peer.addr) {
540 self.most_recent_by_ip
541 .as_mut()
542 .expect("should be some when should_remove_most_recent_by_ip is true")
543 .remove(&surplus_peer.addr.ip());
544 }
545
546 debug!(
547 surplus = ?surplus_peer,
548 ?updated,
549 total_peers = self.by_addr.len(),
550 recent_peers = self.recently_live_peers(chrono_now).len(),
551 "removed surplus address book entry",
552 );
553 }
554
555 assert!(self.len() <= self.addr_limit);
556
557 std::mem::drop(_guard);
558 self.update_metrics(instant_now, chrono_now);
559 }
560
561 updated
562 }
563
564 /// Removes the entry with `addr`, returning it if it exists
565 ///
566 /// # Note
567 ///
568 /// All address removals should go through `take`, so that the address
569 /// book metrics are accurate.
570 #[allow(dead_code)]
571 fn take(&mut self, removed_addr: PeerSocketAddr) -> Option<MetaAddr> {
572 let _guard = self.span.enter();
573
574 let instant_now = Instant::now();
575 let chrono_now = Utc::now();
576
577 trace!(
578 ?removed_addr,
579 total_peers = self.by_addr.len(),
580 recent_peers = self.recently_live_peers(chrono_now).len(),
581 );
582
583 if let Some(entry) = self.by_addr.remove(&removed_addr) {
584 // Check if this surplus peer's addr matches that in `most_recent_by_ip`
585 // for this the surplus peer's ip to remove it there as well.
586 if self.should_remove_most_recent_by_ip(entry.addr) {
587 if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
588 most_recent_by_ip.remove(&entry.addr.ip());
589 }
590 }
591
592 std::mem::drop(_guard);
593 self.update_metrics(instant_now, chrono_now);
594 Some(entry)
595 } else {
596 None
597 }
598 }
599
600 /// Returns true if the given [`PeerSocketAddr`] is pending a reconnection
601 /// attempt.
602 pub fn pending_reconnection_addr(&mut self, addr: PeerSocketAddr) -> bool {
603 let meta_addr = self.get(addr);
604
605 let _guard = self.span.enter();
606 match meta_addr {
607 None => false,
608 Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending,
609 }
610 }
611
612 /// Return an iterator over all peers.
613 ///
614 /// Returns peers in reconnection attempt order, including recently connected peers.
615 pub fn peers(&'_ self) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
616 let _guard = self.span.enter();
617 self.by_addr.descending_values().cloned()
618 }
619
620 /// Is this IP ready for a new outbound connection attempt?
621 /// Checks if the outbound connection with the most recent response at this IP has recently responded.
622 ///
623 /// Note: last_response times may remain live for a long time if the local clock is changed to an earlier time.
624 fn is_ready_for_connection_attempt_with_ip(
625 &self,
626 ip: &IpAddr,
627 chrono_now: chrono::DateTime<Utc>,
628 ) -> bool {
629 let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
630 // if we're not checking IPs, any connection is allowed
631 return true;
632 };
633 let Some(same_ip_peer) = most_recent_by_ip.get(ip) else {
634 // If there's no entry for this IP, any connection is allowed
635 return true;
636 };
637 !same_ip_peer.has_connection_recently_responded(chrono_now)
638 }
639
640 /// Return an iterator over peers that are due for a reconnection attempt,
641 /// in reconnection attempt order.
642 pub fn reconnection_peers(
643 &'_ self,
644 instant_now: Instant,
645 chrono_now: chrono::DateTime<Utc>,
646 ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
647 let _guard = self.span.enter();
648
649 // Skip live peers, and peers pending a reconnect attempt.
650 // The peers are already stored in sorted order.
651 self.by_addr
652 .descending_values()
653 .filter(move |peer| {
654 peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
655 && self.is_ready_for_connection_attempt_with_ip(&peer.addr.ip(), chrono_now)
656 })
657 .cloned()
658 }
659
660 /// Return an iterator over all the peers in `state`,
661 /// in reconnection attempt order, including recently connected peers.
662 pub fn state_peers(
663 &'_ self,
664 state: PeerAddrState,
665 ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
666 let _guard = self.span.enter();
667
668 self.by_addr
669 .descending_values()
670 .filter(move |peer| peer.last_connection_state == state)
671 .cloned()
672 }
673
674 /// Return an iterator over peers that might be connected,
675 /// in reconnection attempt order.
676 pub fn maybe_connected_peers(
677 &'_ self,
678 instant_now: Instant,
679 chrono_now: chrono::DateTime<Utc>,
680 ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
681 let _guard = self.span.enter();
682
683 self.by_addr
684 .descending_values()
685 .filter(move |peer| {
686 !peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
687 })
688 .cloned()
689 }
690
691 /// Returns banned IP addresses.
692 pub fn bans(&self) -> Arc<IndexMap<IpAddr, Instant>> {
693 self.bans_by_ip.clone()
694 }
695
696 /// Returns the number of entries in this address book.
697 pub fn len(&self) -> usize {
698 self.by_addr.len()
699 }
700
/// Calculate metrics for the addresses in this address book at `now`.
/// Only for use in tests.
///
/// # Correctness
///
/// Production code should use [`AddressBook::address_metrics_watcher`]
/// instead, to avoid deadlocks.
#[cfg(test)]
pub fn address_metrics(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
    self.address_metrics_internal(now)
}
712
713 /// Returns metrics for the addresses in this address book.
714 ///
715 /// # Correctness
716 ///
717 /// External callers should use [`AddressBook::address_metrics_watcher`]
718 /// in production code, to avoid deadlocks.
719 /// (Using the watch channel receiver does not lock the address book mutex.)
720 fn address_metrics_internal(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
721 let responded = self.state_peers(PeerAddrState::Responded).count();
722 let never_attempted_gossiped = self
723 .state_peers(PeerAddrState::NeverAttemptedGossiped)
724 .count();
725 let failed = self.state_peers(PeerAddrState::Failed).count();
726 let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count();
727
728 let recently_live = self.recently_live_peers(now).len();
729 let recently_stopped_responding = responded
730 .checked_sub(recently_live)
731 .expect("all recently live peers must have responded");
732
733 let num_addresses = self.len();
734
735 AddressMetrics {
736 responded,
737 never_attempted_gossiped,
738 failed,
739 attempt_pending,
740 recently_live,
741 recently_stopped_responding,
742 num_addresses,
743 address_limit: self.addr_limit,
744 }
745 }
746
747 /// Update the metrics for this address book.
748 fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime<Utc>) {
749 let _guard = self.span.enter();
750
751 let m = self.address_metrics_internal(chrono_now);
752
753 // Ignore errors: we don't care if any receivers are listening.
754 let _ = self.address_metrics_tx.send(m);
755
756 // TODO: rename to address_book.[state_name]
757 metrics::gauge!("candidate_set.responded").set(m.responded as f64);
758 metrics::gauge!("candidate_set.gossiped").set(m.never_attempted_gossiped as f64);
759 metrics::gauge!("candidate_set.failed").set(m.failed as f64);
760 metrics::gauge!("candidate_set.pending").set(m.attempt_pending as f64);
761
762 // TODO: rename to address_book.responded.recently_live
763 metrics::gauge!("candidate_set.recently_live").set(m.recently_live as f64);
764 // TODO: rename to address_book.responded.stopped_responding
765 metrics::gauge!("candidate_set.disconnected").set(m.recently_stopped_responding as f64);
766
767 std::mem::drop(_guard);
768 self.log_metrics(&m, instant_now);
769 }
770
771 /// Log metrics for this address book
772 fn log_metrics(&mut self, m: &AddressMetrics, now: Instant) {
773 let _guard = self.span.enter();
774
775 trace!(
776 address_metrics = ?m,
777 );
778
779 if m.responded > 0 {
780 return;
781 }
782
783 // These logs are designed to be human-readable in a terminal, at the
784 // default Zebra log level. If you need to know address states for
785 // every request, use the trace-level logs, or the metrics exporter.
786 if let Some(last_address_log) = self.last_address_log {
787 // Avoid duplicate address logs
788 if now.saturating_duration_since(last_address_log).as_secs() < 60 {
789 return;
790 }
791 } else {
792 // Suppress initial logs until the peer set has started up.
793 // There can be multiple address changes before the first peer has
794 // responded.
795 self.last_address_log = Some(now);
796 return;
797 }
798
799 self.last_address_log = Some(now);
800 // if all peers have failed
801 if m.responded + m.attempt_pending + m.never_attempted_gossiped == 0 {
802 warn!(
803 address_metrics = ?m,
804 "all peer addresses have failed. Hint: check your network connection"
805 );
806 } else {
807 info!(
808 address_metrics = ?m,
809 "no active peer connections: trying gossiped addresses"
810 );
811 }
812 }
813}
814
815impl AddressBookPeers for AddressBook {
816 fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
817 let _guard = self.span.enter();
818
819 self.by_addr
820 .descending_values()
821 .filter(|peer| peer.was_recently_live(now))
822 .cloned()
823 .collect()
824 }
825
826 fn add_peer(&mut self, peer: PeerSocketAddr) -> bool {
827 if self.get(peer).is_some() {
828 // Peer already exists in the address book, so we don't need to add it again.
829 return false;
830 }
831 self.update(MetaAddr::new_initial_peer(peer)).is_some()
832 }
833}
834
835impl AddressBookPeers for Arc<Mutex<AddressBook>> {
836 fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
837 self.lock()
838 .expect("panic in a previous thread that was holding the mutex")
839 .recently_live_peers(now)
840 }
841
842 fn add_peer(&mut self, peer: PeerSocketAddr) -> bool {
843 self.lock()
844 .expect("panic in a previous thread that was holding the mutex")
845 .add_peer(peer)
846 }
847}
848
849impl Extend<MetaAddrChange> for AddressBook {
850 fn extend<T>(&mut self, iter: T)
851 where
852 T: IntoIterator<Item = MetaAddrChange>,
853 {
854 for change in iter.into_iter() {
855 self.update(change);
856 }
857 }
858}
859
860impl Clone for AddressBook {
861 /// Clone the addresses, address limit, local listener address, and span.
862 ///
863 /// Cloned address books have a separate metrics struct watch channel, and an empty last address log.
864 ///
865 /// All address books update the same prometheus metrics.
866 fn clone(&self) -> AddressBook {
867 // The existing metrics might be outdated, but we avoid calling `update_metrics`,
868 // so we don't overwrite the prometheus metrics from the main address book.
869 let (address_metrics_tx, _address_metrics_rx) =
870 watch::channel(*self.address_metrics_tx.borrow());
871
872 AddressBook {
873 by_addr: self.by_addr.clone(),
874 local_listener: self.local_listener,
875 network: self.network.clone(),
876 addr_limit: self.addr_limit,
877 span: self.span.clone(),
878 address_metrics_tx,
879 last_address_log: None,
880 most_recent_by_ip: self.most_recent_by_ip.clone(),
881 bans_by_ip: self.bans_by_ip.clone(),
882 }
883 }
884}