Skip to main content

zebra_network/
address_book.rs

1//! The `AddressBook` manages information about what peers exist, when they were
2//! seen, and what services they provide.
3
4use std::{
5    cmp::Reverse,
6    collections::HashMap,
7    net::{IpAddr, SocketAddr},
8    sync::{Arc, Mutex},
9    time::Instant,
10};
11
12use chrono::Utc;
13use indexmap::IndexMap;
14use ordered_map::OrderedMap;
15use tokio::sync::watch;
16use tracing::Span;
17
18use zebra_chain::{parameters::Network, serialization::DateTime32};
19
20use crate::{
21    constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
22    meta_addr::MetaAddrChange,
23    protocol::external::{canonical_peer_addr, canonical_socket_addr},
24    types::MetaAddr,
25    AddressBookPeers, PeerAddrState, PeerSocketAddr,
26};
27
28#[cfg(test)]
29mod tests;
30
31/// A database of peer listener addresses, their advertised services, and
32/// information on when they were last seen.
33///
34/// # Security
35///
36/// Address book state must be based on outbound connections to peers.
37///
38/// If the address book is updated incorrectly:
39/// - malicious peers can interfere with other peers' `AddressBook` state,
40///   or
41/// - Zebra can advertise unreachable addresses to its own peers.
42///
43/// ## Adding Addresses
44///
45/// The address book should only contain Zcash listener port addresses from peers
46/// on the configured network. These addresses can come from:
47/// - DNS seeders
48/// - addresses gossiped by other peers
49/// - the canonical address (`Version.address_from`) provided by each peer,
50///   particularly peers on inbound connections.
51///
52/// The remote addresses of inbound connections must not be added to the address
53/// book, because they contain ephemeral outbound ports, not listener ports.
54///
55/// Isolated connections must not add addresses or update the address book.
56///
57/// ## Updating Address State
58///
59/// Updates to address state must be based on outbound connections to peers.
60///
61/// Updates must not be based on:
62/// - the remote addresses of inbound connections, or
63/// - the canonical address of any connection.
64#[derive(Debug)]
65pub struct AddressBook {
66    /// Peer listener addresses, suitable for outbound connections,
67    /// in connection attempt order.
68    ///
69    /// Some peers in this list might have open outbound or inbound connections.
70    ///
71    /// We reverse the comparison order, because the standard library
72    /// ([`BTreeMap`](std::collections::BTreeMap)) sorts in ascending order, but
73    /// [`OrderedMap`] sorts in descending order.
74    by_addr: OrderedMap<PeerSocketAddr, MetaAddr, Reverse<MetaAddr>>,
75
76    /// The address with a last_connection_state of [`PeerAddrState::Responded`] and
77    /// the most recent `last_response` time by IP.
78    ///
79    /// This is used to avoid initiating outbound connections past [`Config::max_connections_per_ip`](crate::config::Config), and
80    /// currently only supports a `max_connections_per_ip` of 1, and must be `None` when used with a greater `max_connections_per_ip`.
81    // TODO: Replace with `by_ip: HashMap<IpAddr, BTreeMap<DateTime32, MetaAddr>>` to support configured `max_connections_per_ip` greater than 1
82    most_recent_by_ip: Option<HashMap<IpAddr, MetaAddr>>,
83
84    /// A list of banned addresses, with the time they were banned.
85    bans_by_ip: Arc<IndexMap<IpAddr, Instant>>,
86
87    /// The local listener address.
88    local_listener: SocketAddr,
89
90    /// The configured Zcash network.
91    network: Network,
92
93    /// The maximum number of addresses in the address book.
94    ///
95    /// Always set to [`MAX_ADDRS_IN_ADDRESS_BOOK`](constants::MAX_ADDRS_IN_ADDRESS_BOOK),
96    /// in release builds. Lower values are used during testing.
97    addr_limit: usize,
98
99    /// The span for operations on this address book.
100    span: Span,
101
102    /// A channel used to send the latest address book metrics.
103    address_metrics_tx: watch::Sender<AddressMetrics>,
104
105    /// The last time we logged a message about the address metrics.
106    last_address_log: Option<Instant>,
107}
108
/// Summary counts describing the connection states of the entries in an
/// [`AddressBook`].
///
/// A `Default` value has every count set to zero.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct AddressMetrics {
    /// How many addresses are currently in the `Responded` state.
    pub responded: usize,

    /// How many addresses are currently in the `NeverAttemptedGossiped` state.
    pub never_attempted_gossiped: usize,

    /// How many addresses are currently in the `Failed` state.
    pub failed: usize,

    /// How many addresses are currently in the `AttemptPending` state.
    pub attempt_pending: usize,

    /// How many `Responded` addresses are still within the liveness limit.
    pub recently_live: usize,

    /// How many `Responded` addresses have fallen outside the liveness limit.
    pub recently_stopped_responding: usize,

    /// The total number of addresses held by the address book, whatever their state.
    pub num_addresses: usize,

    /// The upper bound on the number of addresses the address book will hold.
    pub address_limit: usize,
}
136
137#[allow(clippy::len_without_is_empty)]
138impl AddressBook {
139    /// Construct an [`AddressBook`] with the given `local_listener` on `network`.
140    ///
141    /// Uses the supplied [`tracing::Span`] for address book operations.
142    pub fn new(
143        local_listener: SocketAddr,
144        network: &Network,
145        max_connections_per_ip: usize,
146        span: Span,
147    ) -> AddressBook {
148        let constructor_span = span.clone();
149        let _guard = constructor_span.enter();
150
151        let instant_now = Instant::now();
152        let chrono_now = Utc::now();
153
154        // The default value is correct for an empty address book,
155        // and it gets replaced by `update_metrics` anyway.
156        let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());
157
158        // Avoid initiating outbound handshakes when max_connections_per_ip is 1.
159        let should_limit_outbound_conns_per_ip = max_connections_per_ip == 1;
160        let mut new_book = AddressBook {
161            by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
162            local_listener: canonical_socket_addr(local_listener),
163            network: network.clone(),
164            addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
165            span,
166            address_metrics_tx,
167            last_address_log: None,
168            most_recent_by_ip: should_limit_outbound_conns_per_ip.then(HashMap::new),
169            bans_by_ip: Default::default(),
170        };
171
172        new_book.update_metrics(instant_now, chrono_now);
173        new_book
174    }
175
176    /// Construct an [`AddressBook`] with the given `local_listener`, `network`,
177    /// `addr_limit`, [`tracing::Span`], and addresses.
178    ///
179    /// `addr_limit` is enforced by this method, and by [`AddressBook::update`].
180    ///
181    /// If there are multiple [`MetaAddr`]s with the same address,
182    /// an arbitrary address is inserted into the address book,
183    /// and the rest are dropped.
184    ///
185    /// This constructor can be used to break address book invariants,
186    /// so it should only be used in tests.
187    #[cfg(any(test, feature = "proptest-impl"))]
188    pub fn new_with_addrs(
189        local_listener: SocketAddr,
190        network: &Network,
191        max_connections_per_ip: usize,
192        addr_limit: usize,
193        span: Span,
194        addrs: impl IntoIterator<Item = MetaAddr>,
195    ) -> AddressBook {
196        let constructor_span = span.clone();
197        let _guard = constructor_span.enter();
198
199        let instant_now = Instant::now();
200        let chrono_now = Utc::now();
201
202        // The maximum number of addresses should be always greater than 0
203        assert!(addr_limit > 0);
204
205        let mut new_book = AddressBook::new(local_listener, network, max_connections_per_ip, span);
206        new_book.addr_limit = addr_limit;
207
208        let addrs = addrs
209            .into_iter()
210            .map(|mut meta_addr| {
211                meta_addr.addr = canonical_peer_addr(meta_addr.addr);
212                meta_addr
213            })
214            .filter(|meta_addr| meta_addr.address_is_valid_for_outbound(network))
215            .map(|meta_addr| (meta_addr.addr, meta_addr));
216
217        for (socket_addr, meta_addr) in addrs {
218            // overwrite any duplicate addresses
219            new_book.by_addr.insert(socket_addr, meta_addr);
220            // Add the address to `most_recent_by_ip` if it has responded
221            if new_book.should_update_most_recent_by_ip(meta_addr) {
222                new_book
223                    .most_recent_by_ip
224                    .as_mut()
225                    .expect("should be some when should_update_most_recent_by_ip is true")
226                    .insert(socket_addr.ip(), meta_addr);
227            }
228            // exit as soon as we get enough addresses
229            if new_book.by_addr.len() >= addr_limit {
230                break;
231            }
232        }
233
234        new_book.update_metrics(instant_now, chrono_now);
235        new_book
236    }
237
238    /// Return a watch channel for the address book metrics.
239    ///
240    /// The metrics in the watch channel are only updated when the address book updates,
241    /// so they can be significantly outdated if Zebra is disconnected or hung.
242    ///
243    /// The current metrics value is marked as seen.
244    /// So `Receiver::changed` will only return after the next address book update.
245    pub fn address_metrics_watcher(&self) -> watch::Receiver<AddressMetrics> {
246        self.address_metrics_tx.subscribe()
247    }
248
249    /// Set the local listener address. Only for use in tests.
250    #[cfg(any(test, feature = "proptest-impl"))]
251    pub fn set_local_listener(&mut self, addr: SocketAddr) {
252        self.local_listener = addr;
253    }
254
255    /// Get the local listener address.
256    ///
257    /// This address contains minimal state, but it is not sanitized.
258    pub fn local_listener_meta_addr(&self, now: chrono::DateTime<Utc>) -> MetaAddr {
259        let now: DateTime32 = now.try_into().expect("will succeed until 2038");
260
261        MetaAddr::new_local_listener_change(self.local_listener)
262            .local_listener_into_new_meta_addr(now)
263    }
264
265    /// Get the local listener [`SocketAddr`].
266    pub fn local_listener_socket_addr(&self) -> SocketAddr {
267        self.local_listener
268    }
269
270    /// Get the active addresses in `self` in random order with sanitized timestamps,
271    /// including our local listener address.
272    ///
273    /// Limited to the number of peer addresses Zebra should give out per `GetAddr` request.
274    pub fn fresh_get_addr_response(&self) -> Vec<MetaAddr> {
275        let now = Utc::now();
276        let mut peers = self.sanitized(now);
277        let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR);
278        peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit));
279
280        peers
281    }
282
283    /// Get the active addresses in `self` in random order with sanitized timestamps,
284    /// including our local listener address.
285    pub(crate) fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
286        use rand::seq::SliceRandom;
287        let _guard = self.span.enter();
288
289        let mut peers = self.by_addr.clone();
290
291        // Unconditionally add our local listener address to the advertised peers,
292        // to replace any self-connection failures. The address book and change
293        // constructors make sure that the SocketAddr is canonical.
294        let local_listener = self.local_listener_meta_addr(now);
295        peers.insert(local_listener.addr, local_listener);
296
297        // Then sanitize and shuffle
298        let mut peers: Vec<MetaAddr> = peers
299            .descending_values()
300            .filter_map(|meta_addr| meta_addr.sanitize(&self.network))
301            // # Security
302            //
303            // Remove peers that:
304            //   - last responded more than three hours ago, or
305            //   - haven't responded yet but were reported last seen more than three hours ago
306            //
307            // This prevents Zebra from gossiping nodes that are likely unreachable. Gossiping such
308            // nodes impacts the network health, because connection attempts end up being wasted on
309            // peers that are less likely to respond.
310            .filter(|addr| addr.is_active_for_gossip(now))
311            .collect();
312
313        peers.shuffle(&mut rand::thread_rng());
314
315        peers
316    }
317
318    /// Get the active addresses in `self`, in preferred caching order,
319    /// excluding our local listener address.
320    pub fn cacheable(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
321        let _guard = self.span.enter();
322
323        let peers = self.by_addr.clone();
324
325        // Get peers in preferred order, then keep the recently active ones
326        peers
327            .descending_values()
328            // # Security
329            //
330            // Remove peers that:
331            //   - last responded more than three hours ago, or
332            //   - haven't responded yet but were reported last seen more than three hours ago
333            //
334            // This prevents Zebra from caching nodes that are likely unreachable,
335            // which improves startup time and reliability.
336            .filter(|addr| addr.is_active_for_gossip(now))
337            .cloned()
338            .collect()
339    }
340
341    /// Look up `addr` in the address book, and return its [`MetaAddr`].
342    ///
343    /// Converts `addr` to a canonical address before looking it up.
344    pub fn get(&mut self, addr: PeerSocketAddr) -> Option<MetaAddr> {
345        let addr = canonical_peer_addr(*addr);
346
347        // Unfortunately, `OrderedMap` doesn't implement `get`.
348        let meta_addr = self.by_addr.remove(&addr);
349
350        if let Some(meta_addr) = meta_addr {
351            self.by_addr.insert(addr, meta_addr);
352        }
353
354        meta_addr
355    }
356
357    /// Returns true if `updated` needs to be applied to the recent outbound peer connection IP cache.
358    ///
359    /// Checks if there are no existing entries in the address book with this IP,
360    /// or if `updated` has a more recent `last_response` requiring the outbound connector to wait
361    /// longer before initiating handshakes with peers at this IP.
362    ///
363    /// This code only needs to check a single cache entry, rather than the entire address book,
364    /// because other code maintains these invariants:
365    /// - `last_response` times for an entry can only increase.
366    /// - this is the only field checked by `has_connection_recently_responded()`
367    ///
368    /// See [`AddressBook::is_ready_for_connection_attempt_with_ip`] for more details.
369    fn should_update_most_recent_by_ip(&self, updated: MetaAddr) -> bool {
370        let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
371            return false;
372        };
373
374        if let Some(previous) = most_recent_by_ip.get(&updated.addr.ip()) {
375            updated.last_connection_state == PeerAddrState::Responded
376                && updated.last_response() > previous.last_response()
377        } else {
378            updated.last_connection_state == PeerAddrState::Responded
379        }
380    }
381
382    /// Returns true if `addr` is the latest entry for its IP, which is stored in `most_recent_by_ip`.
383    /// The entry is checked for an exact match to the IP and port of `addr`.
384    fn should_remove_most_recent_by_ip(&self, addr: PeerSocketAddr) -> bool {
385        let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
386            return false;
387        };
388
389        if let Some(previous) = most_recent_by_ip.get(&addr.ip()) {
390            previous.addr == addr
391        } else {
392            false
393        }
394    }
395
396    /// Apply `change` to the address book, returning the updated `MetaAddr`,
397    /// if the change was valid.
398    ///
399    /// # Correctness
400    ///
401    /// All changes should go through `update`, so that the address book
402    /// only contains valid outbound addresses.
403    ///
404    /// Change addresses must be canonical `PeerSocketAddr`s. This makes sure that
405    /// each address book entry has a unique IP address.
406    ///
407    /// # Security
408    ///
409    /// This function must apply every attempted, responded, and failed change
410    /// to the address book. This prevents rapid reconnections to the same peer.
411    ///
412    /// As an exception, this function can ignore all changes for specific
413    /// [`PeerSocketAddr`]s. Ignored addresses will never be used to connect to
414    /// peers.
415    #[allow(clippy::unwrap_in_result)]
416    pub fn update(&mut self, change: MetaAddrChange) -> Option<MetaAddr> {
417        if self.bans_by_ip.contains_key(&change.addr().ip()) {
418            tracing::warn!(
419                ?change,
420                "attempted to add a banned peer addr to address book"
421            );
422            return None;
423        }
424
425        let previous = self.get(change.addr());
426
427        let _guard = self.span.enter();
428
429        let instant_now = Instant::now();
430        let chrono_now = Utc::now();
431
432        let updated = change.apply_to_meta_addr(previous, instant_now, chrono_now);
433
434        trace!(
435            ?change,
436            ?updated,
437            ?previous,
438            total_peers = self.by_addr.len(),
439            recent_peers = self.recently_live_peers(chrono_now).len(),
440            "calculated updated address book entry",
441        );
442
443        if let Some(updated) = updated {
444            if updated.misbehavior() >= constants::MAX_PEER_MISBEHAVIOR_SCORE {
445                // Ban and skip outbound connections with excessively misbehaving peers.
446                let banned_ip = updated.addr.ip();
447                let bans_by_ip = Arc::make_mut(&mut self.bans_by_ip);
448
449                bans_by_ip.insert(banned_ip, Instant::now());
450                if bans_by_ip.len() > constants::MAX_BANNED_IPS {
451                    // Remove the oldest banned IP from the address book.
452                    bans_by_ip.shift_remove_index(0);
453                }
454
455                // `most_recent_by_ip` is only populated when
456                // `max_connections_per_ip == 1`. The ban path runs for any
457                // configured value, so we must guard the optional cache rather
458                // than unwrap it.
459                if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
460                    most_recent_by_ip.remove(&banned_ip);
461                }
462
463                let banned_addrs: Vec<_> = self
464                    .by_addr
465                    .descending_keys()
466                    .skip_while(|addr| addr.ip() != banned_ip)
467                    .take_while(|addr| addr.ip() == banned_ip)
468                    .cloned()
469                    .collect();
470
471                for addr in banned_addrs {
472                    self.by_addr.remove(&addr);
473                }
474
475                warn!(
476                    ?updated,
477                    total_peers = self.by_addr.len(),
478                    recent_peers = self.recently_live_peers(chrono_now).len(),
479                    "banned ip and removed banned peer addresses from address book",
480                );
481
482                return None;
483            }
484
485            // Ignore invalid outbound addresses.
486            // (Inbound connections can be monitored via Zebra's metrics.)
487            if !updated.address_is_valid_for_outbound(&self.network) {
488                return None;
489            }
490
491            // Ignore invalid outbound services and other info,
492            // but only if the peer has never been attempted.
493            //
494            // Otherwise, if we got the info directly from the peer,
495            // store it in the address book, so we know not to reconnect.
496            if !updated.last_known_info_is_valid_for_outbound(&self.network)
497                && updated.last_connection_state.is_never_attempted()
498            {
499                return None;
500            }
501
502            self.by_addr.insert(updated.addr, updated);
503
504            // Add the address to `most_recent_by_ip` if it sent the most recent
505            // response Zebra has received from this IP.
506            if self.should_update_most_recent_by_ip(updated) {
507                self.most_recent_by_ip
508                    .as_mut()
509                    .expect("should be some when should_update_most_recent_by_ip is true")
510                    .insert(updated.addr.ip(), updated);
511            }
512
513            debug!(
514                ?change,
515                ?updated,
516                ?previous,
517                total_peers = self.by_addr.len(),
518                recent_peers = self.recently_live_peers(chrono_now).len(),
519                "updated address book entry",
520            );
521
522            // Security: Limit the number of peers in the address book.
523            //
524            // We only delete outdated peers when we have too many peers.
525            // If we deleted them as soon as they became too old,
526            // then other peers could re-insert them into the address book.
527            // And we would start connecting to those outdated peers again,
528            // ignoring the age limit in [`MetaAddr::is_probably_reachable`].
529            while self.by_addr.len() > self.addr_limit {
530                let surplus_peer = self
531                    .peers()
532                    .next_back()
533                    .expect("just checked there is at least one peer");
534
535                self.by_addr.remove(&surplus_peer.addr);
536
537                // Check if this surplus peer's addr matches that in `most_recent_by_ip`
538                // for this the surplus peer's ip to remove it there as well.
539                if self.should_remove_most_recent_by_ip(surplus_peer.addr) {
540                    self.most_recent_by_ip
541                        .as_mut()
542                        .expect("should be some when should_remove_most_recent_by_ip is true")
543                        .remove(&surplus_peer.addr.ip());
544                }
545
546                debug!(
547                    surplus = ?surplus_peer,
548                    ?updated,
549                    total_peers = self.by_addr.len(),
550                    recent_peers = self.recently_live_peers(chrono_now).len(),
551                    "removed surplus address book entry",
552                );
553            }
554
555            assert!(self.len() <= self.addr_limit);
556
557            std::mem::drop(_guard);
558            self.update_metrics(instant_now, chrono_now);
559        }
560
561        updated
562    }
563
564    /// Removes the entry with `addr`, returning it if it exists
565    ///
566    /// # Note
567    ///
568    /// All address removals should go through `take`, so that the address
569    /// book metrics are accurate.
570    #[allow(dead_code)]
571    fn take(&mut self, removed_addr: PeerSocketAddr) -> Option<MetaAddr> {
572        let _guard = self.span.enter();
573
574        let instant_now = Instant::now();
575        let chrono_now = Utc::now();
576
577        trace!(
578            ?removed_addr,
579            total_peers = self.by_addr.len(),
580            recent_peers = self.recently_live_peers(chrono_now).len(),
581        );
582
583        if let Some(entry) = self.by_addr.remove(&removed_addr) {
584            // Check if this surplus peer's addr matches that in `most_recent_by_ip`
585            // for this the surplus peer's ip to remove it there as well.
586            if self.should_remove_most_recent_by_ip(entry.addr) {
587                if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
588                    most_recent_by_ip.remove(&entry.addr.ip());
589                }
590            }
591
592            std::mem::drop(_guard);
593            self.update_metrics(instant_now, chrono_now);
594            Some(entry)
595        } else {
596            None
597        }
598    }
599
600    /// Returns true if the given [`PeerSocketAddr`] is pending a reconnection
601    /// attempt.
602    pub fn pending_reconnection_addr(&mut self, addr: PeerSocketAddr) -> bool {
603        let meta_addr = self.get(addr);
604
605        let _guard = self.span.enter();
606        match meta_addr {
607            None => false,
608            Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending,
609        }
610    }
611
612    /// Return an iterator over all peers.
613    ///
614    /// Returns peers in reconnection attempt order, including recently connected peers.
615    pub fn peers(&'_ self) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
616        let _guard = self.span.enter();
617        self.by_addr.descending_values().cloned()
618    }
619
620    /// Is this IP ready for a new outbound connection attempt?
621    /// Checks if the outbound connection with the most recent response at this IP has recently responded.
622    ///
623    /// Note: last_response times may remain live for a long time if the local clock is changed to an earlier time.
624    fn is_ready_for_connection_attempt_with_ip(
625        &self,
626        ip: &IpAddr,
627        chrono_now: chrono::DateTime<Utc>,
628    ) -> bool {
629        let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
630            // if we're not checking IPs, any connection is allowed
631            return true;
632        };
633        let Some(same_ip_peer) = most_recent_by_ip.get(ip) else {
634            // If there's no entry for this IP, any connection is allowed
635            return true;
636        };
637        !same_ip_peer.has_connection_recently_responded(chrono_now)
638    }
639
640    /// Return an iterator over peers that are due for a reconnection attempt,
641    /// in reconnection attempt order.
642    pub fn reconnection_peers(
643        &'_ self,
644        instant_now: Instant,
645        chrono_now: chrono::DateTime<Utc>,
646    ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
647        let _guard = self.span.enter();
648
649        // Skip live peers, and peers pending a reconnect attempt.
650        // The peers are already stored in sorted order.
651        self.by_addr
652            .descending_values()
653            .filter(move |peer| {
654                peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
655                    && self.is_ready_for_connection_attempt_with_ip(&peer.addr.ip(), chrono_now)
656            })
657            .cloned()
658    }
659
660    /// Return an iterator over all the peers in `state`,
661    /// in reconnection attempt order, including recently connected peers.
662    pub fn state_peers(
663        &'_ self,
664        state: PeerAddrState,
665    ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
666        let _guard = self.span.enter();
667
668        self.by_addr
669            .descending_values()
670            .filter(move |peer| peer.last_connection_state == state)
671            .cloned()
672    }
673
674    /// Return an iterator over peers that might be connected,
675    /// in reconnection attempt order.
676    pub fn maybe_connected_peers(
677        &'_ self,
678        instant_now: Instant,
679        chrono_now: chrono::DateTime<Utc>,
680    ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
681        let _guard = self.span.enter();
682
683        self.by_addr
684            .descending_values()
685            .filter(move |peer| {
686                !peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
687            })
688            .cloned()
689    }
690
691    /// Returns banned IP addresses.
692    pub fn bans(&self) -> Arc<IndexMap<IpAddr, Instant>> {
693        self.bans_by_ip.clone()
694    }
695
696    /// Returns the number of entries in this address book.
697    pub fn len(&self) -> usize {
698        self.by_addr.len()
699    }
700
701    /// Returns metrics for the addresses in this address book.
702    /// Only for use in tests.
703    ///
704    /// # Correctness
705    ///
706    /// Use [`AddressBook::address_metrics_watcher`] in production code,
707    /// to avoid deadlocks.
708    #[cfg(test)]
709    pub fn address_metrics(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
710        self.address_metrics_internal(now)
711    }
712
713    /// Returns metrics for the addresses in this address book.
714    ///
715    /// # Correctness
716    ///
717    /// External callers should use [`AddressBook::address_metrics_watcher`]
718    /// in production code, to avoid deadlocks.
719    /// (Using the watch channel receiver does not lock the address book mutex.)
720    fn address_metrics_internal(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
721        let responded = self.state_peers(PeerAddrState::Responded).count();
722        let never_attempted_gossiped = self
723            .state_peers(PeerAddrState::NeverAttemptedGossiped)
724            .count();
725        let failed = self.state_peers(PeerAddrState::Failed).count();
726        let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count();
727
728        let recently_live = self.recently_live_peers(now).len();
729        let recently_stopped_responding = responded
730            .checked_sub(recently_live)
731            .expect("all recently live peers must have responded");
732
733        let num_addresses = self.len();
734
735        AddressMetrics {
736            responded,
737            never_attempted_gossiped,
738            failed,
739            attempt_pending,
740            recently_live,
741            recently_stopped_responding,
742            num_addresses,
743            address_limit: self.addr_limit,
744        }
745    }
746
747    /// Update the metrics for this address book.
748    fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime<Utc>) {
749        let _guard = self.span.enter();
750
751        let m = self.address_metrics_internal(chrono_now);
752
753        // Ignore errors: we don't care if any receivers are listening.
754        let _ = self.address_metrics_tx.send(m);
755
756        // TODO: rename to address_book.[state_name]
757        metrics::gauge!("candidate_set.responded").set(m.responded as f64);
758        metrics::gauge!("candidate_set.gossiped").set(m.never_attempted_gossiped as f64);
759        metrics::gauge!("candidate_set.failed").set(m.failed as f64);
760        metrics::gauge!("candidate_set.pending").set(m.attempt_pending as f64);
761
762        // TODO: rename to address_book.responded.recently_live
763        metrics::gauge!("candidate_set.recently_live").set(m.recently_live as f64);
764        // TODO: rename to address_book.responded.stopped_responding
765        metrics::gauge!("candidate_set.disconnected").set(m.recently_stopped_responding as f64);
766
767        std::mem::drop(_guard);
768        self.log_metrics(&m, instant_now);
769    }
770
771    /// Log metrics for this address book
772    fn log_metrics(&mut self, m: &AddressMetrics, now: Instant) {
773        let _guard = self.span.enter();
774
775        trace!(
776            address_metrics = ?m,
777        );
778
779        if m.responded > 0 {
780            return;
781        }
782
783        // These logs are designed to be human-readable in a terminal, at the
784        // default Zebra log level. If you need to know address states for
785        // every request, use the trace-level logs, or the metrics exporter.
786        if let Some(last_address_log) = self.last_address_log {
787            // Avoid duplicate address logs
788            if now.saturating_duration_since(last_address_log).as_secs() < 60 {
789                return;
790            }
791        } else {
792            // Suppress initial logs until the peer set has started up.
793            // There can be multiple address changes before the first peer has
794            // responded.
795            self.last_address_log = Some(now);
796            return;
797        }
798
799        self.last_address_log = Some(now);
800        // if all peers have failed
801        if m.responded + m.attempt_pending + m.never_attempted_gossiped == 0 {
802            warn!(
803                address_metrics = ?m,
804                "all peer addresses have failed. Hint: check your network connection"
805            );
806        } else {
807            info!(
808                address_metrics = ?m,
809                "no active peer connections: trying gossiped addresses"
810            );
811        }
812    }
813}
814
815impl AddressBookPeers for AddressBook {
816    fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
817        let _guard = self.span.enter();
818
819        self.by_addr
820            .descending_values()
821            .filter(|peer| peer.was_recently_live(now))
822            .cloned()
823            .collect()
824    }
825
826    fn add_peer(&mut self, peer: PeerSocketAddr) -> bool {
827        if self.get(peer).is_some() {
828            // Peer already exists in the address book, so we don't need to add it again.
829            return false;
830        }
831        self.update(MetaAddr::new_initial_peer(peer)).is_some()
832    }
833}
834
835impl AddressBookPeers for Arc<Mutex<AddressBook>> {
836    fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
837        self.lock()
838            .expect("panic in a previous thread that was holding the mutex")
839            .recently_live_peers(now)
840    }
841
842    fn add_peer(&mut self, peer: PeerSocketAddr) -> bool {
843        self.lock()
844            .expect("panic in a previous thread that was holding the mutex")
845            .add_peer(peer)
846    }
847}
848
849impl Extend<MetaAddrChange> for AddressBook {
850    fn extend<T>(&mut self, iter: T)
851    where
852        T: IntoIterator<Item = MetaAddrChange>,
853    {
854        for change in iter.into_iter() {
855            self.update(change);
856        }
857    }
858}
859
860impl Clone for AddressBook {
861    /// Clone the addresses, address limit, local listener address, and span.
862    ///
863    /// Cloned address books have a separate metrics struct watch channel, and an empty last address log.
864    ///
865    /// All address books update the same prometheus metrics.
866    fn clone(&self) -> AddressBook {
867        // The existing metrics might be outdated, but we avoid calling `update_metrics`,
868        // so we don't overwrite the prometheus metrics from the main address book.
869        let (address_metrics_tx, _address_metrics_rx) =
870            watch::channel(*self.address_metrics_tx.borrow());
871
872        AddressBook {
873            by_addr: self.by_addr.clone(),
874            local_listener: self.local_listener,
875            network: self.network.clone(),
876            addr_limit: self.addr_limit,
877            span: self.span.clone(),
878            address_metrics_tx,
879            last_address_log: None,
880            most_recent_by_ip: self.most_recent_by_ip.clone(),
881            bans_by_ip: self.bans_by_ip.clone(),
882        }
883    }
884}