zebra_network/peer/
handshake.rs

1//! Initial [`Handshake`]s with Zebra peers over a `PeerTransport`.
2
3use std::{
4    cmp::min,
5    fmt,
6    future::Future,
7    net::{Ipv4Addr, SocketAddr},
8    panic,
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12    time::Duration,
13};
14
15use chrono::{TimeZone, Utc};
16use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
17use indexmap::IndexSet;
18use tokio::{
19    io::{AsyncRead, AsyncWrite},
20    sync::broadcast,
21    task::JoinError,
22    time::{error, timeout, Instant},
23};
24use tokio_stream::wrappers::IntervalStream;
25use tokio_util::codec::Framed;
26use tower::Service;
27use tracing::{span, Level, Span};
28use tracing_futures::Instrument;
29
30use zebra_chain::{
31    chain_tip::{ChainTip, NoChainTip},
32    parameters::Network,
33    serialization::{DateTime32, SerializationError},
34};
35
36use crate::{
37    constants,
38    meta_addr::MetaAddrChange,
39    peer::{
40        CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError,
41        MinimumPeerVersion, PeerError,
42    },
43    peer_set::{ConnectionTracker, InventoryChange},
44    protocol::{
45        external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
46        internal::{Request, Response},
47    },
48    types::MetaAddr,
49    BoxError, Config, PeerSocketAddr, VersionMessage,
50};
51
52#[cfg(test)]
53mod tests;
54
/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
pub struct Handshake<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    // The network and connection config used during handshakes
    // (network, listen/external addresses, connection limits).
    config: Config,
    // The BIP14 user agent string sent in our `Version` messages.
    user_agent: String,
    // The services we advertise to remote peers in our `Version` messages.
    our_services: PeerServices,
    // Whether we ask remote peers to relay transactions to us.
    relay: bool,

    // The service that handles requests from remote peers.
    inbound_service: S,
    // Sends address book updates for peers we handshake with.
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    // Registers inventory advertisements from connected peers.
    inv_collector: broadcast::Sender<InventoryChange>,
    // Calculates the minimum accepted peer protocol version,
    // based on the current chain tip.
    minimum_peer_version: MinimumPeerVersion<C>,
    // The `Version` nonces of in-flight (and some recent) handshakes,
    // used to detect and reject self-connections.
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,

    // The tracing span that handshake tasks are nested under.
    parent_span: Span,
}
82
83impl<S, C> fmt::Debug for Handshake<S, C>
84where
85    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
86    S::Future: Send,
87    C: ChainTip + Clone + Send + 'static,
88{
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        // skip the channels, they don't tell us anything useful
91        f.debug_struct(std::any::type_name::<Handshake<S, C>>())
92            .field("config", &self.config)
93            .field("user_agent", &self.user_agent)
94            .field("our_services", &self.our_services)
95            .field("relay", &self.relay)
96            .field("minimum_peer_version", &self.minimum_peer_version)
97            .field("parent_span", &self.parent_span)
98            .finish()
99    }
100}
101
102impl<S, C> Clone for Handshake<S, C>
103where
104    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
105    S::Future: Send,
106    C: ChainTip + Clone + Send + 'static,
107{
108    fn clone(&self) -> Self {
109        Self {
110            config: self.config.clone(),
111            user_agent: self.user_agent.clone(),
112            our_services: self.our_services,
113            relay: self.relay,
114            inbound_service: self.inbound_service.clone(),
115            address_book_updater: self.address_book_updater.clone(),
116            inv_collector: self.inv_collector.clone(),
117            minimum_peer_version: self.minimum_peer_version.clone(),
118            nonces: self.nonces.clone(),
119            parent_span: self.parent_span.clone(),
120        }
121    }
122}
123
/// The metadata for a peer connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// The connected peer address, if known.
    /// This address might not be valid for outbound connections.
    ///
    /// Peers can be connected via a transient inbound or proxy address,
    /// which will appear as the connected address to the OS and Zebra.
    pub connected_addr: ConnectedAddr,

    /// The network protocol [`VersionMessage`] sent by the remote peer.
    pub remote: VersionMessage,

    /// The network protocol version negotiated with the remote peer.
    ///
    /// Derived from `remote.version` and the
    /// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION):
    /// it is the minimum of those two versions.
    pub negotiated_version: Version,
}
143
/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
/// give us enough information to reconnect to that peer.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ConnectedAddr {
    /// The address we used to make a direct outbound connection.
    ///
    /// In an honest network, a Zcash peer is listening on this exact address
    /// and port.
    OutboundDirect {
        /// The connected outbound remote address and port.
        addr: PeerSocketAddr,
    },

    /// The address we received from the OS, when a remote peer directly
    /// connected to our Zcash listener port.
    ///
    /// In an honest network, a Zcash peer might be listening on this address,
    /// if its outbound address is the same as its listener address. But the port
    /// is an ephemeral outbound TCP port, not a listener port.
    InboundDirect {
        /// The connected inbound remote address and ephemeral port.
        ///
        /// The IP address might be the address of a Zcash peer, but the port is an ephemeral port.
        addr: PeerSocketAddr,
    },

    /// The proxy address we used to make an outbound connection.
    ///
    /// The proxy address can be used by many connections, but our own ephemeral
    /// outbound address and port can be used as an identifier for the duration
    /// of this connection.
    OutboundProxy {
        /// The remote address and port of the proxy.
        proxy_addr: SocketAddr,

        /// The local address and transient port we used to connect to the proxy.
        transient_local_addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer connected via an
    /// inbound proxy.
    ///
    /// The proxy's ephemeral outbound address can be used as an identifier for
    /// the duration of this connection.
    InboundProxy {
        /// The local address and transient port we used to connect to the proxy.
        transient_addr: SocketAddr,
    },

    /// An isolated connection, where we deliberately don't have any connection metadata.
    ///
    /// Isolated connections must not change Zebra's peer set or address book state.
    Isolated,
    //
    // TODO: handle Tor onion addresses
}
200
201/// Get an unspecified IPv4 address for `network`
202pub fn get_unspecified_ipv4_addr(network: Network) -> SocketAddr {
203    (Ipv4Addr::UNSPECIFIED, network.default_port()).into()
204}
205
206use ConnectedAddr::*;
207
208impl ConnectedAddr {
209    /// Returns a new outbound directly connected addr.
210    pub fn new_outbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
211        OutboundDirect { addr }
212    }
213
214    /// Returns a new inbound directly connected addr.
215    pub fn new_inbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
216        InboundDirect { addr }
217    }
218
219    /// Returns a new outbound connected addr via `proxy`.
220    ///
221    /// `local_addr` is the ephemeral local address of the connection.
222    #[allow(unused)]
223    pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr {
224        OutboundProxy {
225            proxy_addr: proxy,
226            transient_local_addr: local_addr,
227        }
228    }
229
230    /// Returns a new inbound connected addr from `proxy`.
231    //
232    // TODO: distinguish between direct listeners and proxy listeners in the
233    //       rest of zebra-network
234    #[allow(unused)]
235    pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr {
236        InboundProxy {
237            transient_addr: proxy,
238        }
239    }
240
241    /// Returns a new isolated connected addr, with no metadata.
242    pub fn new_isolated() -> ConnectedAddr {
243        Isolated
244    }
245
246    /// Returns a `PeerSocketAddr` that can be used to track this connection in the
247    /// `AddressBook`.
248    ///
249    /// `None` for inbound connections, proxy connections, and isolated
250    /// connections.
251    ///
252    /// # Correctness
253    ///
254    /// This address can be used for reconnection attempts, or as a permanent
255    /// identifier.
256    ///
257    /// # Security
258    ///
259    /// This address must not depend on the canonical address from the `Version`
260    /// message. Otherwise, malicious peers could interfere with other peers
261    /// `AddressBook` state.
262    ///
263    /// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
264    pub fn get_address_book_addr(&self) -> Option<PeerSocketAddr> {
265        match self {
266            OutboundDirect { addr } | InboundDirect { addr } => Some(*addr),
267            // TODO: consider using the canonical address of the peer to track
268            //       outbound proxy connections
269            OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
270        }
271    }
272
273    /// Returns a `PeerSocketAddr` that can be used to temporarily identify a
274    /// connection.
275    ///
276    /// Isolated connections must not change Zebra's peer set or address book
277    /// state, so they do not have an identifier.
278    ///
279    /// # Correctness
280    ///
281    /// The returned address is only valid while the original connection is
282    /// open. It must not be used in the `AddressBook`, for outbound connection
283    /// attempts, or as a permanent identifier.
284    ///
285    /// # Security
286    ///
287    /// This address must not depend on the canonical address from the `Version`
288    /// message. Otherwise, malicious peers could interfere with other peers'
289    /// `PeerSet` state.
290    pub fn get_transient_addr(&self) -> Option<PeerSocketAddr> {
291        match self {
292            OutboundDirect { addr } => Some(*addr),
293            InboundDirect { addr } => Some(*addr),
294            OutboundProxy {
295                transient_local_addr,
296                ..
297            } => Some(PeerSocketAddr::from(*transient_local_addr)),
298            InboundProxy { transient_addr } => Some(PeerSocketAddr::from(*transient_addr)),
299            Isolated => None,
300        }
301    }
302
303    /// Returns the metrics label for this connection's address.
304    pub fn get_transient_addr_label(&self) -> String {
305        self.get_transient_addr()
306            .map_or_else(|| "isolated".to_string(), |addr| addr.to_string())
307    }
308
309    /// Returns a short label for the kind of connection.
310    pub fn get_short_kind_label(&self) -> &'static str {
311        match self {
312            OutboundDirect { .. } => "Out",
313            InboundDirect { .. } => "In",
314            OutboundProxy { .. } => "ProxOut",
315            InboundProxy { .. } => "ProxIn",
316            Isolated => "Isol",
317        }
318    }
319
320    /// Returns a list of alternate remote peer addresses, which can be used for
321    /// reconnection attempts.
322    ///
323    /// Uses the connected address, and the remote canonical address.
324    ///
325    /// Skips duplicates. If this is an outbound connection, also skips the
326    /// remote address that we're currently connected to.
327    pub fn get_alternate_addrs(
328        &self,
329        mut canonical_remote: PeerSocketAddr,
330    ) -> impl Iterator<Item = PeerSocketAddr> {
331        let addrs = match self {
332            OutboundDirect { addr } => {
333                // Fixup unspecified addresses and ports using known good data
334                if canonical_remote.ip().is_unspecified() {
335                    canonical_remote.set_ip(addr.ip());
336                }
337                if canonical_remote.port() == 0 {
338                    canonical_remote.set_port(addr.port());
339                }
340
341                // Try the canonical remote address, if it is different from the
342                // outbound address (which we already have in our address book)
343                if &canonical_remote != addr {
344                    vec![canonical_remote]
345                } else {
346                    // we didn't learn a new address from the handshake:
347                    // it's the same as the outbound address, which is already in our address book
348                    Vec::new()
349                }
350            }
351
352            InboundDirect { addr } => {
353                // Use the IP from the TCP connection, and the port the peer told us
354                let maybe_addr = SocketAddr::new(addr.ip(), canonical_remote.port()).into();
355
356                // Try both addresses, but remove one duplicate if they match
357                if canonical_remote != maybe_addr {
358                    vec![canonical_remote, maybe_addr]
359                } else {
360                    vec![canonical_remote]
361                }
362            }
363
364            // Proxy addresses can't be used for reconnection attempts, but we
365            // can try the canonical remote address
366            OutboundProxy { .. } | InboundProxy { .. } => vec![canonical_remote],
367
368            // Hide all metadata for isolated connections
369            Isolated => Vec::new(),
370        };
371
372        addrs.into_iter()
373    }
374
375    /// Returns true if the [`ConnectedAddr`] was created for an inbound connection.
376    pub fn is_inbound(&self) -> bool {
377        matches!(self, InboundDirect { .. } | InboundProxy { .. })
378    }
379}
380
381impl fmt::Debug for ConnectedAddr {
382    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383        let kind = self.get_short_kind_label();
384        let addr = self.get_transient_addr_label();
385
386        if matches!(self, Isolated) {
387            f.write_str(kind)
388        } else {
389            f.debug_tuple(kind).field(&addr).finish()
390        }
391    }
392}
393
/// A builder for `Handshake`.
pub struct Builder<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    // Mandatory: `finish` returns an error if this is unset.
    config: Option<Config>,
    // Optional: defaults to no advertised services.
    our_services: Option<PeerServices>,
    // Optional: defaults to the empty string.
    user_agent: Option<String>,
    // Optional: defaults to not requesting transaction relay.
    relay: Option<bool>,

    // Mandatory: `finish` returns an error if this is unset.
    inbound_service: Option<S>,
    // Optional: defaults to a stub channel whose receiver is dropped.
    address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
    // Optional: defaults to a stub channel whose receiver is dropped.
    inv_collector: Option<broadcast::Sender<InventoryChange>>,
    // The chain tip used to calculate the minimum peer protocol version.
    latest_chain_tip: C,
}
411
412impl<S, C> Builder<S, C>
413where
414    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
415    S::Future: Send,
416    C: ChainTip + Clone + Send + 'static,
417{
418    /// Provide a config.  Mandatory.
419    pub fn with_config(mut self, config: Config) -> Self {
420        self.config = Some(config);
421        self
422    }
423
424    /// Provide a service to handle inbound requests. Mandatory.
425    pub fn with_inbound_service(mut self, inbound_service: S) -> Self {
426        self.inbound_service = Some(inbound_service);
427        self
428    }
429
430    /// Provide a channel for registering inventory advertisements. Optional.
431    ///
432    /// This channel takes transient remote addresses, which the `PeerSet` uses
433    /// to look up peers that have specific inventory.
434    pub fn with_inventory_collector(
435        mut self,
436        inv_collector: broadcast::Sender<InventoryChange>,
437    ) -> Self {
438        self.inv_collector = Some(inv_collector);
439        self
440    }
441
442    /// Provide a hook for timestamp collection. Optional.
443    ///
444    /// This channel takes `MetaAddr`s, permanent addresses which can be used to
445    /// make outbound connections to peers.
446    pub fn with_address_book_updater(
447        mut self,
448        address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
449    ) -> Self {
450        self.address_book_updater = Some(address_book_updater);
451        self
452    }
453
454    /// Provide the services this node advertises to other peers.  Optional.
455    ///
456    /// If this is unset, the node will advertise itself as a client.
457    pub fn with_advertised_services(mut self, services: PeerServices) -> Self {
458        self.our_services = Some(services);
459        self
460    }
461
462    /// Provide this node's user agent.  Optional.
463    ///
464    /// This must be a valid BIP14 string.  If it is unset, the user-agent will be empty.
465    pub fn with_user_agent(mut self, user_agent: String) -> Self {
466        self.user_agent = Some(user_agent);
467        self
468    }
469
470    /// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
471    ///
472    /// If this is unset, the minimum accepted protocol version for peer connections is kept
473    /// constant over network upgrade activations.
474    ///
475    /// Use [`NoChainTip`] to explicitly provide no chain tip.
476    pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC>
477    where
478        NewC: ChainTip + Clone + Send + 'static,
479    {
480        Builder {
481            latest_chain_tip,
482
483            // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
484            config: self.config,
485            inbound_service: self.inbound_service,
486            address_book_updater: self.address_book_updater,
487            our_services: self.our_services,
488            user_agent: self.user_agent,
489            relay: self.relay,
490            inv_collector: self.inv_collector,
491        }
492    }
493
494    /// Whether to request that peers relay transactions to our node.  Optional.
495    ///
496    /// If this is unset, the node will not request transactions.
497    pub fn want_transactions(mut self, relay: bool) -> Self {
498        self.relay = Some(relay);
499        self
500    }
501
502    /// Consume this builder and produce a [`Handshake`].
503    ///
504    /// Returns an error only if any mandatory field was unset.
505    pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
506        let config = self.config.ok_or("did not specify config")?;
507        let inbound_service = self
508            .inbound_service
509            .ok_or("did not specify inbound service")?;
510        let inv_collector = self.inv_collector.unwrap_or_else(|| {
511            let (tx, _) = broadcast::channel(100);
512            tx
513        });
514        let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
515            // No `AddressBookUpdater` for timestamp collection was passed, so create a stub
516            // channel. Dropping the receiver means sends will fail, but we don't care.
517            let (tx, _rx) = tokio::sync::mpsc::channel(1);
518            tx
519        });
520        let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new()));
521        let user_agent = self.user_agent.unwrap_or_default();
522        let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
523        let relay = self.relay.unwrap_or(false);
524        let network = config.network.clone();
525        let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, &network);
526
527        Ok(Handshake {
528            config,
529            user_agent,
530            our_services,
531            relay,
532            inbound_service,
533            address_book_updater,
534            inv_collector,
535            minimum_peer_version,
536            nonces,
537            parent_span: Span::current(),
538        })
539    }
540}
541
542impl<S> Handshake<S, NoChainTip>
543where
544    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
545    S::Future: Send,
546{
547    /// Create a builder that configures a [`Handshake`] service.
548    pub fn builder() -> Builder<S, NoChainTip> {
549        // We don't derive `Default` because the derive inserts a `where S:
550        // Default` bound even though `Option<S>` implements `Default` even if
551        // `S` does not.
552        Builder {
553            config: None,
554            our_services: None,
555            user_agent: None,
556            relay: None,
557            inbound_service: None,
558            address_book_updater: None,
559            inv_collector: None,
560            latest_chain_tip: NoChainTip,
561        }
562    }
563}
564
565/// Negotiate the Zcash network protocol version with the remote peer at `connected_addr`, using
566/// the connection `peer_conn`.
567///
568/// We split `Handshake` into its components before calling this function, to avoid infectious
569/// `Sync` bounds on the returned future.
570///
571/// Returns the [`VersionMessage`] sent by the remote peer, and the [`Version`] negotiated with the
572/// remote peer, inside a [`ConnectionInfo`] struct.
573#[allow(clippy::too_many_arguments)]
574pub async fn negotiate_version<PeerTransport>(
575    peer_conn: &mut Framed<PeerTransport, Codec>,
576    connected_addr: &ConnectedAddr,
577    config: Config,
578    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
579    user_agent: String,
580    our_services: PeerServices,
581    relay: bool,
582    mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
583) -> Result<Arc<ConnectionInfo>, HandshakeError>
584where
585    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
586{
587    // Create a random nonce for this connection
588    let local_nonce = Nonce::default();
589
590    // Insert the nonce for this handshake into the shared nonce set.
591    // Each connection has its own connection state, and handshakes execute concurrently.
592    //
593    // # Correctness
594    //
595    // It is ok to wait for the lock here, because handshakes have a short
596    // timeout, and the async mutex will be released when the task times
597    // out.
598    {
599        let mut locked_nonces = nonces.lock().await;
600
601        // Duplicate nonces are very rare, because they require a 64-bit random number collision,
602        // and the nonce set is limited to a few hundred entries.
603        let is_unique_nonce = locked_nonces.insert(local_nonce);
604        if !is_unique_nonce {
605            return Err(HandshakeError::LocalDuplicateNonce);
606        }
607
608        // # Security
609        //
610        // Limit the amount of memory used for nonces.
611        // Nonces can be left in the set if the connection fails or times out between
612        // the nonce being inserted, and it being removed.
613        //
614        // Zebra has strict connection limits, so we limit the number of nonces to
615        // the configured connection limit.
616        // This is a tradeoff between:
617        // - avoiding memory denial of service attacks which make large numbers of connections,
618        //   for example, 100 failed inbound connections takes 1 second.
619        // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces
620        // - collision probability: two hundred 64-bit nonces have a very low collision probability
621        //   <https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)>
622        while locked_nonces.len() > config.peerset_total_connection_limit() {
623            locked_nonces.shift_remove_index(0);
624        }
625
626        std::mem::drop(locked_nonces);
627    }
628
629    // Don't leak our exact clock skew to our peers. On the other hand,
630    // we can't deviate too much, or zcashd will get confused.
631    // Inspection of the zcashd source code reveals that the timestamp
632    // is only ever used at the end of parsing the version message, in
633    //
634    // pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
635    //
636    // AddTimeData is defined in src/timedata.cpp and is a no-op as long
637    // as the difference between the specified timestamp and the
638    // zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set
639    // to 10 * 60 seconds (10 minutes).
640    //
641    // nTimeOffset is peer metadata that is never used, except for
642    // statistics.
643    //
644    // To try to stay within the range where zcashd will ignore our clock skew,
645    // truncate the timestamp to the nearest 5 minutes.
646    let now = Utc::now().timestamp();
647    let timestamp = Utc
648        .timestamp_opt(now - now.rem_euclid(5 * 60), 0)
649        .single()
650        .expect("in-range number of seconds and valid nanosecond");
651
652    let (their_addr, our_services, our_listen_addr) = match connected_addr {
653        // Version messages require an address, so we use
654        // an unspecified address for Isolated connections
655        Isolated => {
656            let unspec_ipv4 = get_unspecified_ipv4_addr(config.network);
657            (unspec_ipv4.into(), PeerServices::empty(), unspec_ipv4)
658        }
659        _ => {
660            let their_addr = connected_addr
661                .get_transient_addr()
662                .expect("non-Isolated connections have a remote addr");
663
664            // Include the configured external address in our version message, if any, otherwise, include our listen address.
665            let advertise_addr = match config.external_addr {
666                Some(external_addr) => {
667                    info!(?their_addr, ?config.listen_addr, "using external address for Version messages");
668                    external_addr
669                }
670                None => config.listen_addr,
671            };
672
673            (their_addr, our_services, advertise_addr)
674        }
675    };
676
677    let our_version = VersionMessage {
678        version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
679        services: our_services,
680        timestamp,
681        address_recv: AddrInVersion::new(their_addr, PeerServices::NODE_NETWORK),
682        // TODO: detect external address (#1893)
683        address_from: AddrInVersion::new(our_listen_addr, our_services),
684        nonce: local_nonce,
685        user_agent: user_agent.clone(),
686        start_height: minimum_peer_version.chain_tip_height(),
687        relay,
688    }
689    .into();
690
691    debug!(?our_version, "sending initial version message");
692    peer_conn.send(our_version).await?;
693
694    let mut remote_msg = peer_conn
695        .next()
696        .await
697        .ok_or(HandshakeError::ConnectionClosed)??;
698
699    // Wait for next message if the one we got is not Version
700    let remote: VersionMessage = loop {
701        match remote_msg {
702            Message::Version(version_message) => {
703                debug!(?version_message, "got version message from remote peer");
704                break version_message;
705            }
706            _ => {
707                remote_msg = peer_conn
708                    .next()
709                    .await
710                    .ok_or(HandshakeError::ConnectionClosed)??;
711                debug!(?remote_msg, "ignoring non-version message from remote peer");
712            }
713        }
714    };
715
716    let remote_address_services = remote.address_from.untrusted_services();
717    if remote_address_services != remote.services {
718        info!(
719            ?remote.services,
720            ?remote_address_services,
721            ?remote.user_agent,
722            "peer with inconsistent version services and version address services",
723        );
724    }
725
726    // Check for nonce reuse, indicating self-connection
727    //
728    // # Correctness
729    //
730    // We must wait for the lock before we continue with the connection, to avoid
731    // self-connection. If the connection times out, the async lock will be
732    // released.
733    //
734    // # Security
735    //
736    // We don't remove the nonce here, because peers that observe our network traffic could
737    // maliciously remove nonces, and force us to make self-connections.
738    let nonce_reuse = nonces.lock().await.contains(&remote.nonce);
739    if nonce_reuse {
740        info!(?connected_addr, "rejecting self-connection attempt");
741        Err(HandshakeError::RemoteNonceReuse)?;
742    }
743
744    // # Security
745    //
746    // Reject connections to peers on old versions, because they might not know about all
747    // network upgrades and could lead to chain forks or slower block propagation.
748    let min_version = minimum_peer_version.current();
749
750    if remote.version < min_version {
751        debug!(
752            remote_ip = ?their_addr,
753            ?remote.version,
754            ?min_version,
755            ?remote.user_agent,
756            "disconnecting from peer with obsolete network protocol version",
757        );
758
759        // the value is the number of rejected handshakes, by peer IP and protocol version
760        metrics::counter!(
761            "zcash.net.peers.obsolete",
762            "remote_ip" => their_addr.to_string(),
763            "remote_version" => remote.version.to_string(),
764            "min_version" => min_version.to_string(),
765            "user_agent" => remote.user_agent.clone(),
766        )
767        .increment(1);
768
769        // the value is the remote version of the most recent rejected handshake from each peer
770        metrics::gauge!(
771            "zcash.net.peers.version.obsolete",
772            "remote_ip" => their_addr.to_string(),
773        )
774        .set(remote.version.0 as f64);
775
776        // Disconnect if peer is using an obsolete version.
777        return Err(HandshakeError::ObsoleteVersion(remote.version));
778    }
779
780    let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version);
781
782    // Limit containing struct size, and avoid multiple duplicates of 300+ bytes of data.
783    let connection_info = Arc::new(ConnectionInfo {
784        connected_addr: *connected_addr,
785        remote,
786        negotiated_version,
787    });
788
789    debug!(
790        remote_ip = ?their_addr,
791        ?connection_info.remote.version,
792        ?negotiated_version,
793        ?min_version,
794        ?connection_info.remote.user_agent,
795        "negotiated network protocol version with peer",
796    );
797
798    // the value is the number of connected handshakes, by peer IP and protocol version
799    metrics::counter!(
800        "zcash.net.peers.connected",
801        "remote_ip" => their_addr.to_string(),
802        "remote_version" => connection_info.remote.version.to_string(),
803        "negotiated_version" => negotiated_version.to_string(),
804        "min_version" => min_version.to_string(),
805        "user_agent" => connection_info.remote.user_agent.clone(),
806    )
807    .increment(1);
808
809    // the value is the remote version of the most recent connected handshake from each peer
810    metrics::gauge!(
811        "zcash.net.peers.version.connected",
812        "remote_ip" => their_addr.to_string(),
813    )
814    .set(connection_info.remote.version.0 as f64);
815
816    peer_conn.send(Message::Verack).await?;
817
818    let mut remote_msg = peer_conn
819        .next()
820        .await
821        .ok_or(HandshakeError::ConnectionClosed)??;
822
823    // Wait for next message if the one we got is not Verack
824    loop {
825        match remote_msg {
826            Message::Verack => {
827                debug!(?remote_msg, "got verack message from remote peer");
828                break;
829            }
830            _ => {
831                remote_msg = peer_conn
832                    .next()
833                    .await
834                    .ok_or(HandshakeError::ConnectionClosed)??;
835                debug!(?remote_msg, "ignoring non-verack message from remote peer");
836            }
837        }
838    }
839
840    Ok(connection_info)
841}
842
/// A handshake request.
/// Contains the information needed to handshake with the peer.
///
/// Passed to [`Handshake::call`] to start a handshake over an established
/// transport stream.
pub struct HandshakeRequest<PeerTransport>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// The tokio [`TcpStream`](tokio::net::TcpStream) or Tor
    /// `arti_client::DataStream` to the peer.
    // Use [`arti_client::DataStream`] when #5492 is done.
    pub data_stream: PeerTransport,

    /// The address of the peer, and other related information.
    pub connected_addr: ConnectedAddr,

    /// A connection tracker that reduces the open connection count when dropped.
    ///
    /// Used to limit the number of open connections in Zebra.
    pub connection_tracker: ConnectionTracker,
}
862
impl<S, PeerTransport, C> Service<HandshakeRequest<PeerTransport>> for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    type Response = Client;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    // The handshake service is always ready, because each `call` spawns its own
    // task: there is no per-service state that could make it pending.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Performs a complete handshake with the peer in `req`, returning a [`Client`]
    // on success. The work runs in a spawned task wrapped in a handshake timeout;
    // panics in that task are forwarded to the caller.
    fn call(&mut self, req: HandshakeRequest<PeerTransport>) -> Self::Future {
        let HandshakeRequest {
            data_stream,
            connected_addr,
            connection_tracker,
        } = req;

        let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
        // set the peer connection span's parent to the global span, as it
        // should exist independently of its creation source (inbound
        // connection, crawler, initial peer, ...)
        let connection_span =
            span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr);

        // Clone these upfront, so they can be moved into the future.
        let nonces = self.nonces.clone();
        let inbound_service = self.inbound_service.clone();
        let address_book_updater = self.address_book_updater.clone();
        let inv_collector = self.inv_collector.clone();
        let config = self.config.clone();
        let user_agent = self.user_agent.clone();
        let our_services = self.our_services;
        let relay = self.relay;
        let minimum_peer_version = self.minimum_peer_version.clone();

        // # Security
        //
        // `zebra_network::init()` implements a connection timeout on this future.
        // Any code outside this future does not have a timeout.
        let fut = async move {
            debug!(
                addr = ?connected_addr,
                "negotiating protocol version with remote peer"
            );

            // Wrap the raw stream in a codec that starts at the minimum
            // protocol version; it is reconfigured after negotiation below.
            let mut peer_conn = Framed::new(
                data_stream,
                Codec::builder()
                    .for_network(&config.network)
                    .with_metrics_addr_label(connected_addr.get_transient_addr_label())
                    .finish(),
            );

            let connection_info = negotiate_version(
                &mut peer_conn,
                &connected_addr,
                config,
                nonces,
                user_agent,
                our_services,
                relay,
                minimum_peer_version,
            )
            .await?;

            let remote_services = connection_info.remote.services;

            // The handshake succeeded: update the peer status from AttemptPending to Responded,
            // and send initial connection info.
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                // the collector doesn't depend on network activity,
                // so this await should not hang
                let _ = address_book_updater
                    .send(MetaAddr::new_connected(
                        book_addr,
                        &remote_services,
                        connected_addr.is_inbound(),
                    ))
                    .await;
            }

            // Reconfigure the codec to use the negotiated version.
            //
            // TODO: The tokio documentation says not to do this while any frames are still being processed.
            // Since we don't know that here, another way might be to release the tcp
            // stream from the unversioned Framed wrapper and construct a new one with a versioned codec.
            let bare_codec = peer_conn.codec_mut();
            bare_codec.reconfigure_version(connection_info.negotiated_version);

            debug!("constructing client, spawning server");

            // These channels communicate between the inbound and outbound halves of the connection,
            // and between the different connection tasks. We create separate tasks and channels
            // for each new connection.
            let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
            let (shutdown_tx, shutdown_rx) = oneshot::channel();
            let error_slot = ErrorSlot::default();

            // Split the framed connection into outbound (tx) and inbound (rx) halves.
            let (peer_tx, peer_rx) = peer_conn.split();

            // Instrument the peer's rx and tx streams.

            let inner_conn_span = connection_span.clone();
            let peer_tx = peer_tx.with(move |msg: Message| {
                let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
                // Add a metric for outbound messages.
                metrics::counter!(
                    "zcash.net.out.messages",
                    "command" => msg.command(),
                    "addr" => connected_addr.get_transient_addr_label(),
                )
                .increment(1);
                // We need to use future::ready rather than an async block here,
                // because we need the sink to be Unpin, and the With<Fut, ...>
                // returned by .with is Unpin only if Fut is Unpin, and the
                // futures generated by async blocks are not Unpin.
                future::ready(Ok(msg)).instrument(span)
            });

            // CORRECTNESS
            //
            // Ping/Pong messages and every error must update the peer address state via
            // the inbound_ts_collector.
            //
            // The heartbeat task sends regular Ping/Pong messages,
            // and it ends the connection if the heartbeat times out.
            // So we can just track peer activity based on Ping and Pong.
            // (This significantly improves performance, by reducing time system calls.)
            let inbound_ts_collector = address_book_updater.clone();
            let inbound_inv_collector = inv_collector.clone();
            let ts_inner_conn_span = connection_span.clone();
            let inv_inner_conn_span = connection_span.clone();
            let peer_rx = peer_rx
                .then(move |msg| {
                    // Add a metric for inbound messages and errors.
                    // Fire a timestamp or failure event.
                    let inbound_ts_collector = inbound_ts_collector.clone();
                    let span =
                        debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");

                    async move {
                        match &msg {
                            Ok(msg) => {
                                metrics::counter!(
                                    "zcash.net.in.messages",
                                    "command" => msg.command(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer messages are not rate-limited, so we can't send anything
                                // to a shared channel or do anything expensive here.
                            }
                            Err(err) => {
                                metrics::counter!(
                                    "zebra.net.in.errors",
                                    "error" => err.to_string(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer errors are rate-limited because:
                                // - opening connections is rate-limited
                                // - the number of connections is limited
                                // - after the first error, the peer is disconnected
                                if let Some(book_addr) = connected_addr.get_address_book_addr() {
                                    let _ = inbound_ts_collector
                                        .send(MetaAddr::new_errored(book_addr, remote_services))
                                        .await;
                                }
                            }
                        }
                        msg
                    }
                    .instrument(span)
                })
                .then(move |msg| {
                    let inbound_inv_collector = inbound_inv_collector.clone();
                    let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
                    register_inventory_status(msg, connected_addr, inbound_inv_collector)
                        .instrument(span)
                })
                .boxed();

            // If we've learned potential peer addresses from the inbound connection remote address
            // or the handshake version message, add those addresses to the peer cache for this
            // peer.
            //
            // # Security
            //
            // We can't add these alternate addresses directly to the address book. If we did,
            // malicious peers could interfere with the address book state of other peers by
            // providing their addresses in `Version` messages. Or they could fill the address book
            // with fake addresses.
            //
            // These peer addresses are rate-limited because:
            // - opening connections is rate-limited
            // - these addresses are put in the peer address cache
            // - the peer address cache is only used when Zebra requests addresses from that peer
            let remote_canonical_addr = connection_info.remote.address_from.addr();
            let alternate_addrs = connected_addr
                .get_alternate_addrs(remote_canonical_addr)
                .map(|addr| {
                    // Assume the connecting node is a server node, and it's available now.
                    MetaAddr::new_gossiped_meta_addr(
                        addr,
                        PeerServices::NODE_NETWORK,
                        DateTime32::now(),
                    )
                });

            let server = Connection::new(
                inbound_service,
                server_rx,
                error_slot.clone(),
                peer_tx,
                connection_tracker,
                connection_info.clone(),
                alternate_addrs.collect(),
            );

            // Spawn the connection task, which runs until the connection closes or errors.
            let connection_task = tokio::spawn(
                server
                    .run(peer_rx)
                    .instrument(connection_span.clone())
                    .boxed(),
            );

            // Spawn the heartbeat task, which can be cancelled via `shutdown_tx`.
            let heartbeat_task = tokio::spawn(
                send_periodic_heartbeats_with_shutdown_handle(
                    connected_addr,
                    shutdown_rx,
                    server_tx.clone(),
                    address_book_updater.clone(),
                )
                .instrument(tracing::debug_span!(parent: connection_span, "heartbeat"))
                .boxed(),
            );

            let client = Client {
                connection_info,
                shutdown_tx: Some(shutdown_tx),
                server_tx,
                inv_collector,
                error_slot,
                connection_task,
                heartbeat_task,
            };

            Ok(client)
        };

        // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout.
        let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut);

        // Spawn a new task to drive this handshake, forwarding panics to the calling task.
        tokio::spawn(fut.instrument(negotiator_span))
            .map(
                |join_result: Result<
                    Result<Result<Client, HandshakeError>, error::Elapsed>,
                    JoinError,
                >| {
                    match join_result {
                        Ok(Ok(Ok(connection_client))) => Ok(connection_client),
                        Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()),
                        Ok(Err(timeout_error)) => Err(timeout_error.into()),
                        Err(join_error) => match join_error.try_into_panic() {
                            // Forward panics to the calling task
                            Ok(panic_reason) => panic::resume_unwind(panic_reason),
                            Err(join_error) => Err(join_error.into()),
                        },
                    }
                },
            )
            .boxed()
    }
}
1150
1151/// Register any advertised or missing inventory in `msg` for `connected_addr`.
1152pub(crate) async fn register_inventory_status(
1153    msg: Result<Message, SerializationError>,
1154    connected_addr: ConnectedAddr,
1155    inv_collector: broadcast::Sender<InventoryChange>,
1156) -> Result<Message, SerializationError> {
1157    match (&msg, connected_addr.get_transient_addr()) {
1158        (Ok(Message::Inv(advertised)), Some(transient_addr)) => {
1159            // We ignore inventory messages with more than one
1160            // block, because they are most likely replies to a
1161            // query, rather than a newly gossiped block.
1162            //
1163            // (We process inventory messages with any number of
1164            // transactions.)
1165            //
1166            // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
1167            //
1168            // Note: zcashd has a bug where it merges queued inv messages of
1169            // the same or different types. Zebra compensates by sending `notfound`
1170            // responses to the inv collector. (#2156, #1768)
1171            //
1172            // (We can't split `inv`s, because that fills the inventory registry
1173            // with useless entries that the whole network has, making it large and slow.)
1174            match advertised.as_slice() {
1175                [advertised @ InventoryHash::Block(_)] => {
1176                    debug!(
1177                        ?advertised,
1178                        "registering gossiped advertised block inventory for peer"
1179                    );
1180
1181                    // The peer set and inv collector use the peer's remote
1182                    // address as an identifier
1183                    // If all receivers have been dropped, `send` returns an error.
1184                    // When that happens, Zebra is shutting down, so we want to ignore this error.
1185                    let _ = inv_collector
1186                        .send(InventoryChange::new_available(*advertised, transient_addr));
1187                }
1188                advertised => {
1189                    let advertised = advertised
1190                        .iter()
1191                        .filter(|advertised| advertised.unmined_tx_id().is_some());
1192
1193                    debug!(
1194                        ?advertised,
1195                        "registering advertised unmined transaction inventory for peer",
1196                    );
1197
1198                    if let Some(change) =
1199                        InventoryChange::new_available_multi(advertised, transient_addr)
1200                    {
1201                        // Ignore channel errors that should only happen during shutdown.
1202                        let _ = inv_collector.send(change);
1203                    }
1204                }
1205            }
1206        }
1207
1208        (Ok(Message::NotFound(missing)), Some(transient_addr)) => {
1209            // Ignore Errors and the unsupported FilteredBlock type
1210            let missing = missing.iter().filter(|missing| {
1211                missing.unmined_tx_id().is_some() || missing.block_hash().is_some()
1212            });
1213
1214            debug!(?missing, "registering missing inventory for peer");
1215
1216            if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
1217                let _ = inv_collector.send(change);
1218            }
1219        }
1220        _ => {}
1221    }
1222
1223    msg
1224}
1225
1226/// Send periodical heartbeats to `server_tx`, and update the peer status through
1227/// `heartbeat_ts_collector`.
1228///
1229/// # Correctness
1230///
1231/// To prevent hangs:
1232/// - every await that depends on the network must have a timeout (or interval)
1233/// - every error/shutdown must update the address book state and return
1234///
1235/// The address book state can be updated via `ClientRequest.tx`, or the
1236/// heartbeat_ts_collector.
1237///
1238/// Returning from this function terminates the connection's heartbeat task.
1239async fn send_periodic_heartbeats_with_shutdown_handle(
1240    connected_addr: ConnectedAddr,
1241    shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
1242    server_tx: futures::channel::mpsc::Sender<ClientRequest>,
1243    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
1244) -> Result<(), BoxError> {
1245    use futures::future::Either;
1246
1247    let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
1248        connected_addr,
1249        server_tx,
1250        heartbeat_ts_collector.clone(),
1251    );
1252
1253    pin_mut!(shutdown_rx);
1254    pin_mut!(heartbeat_run_loop);
1255
1256    // CORRECTNESS
1257    //
1258    // Currently, select prefers the first future if multiple
1259    // futures are ready.
1260    //
1261    // Starvation is impossible here, because interval has a
1262    // slow rate, and shutdown is a oneshot. If both futures
1263    // are ready, we want the shutdown to take priority over
1264    // sending a useless heartbeat.
1265    match future::select(shutdown_rx, heartbeat_run_loop).await {
1266        Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
1267            tracing::trace!("shutting down because Client requested shut down");
1268            handle_heartbeat_shutdown(
1269                PeerError::ClientCancelledHeartbeatTask,
1270                &heartbeat_ts_collector,
1271                &connected_addr,
1272            )
1273            .await
1274        }
1275        Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
1276            tracing::trace!("shutting down because Client was dropped");
1277            handle_heartbeat_shutdown(
1278                PeerError::ClientDropped,
1279                &heartbeat_ts_collector,
1280                &connected_addr,
1281            )
1282            .await
1283        }
1284        Either::Right((result, _unused_shutdown)) => {
1285            tracing::trace!("shutting down due to heartbeat failure");
1286            // heartbeat_timeout() already send an error on the timestamp collector channel
1287
1288            result
1289        }
1290    }
1291}
1292
1293/// Send periodical heartbeats to `server_tx`, and update the peer status through
1294/// `heartbeat_ts_collector`.
1295///
1296/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
1297async fn send_periodic_heartbeats_run_loop(
1298    connected_addr: ConnectedAddr,
1299    mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
1300    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
1301) -> Result<(), BoxError> {
1302    // Don't send the first heartbeat immediately - we've just completed the handshake!
1303    let mut interval = tokio::time::interval_at(
1304        Instant::now() + constants::HEARTBEAT_INTERVAL,
1305        constants::HEARTBEAT_INTERVAL,
1306    );
1307    // If the heartbeat is delayed, also delay all future heartbeats.
1308    // (Shorter heartbeat intervals just add load, without any benefit.)
1309    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1310
1311    let mut interval_stream = IntervalStream::new(interval);
1312
1313    while let Some(_instant) = interval_stream.next().await {
1314        // We've reached another heartbeat interval without
1315        // shutting down, so do a heartbeat request.
1316        let ping_sent_at = Instant::now();
1317        if let Some(book_addr) = connected_addr.get_address_book_addr() {
1318            let _ = heartbeat_ts_collector
1319                .send(MetaAddr::new_ping_sent(book_addr, ping_sent_at.into()))
1320                .await;
1321        }
1322
1323        let heartbeat = send_one_heartbeat(&mut server_tx);
1324        let rtt = heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;
1325
1326        // # Security
1327        //
1328        // Peer heartbeats are rate-limited because:
1329        // - opening connections is rate-limited
1330        // - the number of connections is limited
1331        // - Zebra initiates each heartbeat using a timer
1332        if let Some(book_addr) = connected_addr.get_address_book_addr() {
1333            if let Some(rtt) = rtt {
1334                // the collector doesn't depend on network activity,
1335                // so this await should not hang
1336                let _ = heartbeat_ts_collector
1337                    .send(MetaAddr::new_responded(book_addr, Some(rtt)))
1338                    .await;
1339            }
1340        }
1341    }
1342
1343    unreachable!("unexpected IntervalStream termination")
1344}
1345
1346/// Send one heartbeat using `server_tx`.
1347async fn send_one_heartbeat(
1348    server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
1349) -> Result<Response, BoxError> {
1350    // We just reached a heartbeat interval, so start sending
1351    // a heartbeat.
1352    let (tx, rx) = oneshot::channel();
1353
1354    // Try to send the heartbeat request
1355    let request = Request::Ping(Nonce::default());
1356    tracing::trace!(?request, "queueing heartbeat request");
1357    match server_tx.try_send(ClientRequest {
1358        request,
1359        tx,
1360        // we're not requesting inventory, so we don't need to update the registry
1361        inv_collector: None,
1362        transient_addr: None,
1363        span: tracing::Span::current(),
1364    }) {
1365        Ok(()) => {}
1366        Err(e) => {
1367            if e.is_disconnected() {
1368                Err(PeerError::ConnectionClosed)?;
1369            } else if e.is_full() {
1370                // Send the message when the Client becomes ready.
1371                // If sending takes too long, the heartbeat timeout will elapse
1372                // and close the connection, reducing our load to busy peers.
1373                server_tx.send(e.into_inner()).await?;
1374            } else {
1375                // we need to map unexpected error types to PeerErrors
1376                warn!(?e, "unexpected try_send error");
1377                Err(e)?;
1378            };
1379        }
1380    }
1381
1382    // Flush the heartbeat request from the queue
1383    server_tx.flush().await?;
1384    tracing::trace!("sent heartbeat request");
1385
1386    // Heartbeats are checked internally to the
1387    // connection logic, but we need to wait on the
1388    // response to avoid canceling the request.
1389    let response = rx.await??;
1390    tracing::trace!(?response, "got heartbeat response");
1391
1392    Ok(response)
1393}
1394
1395/// Wrap `fut` in a timeout, handing any inner or outer errors using
1396/// `handle_heartbeat_error`.
1397async fn heartbeat_timeout(
1398    fut: impl Future<Output = Result<Response, BoxError>>,
1399    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
1400    connected_addr: &ConnectedAddr,
1401) -> Result<Option<Duration>, BoxError> {
1402    let response = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
1403        Ok(inner_result) => {
1404            handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
1405        }
1406        Err(elapsed) => {
1407            handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
1408        }
1409    };
1410
1411    let rtt = match response {
1412        Response::Pong(rtt) => Some(rtt),
1413        _ => None,
1414    };
1415
1416    Ok(rtt)
1417}
1418
1419/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
1420async fn handle_heartbeat_error<T, E>(
1421    result: Result<T, E>,
1422    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
1423    connected_addr: &ConnectedAddr,
1424) -> Result<T, E>
1425where
1426    E: std::fmt::Debug,
1427{
1428    match result {
1429        Ok(t) => Ok(t),
1430        Err(err) => {
1431            tracing::debug!(?err, "heartbeat error, shutting down");
1432
1433            // # Security
1434            //
1435            // Peer errors and shutdowns are rate-limited because:
1436            // - opening connections is rate-limited
1437            // - the number of connections is limited
1438            // - after the first error or shutdown, the peer is disconnected
1439            if let Some(book_addr) = connected_addr.get_address_book_addr() {
1440                let _ = address_book_updater
1441                    .send(MetaAddr::new_errored(book_addr, None))
1442                    .await;
1443            }
1444            Err(err)
1445        }
1446    }
1447}
1448
1449/// Mark `connected_addr` as shut down using `address_book_updater`.
1450async fn handle_heartbeat_shutdown(
1451    peer_error: PeerError,
1452    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
1453    connected_addr: &ConnectedAddr,
1454) -> Result<(), BoxError> {
1455    tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");
1456
1457    if let Some(book_addr) = connected_addr.get_address_book_addr() {
1458        let _ = address_book_updater
1459            .send(MetaAddr::new_shutdown(book_addr))
1460            .await;
1461    }
1462
1463    Err(peer_error.into())
1464}