zebra_network/peer/handshake.rs

//! Initial [`Handshake`]s with Zebra peers over a `PeerTransport`.

use std::{
    cmp::min,
    fmt,
    future::Future,
    net::{Ipv4Addr, SocketAddr},
    panic,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};

use chrono::{TimeZone, Utc};
use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
use indexmap::IndexSet;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::broadcast,
    task::JoinError,
    time::{error, timeout, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
use tracing_futures::Instrument;

use zebra_chain::{
    chain_tip::{ChainTip, NoChainTip},
    parameters::Network,
    serialization::{DateTime32, SerializationError},
};

use crate::{
    constants,
    meta_addr::MetaAddrChange,
    peer::{
        CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError,
        MinimumPeerVersion, PeerError,
    },
    peer_set::{ConnectionTracker, InventoryChange},
    protocol::{
        external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
        internal::{Request, Response},
    },
    types::MetaAddr,
    BoxError, Config, PeerSocketAddr, VersionMessage,
};

#[cfg(test)]
mod tests;

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
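///
/// A minimal sketch of that pattern (the `handshake_request` value and the
/// spawning site are illustrative, not part of this API):
///
/// ```ignore
/// let handshake_fut = handshake.call(handshake_request);
/// tokio::spawn(async move {
///     // Bound the whole handshake, so a stalled peer can't hang this task.
///     let _ = tokio::time::timeout(constants::HANDSHAKE_TIMEOUT, handshake_fut).await;
/// });
/// ```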
pub struct Handshake<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Config,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,

    inbound_service: S,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    inv_collector: broadcast::Sender<InventoryChange>,
    minimum_peer_version: MinimumPeerVersion<C>,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,

    parent_span: Span,
}

impl<S, C> fmt::Debug for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // skip the channels, they don't tell us anything useful
        f.debug_struct(std::any::type_name::<Handshake<S, C>>())
            .field("config", &self.config)
            .field("user_agent", &self.user_agent)
            .field("our_services", &self.our_services)
            .field("relay", &self.relay)
            .field("minimum_peer_version", &self.minimum_peer_version)
            .field("parent_span", &self.parent_span)
            .finish()
    }
}

impl<S, C> Clone for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            user_agent: self.user_agent.clone(),
            our_services: self.our_services,
            relay: self.relay,
            inbound_service: self.inbound_service.clone(),
            address_book_updater: self.address_book_updater.clone(),
            inv_collector: self.inv_collector.clone(),
            minimum_peer_version: self.minimum_peer_version.clone(),
            nonces: self.nonces.clone(),
            parent_span: self.parent_span.clone(),
        }
    }
}

/// The metadata for a peer connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// The connected peer address, if known.
    /// This address might not be valid for outbound connections.
    ///
    /// Peers can be connected via a transient inbound or proxy address,
    /// which will appear as the connected address to the OS and Zebra.
    pub connected_addr: ConnectedAddr,

    /// The network protocol [`VersionMessage`] sent by the remote peer.
    pub remote: VersionMessage,

    /// The network protocol version negotiated with the remote peer.
    ///
    /// Derived from `remote.version` and the
    /// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION).
    pub negotiated_version: Version,
}

/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
/// give us enough information to reconnect to that peer.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ConnectedAddr {
    /// The address we used to make a direct outbound connection.
    ///
    /// In an honest network, a Zcash peer is listening on this exact address
    /// and port.
    OutboundDirect {
        /// The connected outbound remote address and port.
        addr: PeerSocketAddr,
    },

    /// The address we received from the OS, when a remote peer directly
    /// connected to our Zcash listener port.
    ///
    /// In an honest network, a Zcash peer might be listening on this address,
    /// if its outbound address is the same as its listener address. But the port
    /// is an ephemeral outbound TCP port, not a listener port.
    InboundDirect {
        /// The connected inbound remote address and ephemeral port.
        ///
        /// The IP address might be the address of a Zcash peer, but the port is an ephemeral port.
        addr: PeerSocketAddr,
    },

    /// The proxy address we used to make an outbound connection.
    ///
    /// The proxy address can be used by many connections, but our own ephemeral
    /// outbound address and port can be used as an identifier for the duration
    /// of this connection.
    OutboundProxy {
        /// The remote address and port of the proxy.
        proxy_addr: SocketAddr,

        /// The local address and transient port we used to connect to the proxy.
        transient_local_addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer connected via an
    /// inbound proxy.
    ///
    /// The proxy's ephemeral outbound address can be used as an identifier for
    /// the duration of this connection.
    InboundProxy {
        /// The local address and transient port we used to connect to the proxy.
        transient_addr: SocketAddr,
    },

    /// An isolated connection, where we deliberately don't have any connection metadata.
    Isolated,
    //
    // TODO: handle Tor onion addresses
}

/// Get an unspecified IPv4 address for `network`.
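///
/// For example, on Mainnet this is `0.0.0.0:8233`, using the default Zcash
/// peer-to-peer port.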
pub fn get_unspecified_ipv4_addr(network: Network) -> SocketAddr {
    (Ipv4Addr::UNSPECIFIED, network.default_port()).into()
}

use ConnectedAddr::*;

impl ConnectedAddr {
    /// Returns a new outbound directly connected addr.
    pub fn new_outbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        OutboundDirect { addr }
    }

    /// Returns a new inbound directly connected addr.
    pub fn new_inbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        InboundDirect { addr }
    }

    /// Returns a new outbound connected addr via `proxy`.
    ///
    /// `local_addr` is the ephemeral local address of the connection.
    #[allow(unused)]
    pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr {
        OutboundProxy {
            proxy_addr: proxy,
            transient_local_addr: local_addr,
        }
    }

    /// Returns a new inbound connected addr from `proxy`.
    //
    // TODO: distinguish between direct listeners and proxy listeners in the
    //       rest of zebra-network
    #[allow(unused)]
    pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr {
        InboundProxy {
            transient_addr: proxy,
        }
    }

    /// Returns a new isolated connected addr, with no metadata.
    pub fn new_isolated() -> ConnectedAddr {
        Isolated
    }

    /// Returns a `PeerSocketAddr` that can be used to track this connection in the
    /// `AddressBook`.
    ///
    /// `None` for inbound connections, proxy connections, and isolated
    /// connections.
    ///
    /// # Correctness
    ///
    /// This address can be used for reconnection attempts, or as a permanent
    /// identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `AddressBook` state.
    ///
    /// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
    pub fn get_address_book_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } | InboundDirect { addr } => Some(*addr),
            // TODO: consider using the canonical address of the peer to track
            //       outbound proxy connections
            OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
        }
    }

    /// Returns a `PeerSocketAddr` that can be used to temporarily identify a
    /// connection.
    ///
    /// Isolated connections must not change Zebra's peer set or address book
    /// state, so they do not have an identifier.
    ///
    /// # Correctness
    ///
    /// The returned address is only valid while the original connection is
    /// open. It must not be used in the `AddressBook`, for outbound connection
    /// attempts, or as a permanent identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `PeerSet` state.
    pub fn get_transient_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } => Some(*addr),
            InboundDirect { addr } => Some(*addr),
            OutboundProxy {
                transient_local_addr,
                ..
            } => Some(PeerSocketAddr::from(*transient_local_addr)),
            InboundProxy { transient_addr } => Some(PeerSocketAddr::from(*transient_addr)),
            Isolated => None,
        }
    }

    /// Returns the metrics label for this connection's address.
    pub fn get_transient_addr_label(&self) -> String {
        self.get_transient_addr()
            .map_or_else(|| "isolated".to_string(), |addr| addr.to_string())
    }

    /// Returns a short label for the kind of connection.
    pub fn get_short_kind_label(&self) -> &'static str {
        match self {
            OutboundDirect { .. } => "Out",
            InboundDirect { .. } => "In",
            OutboundProxy { .. } => "ProxOut",
            InboundProxy { .. } => "ProxIn",
            Isolated => "Isol",
        }
    }

    /// Returns a list of alternate remote peer addresses, which can be used for
    /// reconnection attempts.
    ///
    /// Uses the connected address, and the remote canonical address.
    ///
    /// Skips duplicates. If this is an outbound connection, also skips the
    /// remote address that we're currently connected to.
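    ///
    /// # Example
    ///
    /// A sketch of the outbound fixup behaviour (addresses are illustrative,
    /// and the `parse` conversions assume the usual `FromStr` impls):
    ///
    /// ```ignore
    /// let connected = ConnectedAddr::new_outbound_direct("203.0.113.7:8233".parse().unwrap());
    /// // The peer advertised an unspecified canonical address, so its IP and
    /// // port are filled in from the connected address, which is already in
    /// // our address book: no new alternate addresses are learned.
    /// let canonical: PeerSocketAddr = "0.0.0.0:0".parse().unwrap();
    /// assert_eq!(connected.get_alternate_addrs(canonical).count(), 0);
    /// ```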
    pub fn get_alternate_addrs(
        &self,
        mut canonical_remote: PeerSocketAddr,
    ) -> impl Iterator<Item = PeerSocketAddr> {
        let addrs = match self {
            OutboundDirect { addr } => {
                // Fixup unspecified addresses and ports using known good data
                if canonical_remote.ip().is_unspecified() {
                    canonical_remote.set_ip(addr.ip());
                }
                if canonical_remote.port() == 0 {
                    canonical_remote.set_port(addr.port());
                }

                // Try the canonical remote address, if it is different from the
                // outbound address (which we already have in our address book)
                if &canonical_remote != addr {
                    vec![canonical_remote]
                } else {
                    // we didn't learn a new address from the handshake:
                    // it's the same as the outbound address, which is already in our address book
                    Vec::new()
                }
            }

            InboundDirect { addr } => {
                // Use the IP from the TCP connection, and the port the peer told us
                let maybe_addr = SocketAddr::new(addr.ip(), canonical_remote.port()).into();

                // Try both addresses, but remove one duplicate if they match
                if canonical_remote != maybe_addr {
                    vec![canonical_remote, maybe_addr]
                } else {
                    vec![canonical_remote]
                }
            }

            // Proxy addresses can't be used for reconnection attempts, but we
            // can try the canonical remote address
            OutboundProxy { .. } | InboundProxy { .. } => vec![canonical_remote],

            // Hide all metadata for isolated connections
            Isolated => Vec::new(),
        };

        addrs.into_iter()
    }

    /// Returns true if the [`ConnectedAddr`] was created for an inbound connection.
    pub fn is_inbound(&self) -> bool {
        matches!(self, InboundDirect { .. } | InboundProxy { .. })
    }
}

impl fmt::Debug for ConnectedAddr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let kind = self.get_short_kind_label();
        let addr = self.get_transient_addr_label();

        if matches!(self, Isolated) {
            f.write_str(kind)
        } else {
            f.debug_tuple(kind).field(&addr).finish()
        }
    }
}

/// A builder for `Handshake`.
pub struct Builder<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Option<Config>,
    our_services: Option<PeerServices>,
    user_agent: Option<String>,
    relay: Option<bool>,

    inbound_service: Option<S>,
    address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
    inv_collector: Option<broadcast::Sender<InventoryChange>>,
    latest_chain_tip: C,
}

impl<S, C> Builder<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    /// Provide a config.  Mandatory.
    pub fn with_config(mut self, config: Config) -> Self {
        self.config = Some(config);
        self
    }

    /// Provide a service to handle inbound requests. Mandatory.
    pub fn with_inbound_service(mut self, inbound_service: S) -> Self {
        self.inbound_service = Some(inbound_service);
        self
    }

    /// Provide a channel for registering inventory advertisements. Optional.
    ///
    /// This channel takes transient remote addresses, which the `PeerSet` uses
    /// to look up peers that have specific inventory.
    pub fn with_inventory_collector(
        mut self,
        inv_collector: broadcast::Sender<InventoryChange>,
    ) -> Self {
        self.inv_collector = Some(inv_collector);
        self
    }

    /// Provide a hook for timestamp collection. Optional.
    ///
    /// This channel takes `MetaAddr`s, permanent addresses which can be used to
    /// make outbound connections to peers.
    pub fn with_address_book_updater(
        mut self,
        address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    ) -> Self {
        self.address_book_updater = Some(address_book_updater);
        self
    }

    /// Provide the services this node advertises to other peers.  Optional.
    ///
    /// If this is unset, the node will advertise itself as a client.
    pub fn with_advertised_services(mut self, services: PeerServices) -> Self {
        self.our_services = Some(services);
        self
    }

    /// Provide this node's user agent.  Optional.
    ///
    /// This must be a valid BIP14 string.  If it is unset, the user-agent will be empty.
    pub fn with_user_agent(mut self, user_agent: String) -> Self {
        self.user_agent = Some(user_agent);
        self
    }

    /// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
    ///
    /// If this is unset, the minimum accepted protocol version for peer connections is kept
    /// constant over network upgrade activations.
    ///
    /// Use [`NoChainTip`] to explicitly provide no chain tip.
    pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC>
    where
        NewC: ChainTip + Clone + Send + 'static,
    {
        Builder {
            latest_chain_tip,

            // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
            config: self.config,
            inbound_service: self.inbound_service,
            address_book_updater: self.address_book_updater,
            our_services: self.our_services,
            user_agent: self.user_agent,
            relay: self.relay,
            inv_collector: self.inv_collector,
        }
    }

    /// Whether to request that peers relay transactions to our node.  Optional.
    ///
    /// If this is unset, the node will not request transactions.
    pub fn want_transactions(mut self, relay: bool) -> Self {
        self.relay = Some(relay);
        self
    }

    /// Consume this builder and produce a [`Handshake`].
    ///
    /// Returns an error only if any mandatory field was unset.
    pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
        let config = self.config.ok_or("did not specify config")?;
        let inbound_service = self
            .inbound_service
            .ok_or("did not specify inbound service")?;
        let inv_collector = self.inv_collector.unwrap_or_else(|| {
            let (tx, _) = broadcast::channel(100);
            tx
        });
        let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
            // No `AddressBookUpdater` for timestamp collection was passed, so create a stub
            // channel. Dropping the receiver means sends will fail, but we don't care.
            let (tx, _rx) = tokio::sync::mpsc::channel(1);
            tx
        });
        let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new()));
        let user_agent = self.user_agent.unwrap_or_default();
        let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
        let relay = self.relay.unwrap_or(false);
        let network = config.network.clone();
        let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, &network);

        Ok(Handshake {
            config,
            user_agent,
            our_services,
            relay,
            inbound_service,
            address_book_updater,
            inv_collector,
            minimum_peer_version,
            nonces,
            parent_span: Span::current(),
        })
    }
}

impl<S> Handshake<S, NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
{
    /// Create a builder that configures a [`Handshake`] service.
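    ///
    /// # Example
    ///
    /// A minimal sketch (the `config` and `inbound` values are assumed to be
    /// constructed elsewhere):
    ///
    /// ```ignore
    /// let handshake = Handshake::builder()
    ///     .with_config(config)
    ///     .with_inbound_service(inbound)
    ///     .with_user_agent("/my-node:1.0.0/".to_string())
    ///     .want_transactions(true)
    ///     .finish()
    ///     .expect("config and inbound service were provided");
    /// ```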
    pub fn builder() -> Builder<S, NoChainTip> {
        // We don't derive `Default` because the derive inserts a `where S:
        // Default` bound even though `Option<S>` implements `Default` even if
        // `S` does not.
        Builder {
            config: None,
            our_services: None,
            user_agent: None,
            relay: None,
            inbound_service: None,
            address_book_updater: None,
            inv_collector: None,
            latest_chain_tip: NoChainTip,
        }
    }
}

/// Negotiate the Zcash network protocol version with the remote peer at `connected_addr`, using
/// the connection `peer_conn`.
///
/// We split `Handshake` into its components before calling this function, to avoid infectious
/// `Sync` bounds on the returned future.
///
/// Returns the [`VersionMessage`] sent by the remote peer, and the [`Version`] negotiated with the
/// remote peer, inside a [`ConnectionInfo`] struct.
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version<PeerTransport>(
    peer_conn: &mut Framed<PeerTransport, Codec>,
    connected_addr: &ConnectedAddr,
    config: Config,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,
    mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<Arc<ConnectionInfo>, HandshakeError>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    // Create a random nonce for this connection
    let local_nonce = Nonce::default();

    // Insert the nonce for this handshake into the shared nonce set.
    // Each connection has its own connection state, and handshakes execute concurrently.
    //
    // # Correctness
    //
    // It is ok to wait for the lock here, because handshakes have a short
    // timeout, and the async mutex will be released when the task times
    // out.
    {
        let mut locked_nonces = nonces.lock().await;

        // Duplicate nonces are very rare, because they require a 64-bit random number collision,
        // and the nonce set is limited to a few hundred entries.
        let is_unique_nonce = locked_nonces.insert(local_nonce);
        if !is_unique_nonce {
            return Err(HandshakeError::LocalDuplicateNonce);
        }

        // # Security
        //
        // Limit the amount of memory used for nonces.
        // Nonces can be left in the set if the connection fails or times out between
        // the nonce being inserted, and it being removed.
        //
        // Zebra has strict connection limits, so we limit the number of nonces to
        // the configured connection limit.
        // This is a tradeoff between:
        // - avoiding memory denial of service attacks which make large numbers of connections;
        //   for example, 100 failed inbound connections take 1 second.
        // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces
        // - collision probability: two hundred 64-bit nonces have a very low collision probability
        //   <https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)>
        while locked_nonces.len() > config.peerset_total_connection_limit() {
            locked_nonces.shift_remove_index(0);
        }

        std::mem::drop(locked_nonces);
    }

    // Don't leak our exact clock skew to our peers. On the other hand,
    // we can't deviate too much, or zcashd will get confused.
    // Inspection of the zcashd source code reveals that the timestamp
    // is only ever used at the end of parsing the version message, in
    //
    // pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
    //
    // AddTimeData is defined in src/timedata.cpp and is a no-op as long
    // as the difference between the specified timestamp and
    // zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set
    // to 10 * 60 seconds (10 minutes).
    //
    // nTimeOffset is peer metadata that is never used, except for
    // statistics.
    //
    // To try to stay within the range where zcashd will ignore our clock skew,
    // truncate the timestamp to the nearest 5 minutes.
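    //
    // For example (illustrative values): if `now` is 1_700_000_123, then
    // `now.rem_euclid(5 * 60)` is 23, so we send 1_700_000_100, an exact
    // multiple of 5 minutes.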
    let now = Utc::now().timestamp();
    let timestamp = Utc
        .timestamp_opt(now - now.rem_euclid(5 * 60), 0)
        .single()
        .expect("in-range number of seconds and valid nanosecond");

    let (their_addr, our_services, our_listen_addr) = match connected_addr {
        // Version messages require an address, so we use
        // an unspecified address for Isolated connections
        Isolated => {
            let unspec_ipv4 = get_unspecified_ipv4_addr(config.network);
            (unspec_ipv4.into(), PeerServices::empty(), unspec_ipv4)
        }
        _ => {
            let their_addr = connected_addr
                .get_transient_addr()
                .expect("non-Isolated connections have a remote addr");

            // Include the configured external address in our version message,
            // if any; otherwise, include our listen address.
            let advertise_addr = match config.external_addr {
                Some(external_addr) => {
                    info!(?their_addr, ?config.listen_addr, "using external address for Version messages");
                    external_addr
                }
                None => config.listen_addr,
            };

            (their_addr, our_services, advertise_addr)
        }
    };

    let our_version = VersionMessage {
        version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
        services: our_services,
        timestamp,
        address_recv: AddrInVersion::new(their_addr, PeerServices::NODE_NETWORK),
        // TODO: detect external address (#1893)
        address_from: AddrInVersion::new(our_listen_addr, our_services),
        nonce: local_nonce,
        user_agent: user_agent.clone(),
        start_height: minimum_peer_version.chain_tip_height(),
        relay,
    }
    .into();

    debug!(?our_version, "sending initial version message");
    peer_conn.send(our_version).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for next message if the one we got is not Version
    let remote: VersionMessage = loop {
        match remote_msg {
            Message::Version(version_message) => {
                debug!(?version_message, "got version message from remote peer");
                break version_message;
            }
            _ => {
                debug!(?remote_msg, "ignoring non-version message from remote peer");
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
            }
        }
    };

    let remote_address_services = remote.address_from.untrusted_services();
    if remote_address_services != remote.services {
        info!(
            ?remote.services,
            ?remote_address_services,
            ?remote.user_agent,
            "peer with inconsistent version services and version address services",
        );
    }

    // Check for nonce reuse, indicating self-connection
    //
    // # Correctness
    //
    // We must wait for the lock before we continue with the connection, to avoid
    // self-connection. If the connection times out, the async lock will be
    // released.
    //
    // # Security
    //
    // We don't remove the nonce here, because peers that observe our network traffic could
    // maliciously remove nonces, and force us to make self-connections.
    let nonce_reuse = nonces.lock().await.contains(&remote.nonce);
    if nonce_reuse {
        info!(?connected_addr, "rejecting self-connection attempt");
        Err(HandshakeError::RemoteNonceReuse)?;
    }

    // # Security
    //
    // Reject connections to peers on old versions, because they might not know about all
    // network upgrades and could lead to chain forks or slower block propagation.
    let min_version = minimum_peer_version.current();

    if remote.version < min_version {
        debug!(
            remote_ip = ?their_addr,
            ?remote.version,
            ?min_version,
            ?remote.user_agent,
            "disconnecting from peer with obsolete network protocol version",
        );

        // the value is the number of rejected handshakes, by peer IP and protocol version
        metrics::counter!(
            "zcash.net.peers.obsolete",
            "remote_ip" => their_addr.to_string(),
            "remote_version" => remote.version.to_string(),
            "min_version" => min_version.to_string(),
            "user_agent" => remote.user_agent.clone(),
        )
        .increment(1);

        // the value is the remote version of the most recent rejected handshake from each peer
        metrics::gauge!(
            "zcash.net.peers.version.obsolete",
            "remote_ip" => their_addr.to_string(),
        )
        .set(remote.version.0 as f64);

        // Disconnect if peer is using an obsolete version.
        return Err(HandshakeError::ObsoleteVersion(remote.version));
    }

    let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version);

    // Limit containing struct size, and avoid multiple duplicates of 300+ bytes of data.
    let connection_info = Arc::new(ConnectionInfo {
        connected_addr: *connected_addr,
        remote,
        negotiated_version,
    });

    debug!(
        remote_ip = ?their_addr,
        ?connection_info.remote.version,
        ?negotiated_version,
        ?min_version,
        ?connection_info.remote.user_agent,
        "negotiated network protocol version with peer",
    );

    // the value is the number of connected handshakes, by peer IP and protocol version
    metrics::counter!(
        "zcash.net.peers.connected",
        "remote_ip" => their_addr.to_string(),
        "remote_version" => connection_info.remote.version.to_string(),
        "negotiated_version" => negotiated_version.to_string(),
        "min_version" => min_version.to_string(),
        "user_agent" => connection_info.remote.user_agent.clone(),
    )
    .increment(1);

    // the value is the remote version of the most recent connected handshake from each peer
    metrics::gauge!(
        "zcash.net.peers.version.connected",
        "remote_ip" => their_addr.to_string(),
    )
    .set(connection_info.remote.version.0 as f64);

    peer_conn.send(Message::Verack).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for next message if the one we got is not Verack
    loop {
        match remote_msg {
            Message::Verack => {
                debug!(?remote_msg, "got verack message from remote peer");
                break;
            }
            _ => {
                debug!(?remote_msg, "ignoring non-verack message from remote peer");
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
            }
        }
    }

    Ok(connection_info)
}

/// A handshake request.
/// Contains the information needed to handshake with the peer.
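///
/// # Example
///
/// A sketch of driving a handshake through the [`Service`] impl below
/// (`handshake`, `stream`, `peer_addr`, and `tracker` are assumed to come
/// from the caller):
///
/// ```ignore
/// use tower::{Service, ServiceExt};
///
/// let client = handshake
///     .ready()
///     .await?
///     .call(HandshakeRequest {
///         data_stream: stream,
///         connected_addr: ConnectedAddr::new_inbound_direct(peer_addr),
///         connection_tracker: tracker,
///     })
///     .await?;
/// ```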
pub struct HandshakeRequest<PeerTransport>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// The tokio [`TcpStream`](tokio::net::TcpStream) or Tor
    /// `arti_client::DataStream` to the peer.
    // Use [`arti_client::DataStream`] when #5492 is done.
    pub data_stream: PeerTransport,

    /// The address of the peer, and other related information.
    pub connected_addr: ConnectedAddr,

    /// A connection tracker that reduces the open connection count when dropped.
    ///
    /// Used to limit the number of open connections in Zebra.
    pub connection_tracker: ConnectionTracker,
}

impl<S, PeerTransport, C> Service<HandshakeRequest<PeerTransport>> for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    type Response = Client;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: HandshakeRequest<PeerTransport>) -> Self::Future {
        let HandshakeRequest {
            data_stream,
            connected_addr,
            connection_tracker,
        } = req;

        let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
        // set the peer connection span's parent to the global span, as it
        // should exist independently of its creation source (inbound
        // connection, crawler, initial peer, ...)
        let connection_span =
            span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr);

        // Clone these upfront, so they can be moved into the future.
        let nonces = self.nonces.clone();
        let inbound_service = self.inbound_service.clone();
        let address_book_updater = self.address_book_updater.clone();
        let inv_collector = self.inv_collector.clone();
        let config = self.config.clone();
        let user_agent = self.user_agent.clone();
        let our_services = self.our_services;
        let relay = self.relay;
        let minimum_peer_version = self.minimum_peer_version.clone();

        // # Security
        //
        // `zebra_network::init()` implements a connection timeout on this future.
        // Any code outside this future does not have a timeout.
        let fut = async move {
            debug!(
                addr = ?connected_addr,
                "negotiating protocol version with remote peer"
            );

            // Start timing the handshake for metrics
            let handshake_start = Instant::now();

            let mut peer_conn = Framed::new(
                data_stream,
                Codec::builder()
                    .for_network(&config.network)
                    .with_metrics_addr_label(connected_addr.get_transient_addr_label())
                    .finish(),
            );

            let connection_info = match negotiate_version(
                &mut peer_conn,
                &connected_addr,
                config,
                nonces,
                user_agent,
                our_services,
                relay,
                minimum_peer_version,
            )
            .await
            {
                Ok(info) => {
                    // Record successful handshake duration
                    let duration = handshake_start.elapsed().as_secs_f64();
                    metrics::histogram!(
                        "zcash.net.peer.handshake.duration_seconds",
                        "result" => "success"
                    )
                    .record(duration);
                    info
                }
                Err(err) => {
                    // Record failed handshake duration and failure reason
                    let duration = handshake_start.elapsed().as_secs_f64();
                    let reason = match &err {
                        HandshakeError::UnexpectedMessage(_) => "unexpected_message",
                        HandshakeError::RemoteNonceReuse => "nonce_reuse",
                        HandshakeError::LocalDuplicateNonce => "duplicate_nonce",
                        HandshakeError::ConnectionClosed => "connection_closed",
                        HandshakeError::Io(_) => "io_error",
                        HandshakeError::Serialization(_) => "serialization",
                        HandshakeError::ObsoleteVersion(_) => "obsolete_version",
                        HandshakeError::Timeout => "timeout",
                    };
                    metrics::histogram!(
                        "zcash.net.peer.handshake.duration_seconds",
                        "result" => "failure"
                    )
                    .record(duration);
                    metrics::counter!(
                        "zcash.net.peer.handshake.failures.total",
                        "reason" => reason
                    )
                    .increment(1);
                    return Err(err);
                }
            };

            let remote_services = connection_info.remote.services;

            // The handshake succeeded: update the peer status from AttemptPending to Responded,
            // and send initial connection info.
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                // the collector doesn't depend on network activity,
                // so this await should not hang
                let _ = address_book_updater
                    .send(MetaAddr::new_connected(
                        book_addr,
                        &remote_services,
                        connected_addr.is_inbound(),
                    ))
                    .await;
            }

            // Reconfigure the codec to use the negotiated version.
            //
            // TODO: The tokio documentation says not to do this while any frames are still being processed.
            // Since we don't know that here, another way might be to release the tcp
            // stream from the unversioned Framed wrapper and construct a new one with a versioned codec.
            let bare_codec = peer_conn.codec_mut();
            bare_codec.reconfigure_version(connection_info.negotiated_version);

            debug!("constructing client, spawning server");

            // These channels communicate between the inbound and outbound halves of the connection,
            // and between the different connection tasks. We create separate tasks and channels
            // for each new connection.
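            // Note: with `futures::channel::mpsc`, a buffer of 0 still leaves
            // each sender one guaranteed slot, so this applies backpressure
            // without deadlocking senders.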
            let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
            let (shutdown_tx, shutdown_rx) = oneshot::channel();
            let error_slot = ErrorSlot::default();

            let (peer_tx, peer_rx) = peer_conn.split();

            // Instrument the peer's rx and tx streams.

            let inner_conn_span = connection_span.clone();
            let peer_tx = peer_tx.with(move |msg: Message| {
                let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
                // Add a metric for outbound messages.
                metrics::counter!(
                    "zcash.net.out.messages",
                    "command" => msg.command(),
                    "addr" => connected_addr.get_transient_addr_label(),
                )
                .increment(1);
                // We need to use future::ready rather than an async block here,
                // because we need the sink to be Unpin, and the With<Fut, ...>
                // returned by .with is Unpin only if Fut is Unpin, and the
                // futures generated by async blocks are not Unpin.
                future::ready(Ok(msg)).instrument(span)
            });

            // CORRECTNESS
            //
            // Ping/Pong messages and every error must update the peer address state via
            // the inbound_ts_collector.
            //
            // The heartbeat task sends regular Ping/Pong messages,
            // and it ends the connection if the heartbeat times out.
            // So we can just track peer activity based on Ping and Pong.
            // (This significantly improves performance by reducing time-related system calls.)
            let inbound_ts_collector = address_book_updater.clone();
            let inbound_inv_collector = inv_collector.clone();
            let ts_inner_conn_span = connection_span.clone();
            let inv_inner_conn_span = connection_span.clone();
            let peer_rx = peer_rx
                .then(move |msg| {
                    // Add a metric for inbound messages and errors.
                    // Fire a timestamp or failure event.
                    let inbound_ts_collector = inbound_ts_collector.clone();
                    let span =
                        debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");

                    async move {
                        match &msg {
                            Ok(msg) => {
                                metrics::counter!(
                                    "zcash.net.in.messages",
                                    "command" => msg.command(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer messages are not rate-limited, so we can't send anything
                                // to a shared channel or do anything expensive here.
                            }
                            Err(err) => {
                                metrics::counter!(
                                    "zebra.net.in.errors",
                                    "error" => err.to_string(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer errors are rate-limited because:
                                // - opening connections is rate-limited
                                // - the number of connections is limited
                                // - after the first error, the peer is disconnected
                                if let Some(book_addr) = connected_addr.get_address_book_addr() {
                                    let _ = inbound_ts_collector
                                        .send(MetaAddr::new_errored(book_addr, remote_services))
                                        .await;
                                }
                            }
                        }
                        msg
                    }
                    .instrument(span)
                })
                .then(move |msg| {
                    let inbound_inv_collector = inbound_inv_collector.clone();
                    let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
                    register_inventory_status(msg, connected_addr, inbound_inv_collector)
                        .instrument(span)
                })
                .boxed();

            // If we've learned potential peer addresses from the inbound connection remote address
            // or the handshake version message, add those addresses to the peer cache for this
            // peer.
            //
            // # Security
            //
            // We can't add these alternate addresses directly to the address book. If we did,
            // malicious peers could interfere with the address book state of other peers by
            // providing their addresses in `Version` messages. Or they could fill the address book
            // with fake addresses.
            //
            // These peer addresses are rate-limited because:
            // - opening connections is rate-limited
            // - these addresses are put in the peer address cache
            // - the peer address cache is only used when Zebra requests addresses from that peer
            let remote_canonical_addr = connection_info.remote.address_from.addr();
            let alternate_addrs = connected_addr
                .get_alternate_addrs(remote_canonical_addr)
                .map(|addr| {
                    // Assume the connecting node is a server node, and it's available now.
                    MetaAddr::new_gossiped_meta_addr(
                        addr,
                        PeerServices::NODE_NETWORK,
                        DateTime32::now(),
                    )
                });

            let server = Connection::new(
                inbound_service,
                server_rx,
                error_slot.clone(),
                peer_tx,
                connection_tracker,
                connection_info.clone(),
                alternate_addrs.collect(),
            );

            let connection_task = tokio::spawn(
                server
                    .run(peer_rx)
                    .instrument(connection_span.clone())
                    .boxed(),
            );

            let heartbeat_task = tokio::spawn(
                send_periodic_heartbeats_with_shutdown_handle(
                    connected_addr,
                    shutdown_rx,
                    server_tx.clone(),
                    address_book_updater.clone(),
                )
                .instrument(tracing::debug_span!(parent: connection_span, "heartbeat"))
                .boxed(),
            );

            let client = Client {
                connection_info,
                shutdown_tx: Some(shutdown_tx),
                server_tx,
                inv_collector,
                error_slot,
                connection_task,
                heartbeat_task,
            };

            Ok(client)
        };

        // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout.
        let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut);

        // Spawn a new task to drive this handshake, forwarding panics to the calling task.
        tokio::spawn(fut.instrument(negotiator_span))
            .map(
                |join_result: Result<
                    Result<Result<Client, HandshakeError>, error::Elapsed>,
                    JoinError,
                >| {
                    match join_result {
                        Ok(Ok(Ok(connection_client))) => Ok(connection_client),
                        Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()),
                        Ok(Err(timeout_error)) => Err(timeout_error.into()),
                        Err(join_error) => match join_error.try_into_panic() {
                            // Forward panics to the calling task
                            Ok(panic_reason) => panic::resume_unwind(panic_reason),
                            Err(join_error) => Err(join_error.into()),
                        },
                    }
                },
            )
            .boxed()
    }
}

/// Register any advertised or missing inventory in `msg` for `connected_addr`.
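///
/// For example, an `inv` advertising exactly one block hash is registered as
/// available inventory for the peer, while an `inv` with two or more blocks is
/// skipped as a likely reply to a query (see the comments below). Transaction
/// inventory is registered regardless of count.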
pub(crate) async fn register_inventory_status(
    msg: Result<Message, SerializationError>,
    connected_addr: ConnectedAddr,
    inv_collector: broadcast::Sender<InventoryChange>,
) -> Result<Message, SerializationError> {
    match (&msg, connected_addr.get_transient_addr()) {
        (Ok(Message::Inv(advertised)), Some(transient_addr)) => {
            // We ignore inventory messages with more than one
            // block, because they are most likely replies to a
            // query, rather than a newly gossiped block.
            //
            // (We process inventory messages with any number of
            // transactions.)
            //
            // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
            //
            // Note: zcashd has a bug where it merges queued inv messages of
            // the same or different types. Zebra compensates by sending `notfound`
            // responses to the inv collector. (#2156, #1768)
            //
            // (We can't split `inv`s, because that fills the inventory registry
            // with useless entries that the whole network has, making it large and slow.)
            match advertised.as_slice() {
                [advertised @ InventoryHash::Block(_)] => {
                    debug!(
                        ?advertised,
                        "registering gossiped advertised block inventory for peer"
                    );

                    // The peer set and inv collector use the peer's remote
                    // address as an identifier.
                    // If all receivers have been dropped, `send` returns an error.
                    // When that happens, Zebra is shutting down, so we want to ignore this error.
                    let _ = inv_collector
                        .send(InventoryChange::new_available(*advertised, transient_addr));
                }
                advertised => {
                    let advertised = advertised
                        .iter()
                        .filter(|advertised| advertised.unmined_tx_id().is_some());

                    debug!(
                        ?advertised,
                        "registering advertised unmined transaction inventory for peer",
                    );

                    if let Some(change) =
                        InventoryChange::new_available_multi(advertised, transient_addr)
                    {
                        // Ignore channel errors that should only happen during shutdown.
                        let _ = inv_collector.send(change);
                    }
                }
            }
        }

        (Ok(Message::NotFound(missing)), Some(transient_addr)) => {
            // Ignore Errors and the unsupported FilteredBlock type
            let missing = missing.iter().filter(|missing| {
                missing.unmined_tx_id().is_some() || missing.block_hash().is_some()
            });

            debug!(?missing, "registering missing inventory for peer");

            if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
                let _ = inv_collector.send(change);
            }
        }
        _ => {}
    }

    msg
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// # Correctness
///
/// To prevent hangs:
/// - every await that depends on the network must have a timeout (or interval)
/// - every error/shutdown must update the address book state and return
///
/// The address book state can be updated via `ClientRequest.tx`, or the
/// `heartbeat_ts_collector`.
///
/// Returning from this function terminates the connection's heartbeat task.
async fn send_periodic_heartbeats_with_shutdown_handle(
    connected_addr: ConnectedAddr,
    shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
    server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    use futures::future::Either;

    let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
        connected_addr,
        server_tx,
        heartbeat_ts_collector.clone(),
    );

    pin_mut!(shutdown_rx);
    pin_mut!(heartbeat_run_loop);

    // CORRECTNESS
    //
    // Currently, select prefers the first future if multiple
    // futures are ready.
    //
    // Starvation is impossible here, because interval has a
    // slow rate, and shutdown is a oneshot. If both futures
    // are ready, we want the shutdown to take priority over
    // sending a useless heartbeat.
    match future::select(shutdown_rx, heartbeat_run_loop).await {
        Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client requested shut down");
            handle_heartbeat_shutdown(
                PeerError::ClientCancelledHeartbeatTask,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client was dropped");
            handle_heartbeat_shutdown(
                PeerError::ClientDropped,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Right((result, _unused_shutdown)) => {
            tracing::trace!("shutting down due to heartbeat failure");
            // heartbeat_timeout() already sent an error on the timestamp collector channel

            result
        }
    }
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
async fn send_periodic_heartbeats_run_loop(
    connected_addr: ConnectedAddr,
    mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    // Don't send the first heartbeat immediately - we've just completed the handshake!
    let mut interval = tokio::time::interval_at(
        Instant::now() + constants::HEARTBEAT_INTERVAL,
        constants::HEARTBEAT_INTERVAL,
    );
    // If the heartbeat is delayed, also delay all future heartbeats.
    // (Shorter heartbeat intervals just add load, without any benefit.)
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut interval_stream = IntervalStream::new(interval);

    while let Some(_instant) = interval_stream.next().await {
        // We've reached another heartbeat interval without
        // shutting down, so do a heartbeat request.
        let ping_sent_at = Instant::now();
        if let Some(book_addr) = connected_addr.get_address_book_addr() {
            let _ = heartbeat_ts_collector
                .send(MetaAddr::new_ping_sent(book_addr, ping_sent_at.into()))
                .await;
        }

        let heartbeat = send_one_heartbeat(&mut server_tx);
        let rtt = heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;

        // # Security
        //
        // Peer heartbeats are rate-limited because:
        // - opening connections is rate-limited
        // - the number of connections is limited
        // - Zebra initiates each heartbeat using a timer
        if let Some(book_addr) = connected_addr.get_address_book_addr() {
            if let Some(rtt) = rtt {
                // the collector doesn't depend on network activity,
                // so this await should not hang
                let _ = heartbeat_ts_collector
                    .send(MetaAddr::new_responded(book_addr, Some(rtt)))
                    .await;
            }
        }
    }

    unreachable!("unexpected IntervalStream termination")
}

/// Send one heartbeat using `server_tx`.
async fn send_one_heartbeat(
    server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
) -> Result<Response, BoxError> {
    // We just reached a heartbeat interval, so start sending
    // a heartbeat.
    let (tx, rx) = oneshot::channel();

    // Try to send the heartbeat request
    let request = Request::Ping(Nonce::default());
    tracing::trace!(?request, "queueing heartbeat request");
    match server_tx.try_send(ClientRequest {
        request,
        tx,
        // we're not requesting inventory, so we don't need to update the registry
        inv_collector: None,
        transient_addr: None,
        span: tracing::Span::current(),
    }) {
        Ok(()) => {}
        Err(e) => {
            if e.is_disconnected() {
                Err(PeerError::ConnectionClosed)?;
            } else if e.is_full() {
                // Send the message when the Client becomes ready.
                // If sending takes too long, the heartbeat timeout will elapse
                // and close the connection, reducing our load to busy peers.
                server_tx.send(e.into_inner()).await?;
            } else {
                // we need to map unexpected error types to PeerErrors
                warn!(?e, "unexpected try_send error");
                Err(e)?;
            };
        }
    }

    // Flush the heartbeat request from the queue
    server_tx.flush().await?;
    tracing::trace!("sent heartbeat request");

    // Heartbeats are checked internally to the
    // connection logic, but we need to wait on the
    // response to avoid canceling the request.
    let response = rx.await??;
    tracing::trace!(?response, "got heartbeat response");

    Ok(response)
}

/// Wrap `fut` in a timeout, handling any inner or outer errors using
/// `handle_heartbeat_error`.
async fn heartbeat_timeout(
    fut: impl Future<Output = Result<Response, BoxError>>,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<Option<Duration>, BoxError> {
    let response = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
        Ok(inner_result) => {
            handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
        }
        Err(elapsed) => {
            handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
        }
    };

    let rtt = match response {
        Response::Pong(rtt) => Some(rtt),
        _ => None,
    };

    Ok(rtt)
}

/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
async fn handle_heartbeat_error<T, E>(
    result: Result<T, E>,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<T, E>
where
    E: std::fmt::Debug,
{
    match result {
        Ok(t) => Ok(t),
        Err(err) => {
            tracing::debug!(?err, "heartbeat error, shutting down");

            // # Security
            //
            // Peer errors and shutdowns are rate-limited because:
            // - opening connections is rate-limited
            // - the number of connections is limited
            // - after the first error or shutdown, the peer is disconnected
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                let _ = address_book_updater
                    .send(MetaAddr::new_errored(book_addr, None))
                    .await;
            }
            Err(err)
        }
    }
}

/// Mark `connected_addr` as shut down using `address_book_updater`.
async fn handle_heartbeat_shutdown(
    peer_error: PeerError,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<(), BoxError> {
    tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");

    if let Some(book_addr) = connected_addr.get_address_book_addr() {
        let _ = address_book_updater
            .send(MetaAddr::new_shutdown(book_addr))
            .await;
    }

    Err(peer_error.into())
}