zebra_network/peer/handshake.rs
1//! Initial [`Handshake`]s with Zebra peers over a `PeerTransport`.
2
3use std::{
4 cmp::min,
5 fmt,
6 future::Future,
7 net::{Ipv4Addr, SocketAddr},
8 panic,
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12 time::Duration,
13};
14
15use chrono::{TimeZone, Utc};
16use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
17use indexmap::IndexSet;
18use tokio::{
19 io::{AsyncRead, AsyncWrite},
20 sync::broadcast,
21 task::JoinError,
22 time::{error, timeout, Instant},
23};
24use tokio_stream::wrappers::IntervalStream;
25use tokio_util::codec::Framed;
26use tower::Service;
27use tracing::{span, Level, Span};
28use tracing_futures::Instrument;
29
30use zebra_chain::{
31 chain_tip::{ChainTip, NoChainTip},
32 parameters::Network,
33 serialization::{DateTime32, SerializationError},
34};
35
36use crate::{
37 constants,
38 meta_addr::MetaAddrChange,
39 peer::{
40 CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError,
41 MinimumPeerVersion, PeerError,
42 },
43 peer_set::{ConnectionTracker, InventoryChange},
44 protocol::{
45 external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
46 internal::{Request, Response},
47 },
48 types::MetaAddr,
49 BoxError, Config, PeerSocketAddr, VersionMessage,
50};
51
52#[cfg(test)]
53mod tests;
54
/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
pub struct Handshake<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    // The network and peer config used for each handshake.
    config: Config,
    // The BIP14 user agent string advertised in our `version` messages.
    user_agent: String,
    // The services we advertise to remote peers in our `version` messages.
    our_services: PeerServices,
    // Whether we ask remote peers to relay transactions to us.
    relay: bool,

    // The service that handles requests from the remote peer.
    inbound_service: S,
    // Sends peer address and status updates to the address book.
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    // Registers peer inventory advertisements for the `PeerSet`.
    inv_collector: broadcast::Sender<InventoryChange>,
    // The minimum protocol version we accept from remote peers.
    minimum_peer_version: MinimumPeerVersion<C>,
    // The `version` message nonces of concurrent handshakes, used to detect
    // and reject self-connections.
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,

    // The tracing span that all per-connection spans are parented to.
    parent_span: Span,
}
82
83impl<S, C> fmt::Debug for Handshake<S, C>
84where
85 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
86 S::Future: Send,
87 C: ChainTip + Clone + Send + 'static,
88{
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 // skip the channels, they don't tell us anything useful
91 f.debug_struct(std::any::type_name::<Handshake<S, C>>())
92 .field("config", &self.config)
93 .field("user_agent", &self.user_agent)
94 .field("our_services", &self.our_services)
95 .field("relay", &self.relay)
96 .field("minimum_peer_version", &self.minimum_peer_version)
97 .field("parent_span", &self.parent_span)
98 .finish()
99 }
100}
101
102impl<S, C> Clone for Handshake<S, C>
103where
104 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
105 S::Future: Send,
106 C: ChainTip + Clone + Send + 'static,
107{
108 fn clone(&self) -> Self {
109 Self {
110 config: self.config.clone(),
111 user_agent: self.user_agent.clone(),
112 our_services: self.our_services,
113 relay: self.relay,
114 inbound_service: self.inbound_service.clone(),
115 address_book_updater: self.address_book_updater.clone(),
116 inv_collector: self.inv_collector.clone(),
117 minimum_peer_version: self.minimum_peer_version.clone(),
118 nonces: self.nonces.clone(),
119 parent_span: self.parent_span.clone(),
120 }
121 }
122}
123
/// The metadata for a peer connection.
///
/// Shared between connection tasks via [`Arc`], to limit the size of the
/// containing structs and avoid duplicating 300+ bytes of data.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// The connected peer address, if known.
    /// This address might not be valid for outbound connections.
    ///
    /// Peers can be connected via a transient inbound or proxy address,
    /// which will appear as the connected address to the OS and Zebra.
    pub connected_addr: ConnectedAddr,

    /// The network protocol [`VersionMessage`] sent by the remote peer.
    pub remote: VersionMessage,

    /// The network protocol version negotiated with the remote peer.
    ///
    /// Derived from `remote.version` and the
    /// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION).
    pub negotiated_version: Version,
}
143
/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
/// give us enough information to reconnect to that peer.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ConnectedAddr {
    /// The address we used to make a direct outbound connection.
    ///
    /// In an honest network, a Zcash peer is listening on this exact address
    /// and port.
    OutboundDirect {
        /// The connected outbound remote address and port.
        addr: PeerSocketAddr,
    },

    /// The address we received from the OS, when a remote peer directly
    /// connected to our Zcash listener port.
    ///
    /// In an honest network, a Zcash peer might be listening on this address,
    /// if its outbound address is the same as its listener address. But the port
    /// is an ephemeral outbound TCP port, not a listener port.
    InboundDirect {
        /// The connected inbound remote address and ephemeral port.
        ///
        /// The IP address might be the address of a Zcash peer, but the port is an ephemeral port.
        addr: PeerSocketAddr,
    },

    /// The proxy address we used to make an outbound connection.
    ///
    /// The proxy address can be used by many connections, but our own ephemeral
    /// outbound address and port can be used as an identifier for the duration
    /// of this connection.
    OutboundProxy {
        /// The remote address and port of the proxy.
        proxy_addr: SocketAddr,

        /// The local address and transient port we used to connect to the proxy.
        transient_local_addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer connected via an
    /// inbound proxy.
    ///
    /// The proxy's ephemeral outbound address can be used as an identifier for
    /// the duration of this connection.
    InboundProxy {
        /// The local address and transient port we used to connect to the proxy.
        transient_addr: SocketAddr,
    },

    /// An isolated connection, where we deliberately don't have any connection metadata.
    ///
    /// Isolated connections never update the address book or peer set state.
    Isolated,
    //
    // TODO: handle Tor onion addresses
}
200
201/// Get an unspecified IPv4 address for `network`
202pub fn get_unspecified_ipv4_addr(network: Network) -> SocketAddr {
203 (Ipv4Addr::UNSPECIFIED, network.default_port()).into()
204}
205
206use ConnectedAddr::*;
207
208impl ConnectedAddr {
209 /// Returns a new outbound directly connected addr.
210 pub fn new_outbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
211 OutboundDirect { addr }
212 }
213
214 /// Returns a new inbound directly connected addr.
215 pub fn new_inbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
216 InboundDirect { addr }
217 }
218
219 /// Returns a new outbound connected addr via `proxy`.
220 ///
221 /// `local_addr` is the ephemeral local address of the connection.
222 #[allow(unused)]
223 pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr {
224 OutboundProxy {
225 proxy_addr: proxy,
226 transient_local_addr: local_addr,
227 }
228 }
229
230 /// Returns a new inbound connected addr from `proxy`.
231 //
232 // TODO: distinguish between direct listeners and proxy listeners in the
233 // rest of zebra-network
234 #[allow(unused)]
235 pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr {
236 InboundProxy {
237 transient_addr: proxy,
238 }
239 }
240
241 /// Returns a new isolated connected addr, with no metadata.
242 pub fn new_isolated() -> ConnectedAddr {
243 Isolated
244 }
245
246 /// Returns a `PeerSocketAddr` that can be used to track this connection in the
247 /// `AddressBook`.
248 ///
249 /// `None` for inbound connections, proxy connections, and isolated
250 /// connections.
251 ///
252 /// # Correctness
253 ///
254 /// This address can be used for reconnection attempts, or as a permanent
255 /// identifier.
256 ///
257 /// # Security
258 ///
259 /// This address must not depend on the canonical address from the `Version`
260 /// message. Otherwise, malicious peers could interfere with other peers
261 /// `AddressBook` state.
262 ///
263 /// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
264 pub fn get_address_book_addr(&self) -> Option<PeerSocketAddr> {
265 match self {
266 OutboundDirect { addr } | InboundDirect { addr } => Some(*addr),
267 // TODO: consider using the canonical address of the peer to track
268 // outbound proxy connections
269 OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
270 }
271 }
272
273 /// Returns a `PeerSocketAddr` that can be used to temporarily identify a
274 /// connection.
275 ///
276 /// Isolated connections must not change Zebra's peer set or address book
277 /// state, so they do not have an identifier.
278 ///
279 /// # Correctness
280 ///
281 /// The returned address is only valid while the original connection is
282 /// open. It must not be used in the `AddressBook`, for outbound connection
283 /// attempts, or as a permanent identifier.
284 ///
285 /// # Security
286 ///
287 /// This address must not depend on the canonical address from the `Version`
288 /// message. Otherwise, malicious peers could interfere with other peers'
289 /// `PeerSet` state.
290 pub fn get_transient_addr(&self) -> Option<PeerSocketAddr> {
291 match self {
292 OutboundDirect { addr } => Some(*addr),
293 InboundDirect { addr } => Some(*addr),
294 OutboundProxy {
295 transient_local_addr,
296 ..
297 } => Some(PeerSocketAddr::from(*transient_local_addr)),
298 InboundProxy { transient_addr } => Some(PeerSocketAddr::from(*transient_addr)),
299 Isolated => None,
300 }
301 }
302
303 /// Returns the metrics label for this connection's address.
304 pub fn get_transient_addr_label(&self) -> String {
305 self.get_transient_addr()
306 .map_or_else(|| "isolated".to_string(), |addr| addr.to_string())
307 }
308
309 /// Returns a short label for the kind of connection.
310 pub fn get_short_kind_label(&self) -> &'static str {
311 match self {
312 OutboundDirect { .. } => "Out",
313 InboundDirect { .. } => "In",
314 OutboundProxy { .. } => "ProxOut",
315 InboundProxy { .. } => "ProxIn",
316 Isolated => "Isol",
317 }
318 }
319
320 /// Returns a list of alternate remote peer addresses, which can be used for
321 /// reconnection attempts.
322 ///
323 /// Uses the connected address, and the remote canonical address.
324 ///
325 /// Skips duplicates. If this is an outbound connection, also skips the
326 /// remote address that we're currently connected to.
327 pub fn get_alternate_addrs(
328 &self,
329 mut canonical_remote: PeerSocketAddr,
330 ) -> impl Iterator<Item = PeerSocketAddr> {
331 let addrs = match self {
332 OutboundDirect { addr } => {
333 // Fixup unspecified addresses and ports using known good data
334 if canonical_remote.ip().is_unspecified() {
335 canonical_remote.set_ip(addr.ip());
336 }
337 if canonical_remote.port() == 0 {
338 canonical_remote.set_port(addr.port());
339 }
340
341 // Try the canonical remote address, if it is different from the
342 // outbound address (which we already have in our address book)
343 if &canonical_remote != addr {
344 vec![canonical_remote]
345 } else {
346 // we didn't learn a new address from the handshake:
347 // it's the same as the outbound address, which is already in our address book
348 Vec::new()
349 }
350 }
351
352 InboundDirect { addr } => {
353 // Use the IP from the TCP connection, and the port the peer told us
354 let maybe_addr = SocketAddr::new(addr.ip(), canonical_remote.port()).into();
355
356 // Try both addresses, but remove one duplicate if they match
357 if canonical_remote != maybe_addr {
358 vec![canonical_remote, maybe_addr]
359 } else {
360 vec![canonical_remote]
361 }
362 }
363
364 // Proxy addresses can't be used for reconnection attempts, but we
365 // can try the canonical remote address
366 OutboundProxy { .. } | InboundProxy { .. } => vec![canonical_remote],
367
368 // Hide all metadata for isolated connections
369 Isolated => Vec::new(),
370 };
371
372 addrs.into_iter()
373 }
374
375 /// Returns true if the [`ConnectedAddr`] was created for an inbound connection.
376 pub fn is_inbound(&self) -> bool {
377 matches!(self, InboundDirect { .. } | InboundProxy { .. })
378 }
379}
380
381impl fmt::Debug for ConnectedAddr {
382 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383 let kind = self.get_short_kind_label();
384 let addr = self.get_transient_addr_label();
385
386 if matches!(self, Isolated) {
387 f.write_str(kind)
388 } else {
389 f.debug_tuple(kind).field(&addr).finish()
390 }
391 }
392}
393
/// A builder for `Handshake`.
pub struct Builder<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    // Mandatory: the network and peer config.
    config: Option<Config>,
    // Optional: the services we advertise; defaults to no services (a client).
    our_services: Option<PeerServices>,
    // Optional: our BIP14 user agent; defaults to an empty string.
    user_agent: Option<String>,
    // Optional: whether we ask peers to relay transactions; defaults to false.
    relay: Option<bool>,

    // Mandatory: the service that handles inbound peer requests.
    inbound_service: Option<S>,
    // Optional: a channel for address book updates; defaults to a stub channel.
    address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
    // Optional: a channel for inventory advertisements; defaults to a stub channel.
    inv_collector: Option<broadcast::Sender<InventoryChange>>,
    // The chain tip used to calculate the minimum peer protocol version.
    latest_chain_tip: C,
}
411
412impl<S, C> Builder<S, C>
413where
414 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
415 S::Future: Send,
416 C: ChainTip + Clone + Send + 'static,
417{
418 /// Provide a config. Mandatory.
419 pub fn with_config(mut self, config: Config) -> Self {
420 self.config = Some(config);
421 self
422 }
423
424 /// Provide a service to handle inbound requests. Mandatory.
425 pub fn with_inbound_service(mut self, inbound_service: S) -> Self {
426 self.inbound_service = Some(inbound_service);
427 self
428 }
429
430 /// Provide a channel for registering inventory advertisements. Optional.
431 ///
432 /// This channel takes transient remote addresses, which the `PeerSet` uses
433 /// to look up peers that have specific inventory.
434 pub fn with_inventory_collector(
435 mut self,
436 inv_collector: broadcast::Sender<InventoryChange>,
437 ) -> Self {
438 self.inv_collector = Some(inv_collector);
439 self
440 }
441
442 /// Provide a hook for timestamp collection. Optional.
443 ///
444 /// This channel takes `MetaAddr`s, permanent addresses which can be used to
445 /// make outbound connections to peers.
446 pub fn with_address_book_updater(
447 mut self,
448 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
449 ) -> Self {
450 self.address_book_updater = Some(address_book_updater);
451 self
452 }
453
454 /// Provide the services this node advertises to other peers. Optional.
455 ///
456 /// If this is unset, the node will advertise itself as a client.
457 pub fn with_advertised_services(mut self, services: PeerServices) -> Self {
458 self.our_services = Some(services);
459 self
460 }
461
462 /// Provide this node's user agent. Optional.
463 ///
464 /// This must be a valid BIP14 string. If it is unset, the user-agent will be empty.
465 pub fn with_user_agent(mut self, user_agent: String) -> Self {
466 self.user_agent = Some(user_agent);
467 self
468 }
469
470 /// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
471 ///
472 /// If this is unset, the minimum accepted protocol version for peer connections is kept
473 /// constant over network upgrade activations.
474 ///
475 /// Use [`NoChainTip`] to explicitly provide no chain tip.
476 pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC>
477 where
478 NewC: ChainTip + Clone + Send + 'static,
479 {
480 Builder {
481 latest_chain_tip,
482
483 // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
484 config: self.config,
485 inbound_service: self.inbound_service,
486 address_book_updater: self.address_book_updater,
487 our_services: self.our_services,
488 user_agent: self.user_agent,
489 relay: self.relay,
490 inv_collector: self.inv_collector,
491 }
492 }
493
494 /// Whether to request that peers relay transactions to our node. Optional.
495 ///
496 /// If this is unset, the node will not request transactions.
497 pub fn want_transactions(mut self, relay: bool) -> Self {
498 self.relay = Some(relay);
499 self
500 }
501
502 /// Consume this builder and produce a [`Handshake`].
503 ///
504 /// Returns an error only if any mandatory field was unset.
505 pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
506 let config = self.config.ok_or("did not specify config")?;
507 let inbound_service = self
508 .inbound_service
509 .ok_or("did not specify inbound service")?;
510 let inv_collector = self.inv_collector.unwrap_or_else(|| {
511 let (tx, _) = broadcast::channel(100);
512 tx
513 });
514 let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
515 // No `AddressBookUpdater` for timestamp collection was passed, so create a stub
516 // channel. Dropping the receiver means sends will fail, but we don't care.
517 let (tx, _rx) = tokio::sync::mpsc::channel(1);
518 tx
519 });
520 let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new()));
521 let user_agent = self.user_agent.unwrap_or_default();
522 let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
523 let relay = self.relay.unwrap_or(false);
524 let network = config.network.clone();
525 let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, &network);
526
527 Ok(Handshake {
528 config,
529 user_agent,
530 our_services,
531 relay,
532 inbound_service,
533 address_book_updater,
534 inv_collector,
535 minimum_peer_version,
536 nonces,
537 parent_span: Span::current(),
538 })
539 }
540}
541
542impl<S> Handshake<S, NoChainTip>
543where
544 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
545 S::Future: Send,
546{
547 /// Create a builder that configures a [`Handshake`] service.
548 pub fn builder() -> Builder<S, NoChainTip> {
549 // We don't derive `Default` because the derive inserts a `where S:
550 // Default` bound even though `Option<S>` implements `Default` even if
551 // `S` does not.
552 Builder {
553 config: None,
554 our_services: None,
555 user_agent: None,
556 relay: None,
557 inbound_service: None,
558 address_book_updater: None,
559 inv_collector: None,
560 latest_chain_tip: NoChainTip,
561 }
562 }
563}
564
565/// Negotiate the Zcash network protocol version with the remote peer at `connected_addr`, using
566/// the connection `peer_conn`.
567///
568/// We split `Handshake` into its components before calling this function, to avoid infectious
569/// `Sync` bounds on the returned future.
570///
571/// Returns the [`VersionMessage`] sent by the remote peer, and the [`Version`] negotiated with the
572/// remote peer, inside a [`ConnectionInfo`] struct.
573#[allow(clippy::too_many_arguments)]
574pub async fn negotiate_version<PeerTransport>(
575 peer_conn: &mut Framed<PeerTransport, Codec>,
576 connected_addr: &ConnectedAddr,
577 config: Config,
578 nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
579 user_agent: String,
580 our_services: PeerServices,
581 relay: bool,
582 mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
583) -> Result<Arc<ConnectionInfo>, HandshakeError>
584where
585 PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
586{
587 // Create a random nonce for this connection
588 let local_nonce = Nonce::default();
589
590 // Insert the nonce for this handshake into the shared nonce set.
591 // Each connection has its own connection state, and handshakes execute concurrently.
592 //
593 // # Correctness
594 //
595 // It is ok to wait for the lock here, because handshakes have a short
596 // timeout, and the async mutex will be released when the task times
597 // out.
598 {
599 let mut locked_nonces = nonces.lock().await;
600
601 // Duplicate nonces are very rare, because they require a 64-bit random number collision,
602 // and the nonce set is limited to a few hundred entries.
603 let is_unique_nonce = locked_nonces.insert(local_nonce);
604 if !is_unique_nonce {
605 return Err(HandshakeError::LocalDuplicateNonce);
606 }
607
608 // # Security
609 //
610 // Limit the amount of memory used for nonces.
611 // Nonces can be left in the set if the connection fails or times out between
612 // the nonce being inserted, and it being removed.
613 //
614 // Zebra has strict connection limits, so we limit the number of nonces to
615 // the configured connection limit.
616 // This is a tradeoff between:
617 // - avoiding memory denial of service attacks which make large numbers of connections,
618 // for example, 100 failed inbound connections takes 1 second.
619 // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces
620 // - collision probability: two hundred 64-bit nonces have a very low collision probability
621 // <https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)>
622 while locked_nonces.len() > config.peerset_total_connection_limit() {
623 locked_nonces.shift_remove_index(0);
624 }
625
626 std::mem::drop(locked_nonces);
627 }
628
629 // Don't leak our exact clock skew to our peers. On the other hand,
630 // we can't deviate too much, or zcashd will get confused.
631 // Inspection of the zcashd source code reveals that the timestamp
632 // is only ever used at the end of parsing the version message, in
633 //
634 // pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
635 //
636 // AddTimeData is defined in src/timedata.cpp and is a no-op as long
637 // as the difference between the specified timestamp and the
638 // zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set
639 // to 10 * 60 seconds (10 minutes).
640 //
641 // nTimeOffset is peer metadata that is never used, except for
642 // statistics.
643 //
644 // To try to stay within the range where zcashd will ignore our clock skew,
645 // truncate the timestamp to the nearest 5 minutes.
646 let now = Utc::now().timestamp();
647 let timestamp = Utc
648 .timestamp_opt(now - now.rem_euclid(5 * 60), 0)
649 .single()
650 .expect("in-range number of seconds and valid nanosecond");
651
652 let (their_addr, our_services, our_listen_addr) = match connected_addr {
653 // Version messages require an address, so we use
654 // an unspecified address for Isolated connections
655 Isolated => {
656 let unspec_ipv4 = get_unspecified_ipv4_addr(config.network);
657 (unspec_ipv4.into(), PeerServices::empty(), unspec_ipv4)
658 }
659 _ => {
660 let their_addr = connected_addr
661 .get_transient_addr()
662 .expect("non-Isolated connections have a remote addr");
663
664 // Include the configured external address in our version message, if any, otherwise, include our listen address.
665 let advertise_addr = match config.external_addr {
666 Some(external_addr) => {
667 info!(?their_addr, ?config.listen_addr, "using external address for Version messages");
668 external_addr
669 }
670 None => config.listen_addr,
671 };
672
673 (their_addr, our_services, advertise_addr)
674 }
675 };
676
677 let our_version = VersionMessage {
678 version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
679 services: our_services,
680 timestamp,
681 address_recv: AddrInVersion::new(their_addr, PeerServices::NODE_NETWORK),
682 // TODO: detect external address (#1893)
683 address_from: AddrInVersion::new(our_listen_addr, our_services),
684 nonce: local_nonce,
685 user_agent: user_agent.clone(),
686 start_height: minimum_peer_version.chain_tip_height(),
687 relay,
688 }
689 .into();
690
691 debug!(?our_version, "sending initial version message");
692 peer_conn.send(our_version).await?;
693
694 let mut remote_msg = peer_conn
695 .next()
696 .await
697 .ok_or(HandshakeError::ConnectionClosed)??;
698
699 // Wait for next message if the one we got is not Version
700 let remote: VersionMessage = loop {
701 match remote_msg {
702 Message::Version(version_message) => {
703 debug!(?version_message, "got version message from remote peer");
704 break version_message;
705 }
706 _ => {
707 remote_msg = peer_conn
708 .next()
709 .await
710 .ok_or(HandshakeError::ConnectionClosed)??;
711 debug!(?remote_msg, "ignoring non-version message from remote peer");
712 }
713 }
714 };
715
716 let remote_address_services = remote.address_from.untrusted_services();
717 if remote_address_services != remote.services {
718 info!(
719 ?remote.services,
720 ?remote_address_services,
721 ?remote.user_agent,
722 "peer with inconsistent version services and version address services",
723 );
724 }
725
726 // Check for nonce reuse, indicating self-connection
727 //
728 // # Correctness
729 //
730 // We must wait for the lock before we continue with the connection, to avoid
731 // self-connection. If the connection times out, the async lock will be
732 // released.
733 //
734 // # Security
735 //
736 // We don't remove the nonce here, because peers that observe our network traffic could
737 // maliciously remove nonces, and force us to make self-connections.
738 let nonce_reuse = nonces.lock().await.contains(&remote.nonce);
739 if nonce_reuse {
740 info!(?connected_addr, "rejecting self-connection attempt");
741 Err(HandshakeError::RemoteNonceReuse)?;
742 }
743
744 // # Security
745 //
746 // Reject connections to peers on old versions, because they might not know about all
747 // network upgrades and could lead to chain forks or slower block propagation.
748 let min_version = minimum_peer_version.current();
749
750 if remote.version < min_version {
751 debug!(
752 remote_ip = ?their_addr,
753 ?remote.version,
754 ?min_version,
755 ?remote.user_agent,
756 "disconnecting from peer with obsolete network protocol version",
757 );
758
759 // the value is the number of rejected handshakes, by peer IP and protocol version
760 metrics::counter!(
761 "zcash.net.peers.obsolete",
762 "remote_ip" => their_addr.to_string(),
763 "remote_version" => remote.version.to_string(),
764 "min_version" => min_version.to_string(),
765 "user_agent" => remote.user_agent.clone(),
766 )
767 .increment(1);
768
769 // the value is the remote version of the most recent rejected handshake from each peer
770 metrics::gauge!(
771 "zcash.net.peers.version.obsolete",
772 "remote_ip" => their_addr.to_string(),
773 )
774 .set(remote.version.0 as f64);
775
776 // Disconnect if peer is using an obsolete version.
777 return Err(HandshakeError::ObsoleteVersion(remote.version));
778 }
779
780 let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version);
781
782 // Limit containing struct size, and avoid multiple duplicates of 300+ bytes of data.
783 let connection_info = Arc::new(ConnectionInfo {
784 connected_addr: *connected_addr,
785 remote,
786 negotiated_version,
787 });
788
789 debug!(
790 remote_ip = ?their_addr,
791 ?connection_info.remote.version,
792 ?negotiated_version,
793 ?min_version,
794 ?connection_info.remote.user_agent,
795 "negotiated network protocol version with peer",
796 );
797
798 // the value is the number of connected handshakes, by peer IP and protocol version
799 metrics::counter!(
800 "zcash.net.peers.connected",
801 "remote_ip" => their_addr.to_string(),
802 "remote_version" => connection_info.remote.version.to_string(),
803 "negotiated_version" => negotiated_version.to_string(),
804 "min_version" => min_version.to_string(),
805 "user_agent" => connection_info.remote.user_agent.clone(),
806 )
807 .increment(1);
808
809 // the value is the remote version of the most recent connected handshake from each peer
810 metrics::gauge!(
811 "zcash.net.peers.version.connected",
812 "remote_ip" => their_addr.to_string(),
813 )
814 .set(connection_info.remote.version.0 as f64);
815
816 peer_conn.send(Message::Verack).await?;
817
818 let mut remote_msg = peer_conn
819 .next()
820 .await
821 .ok_or(HandshakeError::ConnectionClosed)??;
822
823 // Wait for next message if the one we got is not Verack
824 loop {
825 match remote_msg {
826 Message::Verack => {
827 debug!(?remote_msg, "got verack message from remote peer");
828 break;
829 }
830 _ => {
831 remote_msg = peer_conn
832 .next()
833 .await
834 .ok_or(HandshakeError::ConnectionClosed)??;
835 debug!(?remote_msg, "ignoring non-verack message from remote peer");
836 }
837 }
838 }
839
840 Ok(connection_info)
841}
842
/// A handshake request.
/// Contains the information needed to handshake with the peer.
///
/// This is the request type for the [`Service`] implemented by [`Handshake`].
pub struct HandshakeRequest<PeerTransport>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// The tokio [`TcpStream`](tokio::net::TcpStream) or Tor
    /// `arti_client::DataStream` to the peer.
    // Use [`arti_client::DataStream`] when #5492 is done.
    pub data_stream: PeerTransport,

    /// The address of the peer, and other related information.
    pub connected_addr: ConnectedAddr,

    /// A connection tracker that reduces the open connection count when dropped.
    ///
    /// Used to limit the number of open connections in Zebra.
    pub connection_tracker: ConnectionTracker,
}
862
863impl<S, PeerTransport, C> Service<HandshakeRequest<PeerTransport>> for Handshake<S, C>
864where
865 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
866 S::Future: Send,
867 C: ChainTip + Clone + Send + 'static,
868 PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
869{
870 type Response = Client;
871 type Error = BoxError;
872 type Future =
873 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
874
875 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
876 Poll::Ready(Ok(()))
877 }
878
879 fn call(&mut self, req: HandshakeRequest<PeerTransport>) -> Self::Future {
880 let HandshakeRequest {
881 data_stream,
882 connected_addr,
883 connection_tracker,
884 } = req;
885
886 let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
887 // set the peer connection span's parent to the global span, as it
888 // should exist independently of its creation source (inbound
889 // connection, crawler, initial peer, ...)
890 let connection_span =
891 span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr);
892
893 // Clone these upfront, so they can be moved into the future.
894 let nonces = self.nonces.clone();
895 let inbound_service = self.inbound_service.clone();
896 let address_book_updater = self.address_book_updater.clone();
897 let inv_collector = self.inv_collector.clone();
898 let config = self.config.clone();
899 let user_agent = self.user_agent.clone();
900 let our_services = self.our_services;
901 let relay = self.relay;
902 let minimum_peer_version = self.minimum_peer_version.clone();
903
904 // # Security
905 //
906 // `zebra_network::init()` implements a connection timeout on this future.
907 // Any code outside this future does not have a timeout.
908 let fut = async move {
909 debug!(
910 addr = ?connected_addr,
911 "negotiating protocol version with remote peer"
912 );
913
914 let mut peer_conn = Framed::new(
915 data_stream,
916 Codec::builder()
917 .for_network(&config.network)
918 .with_metrics_addr_label(connected_addr.get_transient_addr_label())
919 .finish(),
920 );
921
922 let connection_info = negotiate_version(
923 &mut peer_conn,
924 &connected_addr,
925 config,
926 nonces,
927 user_agent,
928 our_services,
929 relay,
930 minimum_peer_version,
931 )
932 .await?;
933
934 let remote_services = connection_info.remote.services;
935
936 // The handshake succeeded: update the peer status from AttemptPending to Responded,
937 // and send initial connection info.
938 if let Some(book_addr) = connected_addr.get_address_book_addr() {
939 // the collector doesn't depend on network activity,
940 // so this await should not hang
941 let _ = address_book_updater
942 .send(MetaAddr::new_connected(
943 book_addr,
944 &remote_services,
945 connected_addr.is_inbound(),
946 ))
947 .await;
948 }
949
950 // Reconfigure the codec to use the negotiated version.
951 //
952 // TODO: The tokio documentation says not to do this while any frames are still being processed.
953 // Since we don't know that here, another way might be to release the tcp
954 // stream from the unversioned Framed wrapper and construct a new one with a versioned codec.
955 let bare_codec = peer_conn.codec_mut();
956 bare_codec.reconfigure_version(connection_info.negotiated_version);
957
958 debug!("constructing client, spawning server");
959
960 // These channels communicate between the inbound and outbound halves of the connection,
961 // and between the different connection tasks. We create separate tasks and channels
962 // for each new connection.
963 let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
964 let (shutdown_tx, shutdown_rx) = oneshot::channel();
965 let error_slot = ErrorSlot::default();
966
967 let (peer_tx, peer_rx) = peer_conn.split();
968
969 // Instrument the peer's rx and tx streams.
970
971 let inner_conn_span = connection_span.clone();
972 let peer_tx = peer_tx.with(move |msg: Message| {
973 let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
974 // Add a metric for outbound messages.
975 metrics::counter!(
976 "zcash.net.out.messages",
977 "command" => msg.command(),
978 "addr" => connected_addr.get_transient_addr_label(),
979 )
980 .increment(1);
981 // We need to use future::ready rather than an async block here,
982 // because we need the sink to be Unpin, and the With<Fut, ...>
983 // returned by .with is Unpin only if Fut is Unpin, and the
984 // futures generated by async blocks are not Unpin.
985 future::ready(Ok(msg)).instrument(span)
986 });
987
988 // CORRECTNESS
989 //
990 // Ping/Pong messages and every error must update the peer address state via
991 // the inbound_ts_collector.
992 //
993 // The heartbeat task sends regular Ping/Pong messages,
994 // and it ends the connection if the heartbeat times out.
995 // So we can just track peer activity based on Ping and Pong.
996 // (This significantly improves performance, by reducing time system calls.)
997 let inbound_ts_collector = address_book_updater.clone();
998 let inbound_inv_collector = inv_collector.clone();
999 let ts_inner_conn_span = connection_span.clone();
1000 let inv_inner_conn_span = connection_span.clone();
1001 let peer_rx = peer_rx
1002 .then(move |msg| {
1003 // Add a metric for inbound messages and errors.
1004 // Fire a timestamp or failure event.
1005 let inbound_ts_collector = inbound_ts_collector.clone();
1006 let span =
1007 debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");
1008
1009 async move {
1010 match &msg {
1011 Ok(msg) => {
1012 metrics::counter!(
1013 "zcash.net.in.messages",
1014 "command" => msg.command(),
1015 "addr" => connected_addr.get_transient_addr_label(),
1016 )
1017 .increment(1);
1018
1019 // # Security
1020 //
1021 // Peer messages are not rate-limited, so we can't send anything
1022 // to a shared channel or do anything expensive here.
1023 }
1024 Err(err) => {
1025 metrics::counter!(
1026 "zebra.net.in.errors",
1027 "error" => err.to_string(),
1028 "addr" => connected_addr.get_transient_addr_label(),
1029 )
1030 .increment(1);
1031
1032 // # Security
1033 //
1034 // Peer errors are rate-limited because:
1035 // - opening connections is rate-limited
1036 // - the number of connections is limited
1037 // - after the first error, the peer is disconnected
1038 if let Some(book_addr) = connected_addr.get_address_book_addr() {
1039 let _ = inbound_ts_collector
1040 .send(MetaAddr::new_errored(book_addr, remote_services))
1041 .await;
1042 }
1043 }
1044 }
1045 msg
1046 }
1047 .instrument(span)
1048 })
1049 .then(move |msg| {
1050 let inbound_inv_collector = inbound_inv_collector.clone();
1051 let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
1052 register_inventory_status(msg, connected_addr, inbound_inv_collector)
1053 .instrument(span)
1054 })
1055 .boxed();
1056
1057 // If we've learned potential peer addresses from the inbound connection remote address
1058 // or the handshake version message, add those addresses to the peer cache for this
1059 // peer.
1060 //
1061 // # Security
1062 //
1063 // We can't add these alternate addresses directly to the address book. If we did,
1064 // malicious peers could interfere with the address book state of other peers by
1065 // providing their addresses in `Version` messages. Or they could fill the address book
1066 // with fake addresses.
1067 //
1068 // These peer addresses are rate-limited because:
1069 // - opening connections is rate-limited
1070 // - these addresses are put in the peer address cache
1071 // - the peer address cache is only used when Zebra requests addresses from that peer
1072 let remote_canonical_addr = connection_info.remote.address_from.addr();
1073 let alternate_addrs = connected_addr
1074 .get_alternate_addrs(remote_canonical_addr)
1075 .map(|addr| {
1076 // Assume the connecting node is a server node, and it's available now.
1077 MetaAddr::new_gossiped_meta_addr(
1078 addr,
1079 PeerServices::NODE_NETWORK,
1080 DateTime32::now(),
1081 )
1082 });
1083
1084 let server = Connection::new(
1085 inbound_service,
1086 server_rx,
1087 error_slot.clone(),
1088 peer_tx,
1089 connection_tracker,
1090 connection_info.clone(),
1091 alternate_addrs.collect(),
1092 );
1093
1094 let connection_task = tokio::spawn(
1095 server
1096 .run(peer_rx)
1097 .instrument(connection_span.clone())
1098 .boxed(),
1099 );
1100
1101 let heartbeat_task = tokio::spawn(
1102 send_periodic_heartbeats_with_shutdown_handle(
1103 connected_addr,
1104 shutdown_rx,
1105 server_tx.clone(),
1106 address_book_updater.clone(),
1107 )
1108 .instrument(tracing::debug_span!(parent: connection_span, "heartbeat"))
1109 .boxed(),
1110 );
1111
1112 let client = Client {
1113 connection_info,
1114 shutdown_tx: Some(shutdown_tx),
1115 server_tx,
1116 inv_collector,
1117 error_slot,
1118 connection_task,
1119 heartbeat_task,
1120 };
1121
1122 Ok(client)
1123 };
1124
1125 // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout.
1126 let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut);
1127
1128 // Spawn a new task to drive this handshake, forwarding panics to the calling task.
1129 tokio::spawn(fut.instrument(negotiator_span))
1130 .map(
1131 |join_result: Result<
1132 Result<Result<Client, HandshakeError>, error::Elapsed>,
1133 JoinError,
1134 >| {
1135 match join_result {
1136 Ok(Ok(Ok(connection_client))) => Ok(connection_client),
1137 Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()),
1138 Ok(Err(timeout_error)) => Err(timeout_error.into()),
1139 Err(join_error) => match join_error.try_into_panic() {
1140 // Forward panics to the calling task
1141 Ok(panic_reason) => panic::resume_unwind(panic_reason),
1142 Err(join_error) => Err(join_error.into()),
1143 },
1144 }
1145 },
1146 )
1147 .boxed()
1148 }
1149}
1150
1151/// Register any advertised or missing inventory in `msg` for `connected_addr`.
1152pub(crate) async fn register_inventory_status(
1153 msg: Result<Message, SerializationError>,
1154 connected_addr: ConnectedAddr,
1155 inv_collector: broadcast::Sender<InventoryChange>,
1156) -> Result<Message, SerializationError> {
1157 match (&msg, connected_addr.get_transient_addr()) {
1158 (Ok(Message::Inv(advertised)), Some(transient_addr)) => {
1159 // We ignore inventory messages with more than one
1160 // block, because they are most likely replies to a
1161 // query, rather than a newly gossiped block.
1162 //
1163 // (We process inventory messages with any number of
1164 // transactions.)
1165 //
1166 // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
1167 //
1168 // Note: zcashd has a bug where it merges queued inv messages of
1169 // the same or different types. Zebra compensates by sending `notfound`
1170 // responses to the inv collector. (#2156, #1768)
1171 //
1172 // (We can't split `inv`s, because that fills the inventory registry
1173 // with useless entries that the whole network has, making it large and slow.)
1174 match advertised.as_slice() {
1175 [advertised @ InventoryHash::Block(_)] => {
1176 debug!(
1177 ?advertised,
1178 "registering gossiped advertised block inventory for peer"
1179 );
1180
1181 // The peer set and inv collector use the peer's remote
1182 // address as an identifier
1183 // If all receivers have been dropped, `send` returns an error.
1184 // When that happens, Zebra is shutting down, so we want to ignore this error.
1185 let _ = inv_collector
1186 .send(InventoryChange::new_available(*advertised, transient_addr));
1187 }
1188 advertised => {
1189 let advertised = advertised
1190 .iter()
1191 .filter(|advertised| advertised.unmined_tx_id().is_some());
1192
1193 debug!(
1194 ?advertised,
1195 "registering advertised unmined transaction inventory for peer",
1196 );
1197
1198 if let Some(change) =
1199 InventoryChange::new_available_multi(advertised, transient_addr)
1200 {
1201 // Ignore channel errors that should only happen during shutdown.
1202 let _ = inv_collector.send(change);
1203 }
1204 }
1205 }
1206 }
1207
1208 (Ok(Message::NotFound(missing)), Some(transient_addr)) => {
1209 // Ignore Errors and the unsupported FilteredBlock type
1210 let missing = missing.iter().filter(|missing| {
1211 missing.unmined_tx_id().is_some() || missing.block_hash().is_some()
1212 });
1213
1214 debug!(?missing, "registering missing inventory for peer");
1215
1216 if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
1217 let _ = inv_collector.send(change);
1218 }
1219 }
1220 _ => {}
1221 }
1222
1223 msg
1224}
1225
1226/// Send periodical heartbeats to `server_tx`, and update the peer status through
1227/// `heartbeat_ts_collector`.
1228///
1229/// # Correctness
1230///
1231/// To prevent hangs:
1232/// - every await that depends on the network must have a timeout (or interval)
1233/// - every error/shutdown must update the address book state and return
1234///
1235/// The address book state can be updated via `ClientRequest.tx`, or the
1236/// heartbeat_ts_collector.
1237///
1238/// Returning from this function terminates the connection's heartbeat task.
1239async fn send_periodic_heartbeats_with_shutdown_handle(
1240 connected_addr: ConnectedAddr,
1241 shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
1242 server_tx: futures::channel::mpsc::Sender<ClientRequest>,
1243 heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
1244) -> Result<(), BoxError> {
1245 use futures::future::Either;
1246
1247 let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
1248 connected_addr,
1249 server_tx,
1250 heartbeat_ts_collector.clone(),
1251 );
1252
1253 pin_mut!(shutdown_rx);
1254 pin_mut!(heartbeat_run_loop);
1255
1256 // CORRECTNESS
1257 //
1258 // Currently, select prefers the first future if multiple
1259 // futures are ready.
1260 //
1261 // Starvation is impossible here, because interval has a
1262 // slow rate, and shutdown is a oneshot. If both futures
1263 // are ready, we want the shutdown to take priority over
1264 // sending a useless heartbeat.
1265 match future::select(shutdown_rx, heartbeat_run_loop).await {
1266 Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
1267 tracing::trace!("shutting down because Client requested shut down");
1268 handle_heartbeat_shutdown(
1269 PeerError::ClientCancelledHeartbeatTask,
1270 &heartbeat_ts_collector,
1271 &connected_addr,
1272 )
1273 .await
1274 }
1275 Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
1276 tracing::trace!("shutting down because Client was dropped");
1277 handle_heartbeat_shutdown(
1278 PeerError::ClientDropped,
1279 &heartbeat_ts_collector,
1280 &connected_addr,
1281 )
1282 .await
1283 }
1284 Either::Right((result, _unused_shutdown)) => {
1285 tracing::trace!("shutting down due to heartbeat failure");
1286 // heartbeat_timeout() already send an error on the timestamp collector channel
1287
1288 result
1289 }
1290 }
1291}
1292
1293/// Send periodical heartbeats to `server_tx`, and update the peer status through
1294/// `heartbeat_ts_collector`.
1295///
1296/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
1297async fn send_periodic_heartbeats_run_loop(
1298 connected_addr: ConnectedAddr,
1299 mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
1300 heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
1301) -> Result<(), BoxError> {
1302 // Don't send the first heartbeat immediately - we've just completed the handshake!
1303 let mut interval = tokio::time::interval_at(
1304 Instant::now() + constants::HEARTBEAT_INTERVAL,
1305 constants::HEARTBEAT_INTERVAL,
1306 );
1307 // If the heartbeat is delayed, also delay all future heartbeats.
1308 // (Shorter heartbeat intervals just add load, without any benefit.)
1309 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1310
1311 let mut interval_stream = IntervalStream::new(interval);
1312
1313 while let Some(_instant) = interval_stream.next().await {
1314 // We've reached another heartbeat interval without
1315 // shutting down, so do a heartbeat request.
1316 let ping_sent_at = Instant::now();
1317 if let Some(book_addr) = connected_addr.get_address_book_addr() {
1318 let _ = heartbeat_ts_collector
1319 .send(MetaAddr::new_ping_sent(book_addr, ping_sent_at.into()))
1320 .await;
1321 }
1322
1323 let heartbeat = send_one_heartbeat(&mut server_tx);
1324 let rtt = heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;
1325
1326 // # Security
1327 //
1328 // Peer heartbeats are rate-limited because:
1329 // - opening connections is rate-limited
1330 // - the number of connections is limited
1331 // - Zebra initiates each heartbeat using a timer
1332 if let Some(book_addr) = connected_addr.get_address_book_addr() {
1333 if let Some(rtt) = rtt {
1334 // the collector doesn't depend on network activity,
1335 // so this await should not hang
1336 let _ = heartbeat_ts_collector
1337 .send(MetaAddr::new_responded(book_addr, Some(rtt)))
1338 .await;
1339 }
1340 }
1341 }
1342
1343 unreachable!("unexpected IntervalStream termination")
1344}
1345
1346/// Send one heartbeat using `server_tx`.
1347async fn send_one_heartbeat(
1348 server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
1349) -> Result<Response, BoxError> {
1350 // We just reached a heartbeat interval, so start sending
1351 // a heartbeat.
1352 let (tx, rx) = oneshot::channel();
1353
1354 // Try to send the heartbeat request
1355 let request = Request::Ping(Nonce::default());
1356 tracing::trace!(?request, "queueing heartbeat request");
1357 match server_tx.try_send(ClientRequest {
1358 request,
1359 tx,
1360 // we're not requesting inventory, so we don't need to update the registry
1361 inv_collector: None,
1362 transient_addr: None,
1363 span: tracing::Span::current(),
1364 }) {
1365 Ok(()) => {}
1366 Err(e) => {
1367 if e.is_disconnected() {
1368 Err(PeerError::ConnectionClosed)?;
1369 } else if e.is_full() {
1370 // Send the message when the Client becomes ready.
1371 // If sending takes too long, the heartbeat timeout will elapse
1372 // and close the connection, reducing our load to busy peers.
1373 server_tx.send(e.into_inner()).await?;
1374 } else {
1375 // we need to map unexpected error types to PeerErrors
1376 warn!(?e, "unexpected try_send error");
1377 Err(e)?;
1378 };
1379 }
1380 }
1381
1382 // Flush the heartbeat request from the queue
1383 server_tx.flush().await?;
1384 tracing::trace!("sent heartbeat request");
1385
1386 // Heartbeats are checked internally to the
1387 // connection logic, but we need to wait on the
1388 // response to avoid canceling the request.
1389 let response = rx.await??;
1390 tracing::trace!(?response, "got heartbeat response");
1391
1392 Ok(response)
1393}
1394
1395/// Wrap `fut` in a timeout, handing any inner or outer errors using
1396/// `handle_heartbeat_error`.
1397async fn heartbeat_timeout(
1398 fut: impl Future<Output = Result<Response, BoxError>>,
1399 address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
1400 connected_addr: &ConnectedAddr,
1401) -> Result<Option<Duration>, BoxError> {
1402 let response = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
1403 Ok(inner_result) => {
1404 handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
1405 }
1406 Err(elapsed) => {
1407 handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
1408 }
1409 };
1410
1411 let rtt = match response {
1412 Response::Pong(rtt) => Some(rtt),
1413 _ => None,
1414 };
1415
1416 Ok(rtt)
1417}
1418
1419/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
1420async fn handle_heartbeat_error<T, E>(
1421 result: Result<T, E>,
1422 address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
1423 connected_addr: &ConnectedAddr,
1424) -> Result<T, E>
1425where
1426 E: std::fmt::Debug,
1427{
1428 match result {
1429 Ok(t) => Ok(t),
1430 Err(err) => {
1431 tracing::debug!(?err, "heartbeat error, shutting down");
1432
1433 // # Security
1434 //
1435 // Peer errors and shutdowns are rate-limited because:
1436 // - opening connections is rate-limited
1437 // - the number of connections is limited
1438 // - after the first error or shutdown, the peer is disconnected
1439 if let Some(book_addr) = connected_addr.get_address_book_addr() {
1440 let _ = address_book_updater
1441 .send(MetaAddr::new_errored(book_addr, None))
1442 .await;
1443 }
1444 Err(err)
1445 }
1446 }
1447}
1448
1449/// Mark `connected_addr` as shut down using `address_book_updater`.
1450async fn handle_heartbeat_shutdown(
1451 peer_error: PeerError,
1452 address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
1453 connected_addr: &ConnectedAddr,
1454) -> Result<(), BoxError> {
1455 tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");
1456
1457 if let Some(book_addr) = connected_addr.get_address_book_addr() {
1458 let _ = address_book_updater
1459 .send(MetaAddr::new_shutdown(book_addr))
1460 .await;
1461 }
1462
1463 Err(peer_error.into())
1464}