zebra_network/peer/handshake.rs
//! Initial [`Handshake`]s with Zebra peers over a `PeerTransport`.

use std::{
    cmp::min,
    fmt,
    future::Future,
    net::{Ipv4Addr, SocketAddr},
    panic,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};

use chrono::{TimeZone, Utc};
use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
use indexmap::IndexSet;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::broadcast,
    task::JoinError,
    time::{error, timeout, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
use tracing_futures::Instrument;

use zebra_chain::{
    chain_tip::{ChainTip, NoChainTip},
    parameters::Network,
    serialization::{DateTime32, SerializationError},
};

use crate::{
    constants,
    meta_addr::MetaAddrChange,
    peer::{
        CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError,
        MinimumPeerVersion, PeerError,
    },
    peer_set::{ConnectionTracker, InventoryChange},
    protocol::{
        external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
        internal::{Request, Response},
    },
    types::MetaAddr,
    BoxError, Config, PeerSocketAddr, VersionMessage,
};

#[cfg(test)]
mod tests;

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// # Correctness
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
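///
/// # Examples
///
/// A sketch of that calling pattern, assuming a configured `handshake`
/// service and a prepared `request` (both hypothetical values set up by
/// the caller):
///
/// ```ignore
/// use tower::ServiceExt;
///
/// // Launch the handshake in a separate task, wrapped in a timeout.
/// let client_join_handle = tokio::spawn(tokio::time::timeout(
///     constants::HANDSHAKE_TIMEOUT,
///     handshake.oneshot(request),
/// ));
/// ```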
pub struct Handshake<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Config,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,

    inbound_service: S,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    inv_collector: broadcast::Sender<InventoryChange>,
    minimum_peer_version: MinimumPeerVersion<C>,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,

    parent_span: Span,
}

impl<S, C> fmt::Debug for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // skip the channels, they don't tell us anything useful
        f.debug_struct(std::any::type_name::<Handshake<S, C>>())
            .field("config", &self.config)
            .field("user_agent", &self.user_agent)
            .field("our_services", &self.our_services)
            .field("relay", &self.relay)
            .field("minimum_peer_version", &self.minimum_peer_version)
            .field("parent_span", &self.parent_span)
            .finish()
    }
}

impl<S, C> Clone for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            user_agent: self.user_agent.clone(),
            our_services: self.our_services,
            relay: self.relay,
            inbound_service: self.inbound_service.clone(),
            address_book_updater: self.address_book_updater.clone(),
            inv_collector: self.inv_collector.clone(),
            minimum_peer_version: self.minimum_peer_version.clone(),
            nonces: self.nonces.clone(),
            parent_span: self.parent_span.clone(),
        }
    }
}

/// The metadata for a peer connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// The connected peer address, if known.
    /// This address might not be valid for outbound connections.
    ///
    /// Peers can be connected via a transient inbound or proxy address,
    /// which will appear as the connected address to the OS and Zebra.
    pub connected_addr: ConnectedAddr,

    /// The network protocol [`VersionMessage`] sent by the remote peer.
    pub remote: VersionMessage,

    /// The network protocol version negotiated with the remote peer.
    ///
    /// Derived from `remote.version` and the
    /// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION).
    pub negotiated_version: Version,
}

/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
/// give us enough information to reconnect to that peer.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ConnectedAddr {
    /// The address we used to make a direct outbound connection.
    ///
    /// In an honest network, a Zcash peer is listening on this exact address
    /// and port.
    OutboundDirect {
        /// The connected outbound remote address and port.
        addr: PeerSocketAddr,
    },

    /// The address we received from the OS, when a remote peer directly
    /// connected to our Zcash listener port.
    ///
    /// In an honest network, a Zcash peer might be listening on this address,
    /// if its outbound address is the same as its listener address. But the port
    /// is an ephemeral outbound TCP port, not a listener port.
    InboundDirect {
        /// The connected inbound remote address and ephemeral port.
        ///
        /// The IP address might be the address of a Zcash peer, but the port is an ephemeral port.
        addr: PeerSocketAddr,
    },

    /// The proxy address we used to make an outbound connection.
    ///
    /// The proxy address can be used by many connections, but our own ephemeral
    /// outbound address and port can be used as an identifier for the duration
    /// of this connection.
    OutboundProxy {
        /// The remote address and port of the proxy.
        proxy_addr: SocketAddr,

        /// The local address and transient port we used to connect to the proxy.
        transient_local_addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer connected via an
    /// inbound proxy.
    ///
    /// The proxy's ephemeral outbound address can be used as an identifier for
    /// the duration of this connection.
    InboundProxy {
        /// The local address and transient port we used to connect to the proxy.
        transient_addr: SocketAddr,
    },

    /// An isolated connection, where we deliberately don't have any connection metadata.
    Isolated,
    //
    // TODO: handle Tor onion addresses
}

/// Get an unspecified IPv4 address for `network`
pub fn get_unspecified_ipv4_addr(network: Network) -> SocketAddr {
    (Ipv4Addr::UNSPECIFIED, network.default_port()).into()
}

use ConnectedAddr::*;

impl ConnectedAddr {
    /// Returns a new outbound directly connected addr.
    pub fn new_outbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        OutboundDirect { addr }
    }

    /// Returns a new inbound directly connected addr.
    pub fn new_inbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        InboundDirect { addr }
    }

    /// Returns a new outbound connected addr via `proxy`.
    ///
    /// `local_addr` is the ephemeral local address of the connection.
    #[allow(unused)]
    pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr {
        OutboundProxy {
            proxy_addr: proxy,
            transient_local_addr: local_addr,
        }
    }

    /// Returns a new inbound connected addr from `proxy`.
    //
    // TODO: distinguish between direct listeners and proxy listeners in the
    // rest of zebra-network
    #[allow(unused)]
    pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr {
        InboundProxy {
            transient_addr: proxy,
        }
    }

    /// Returns a new isolated connected addr, with no metadata.
    pub fn new_isolated() -> ConnectedAddr {
        Isolated
    }

    /// Returns a `PeerSocketAddr` that can be used to track this connection in the
    /// `AddressBook`.
    ///
    /// `None` for proxy connections and isolated connections.
    ///
    /// # Correctness
    ///
    /// This address can be used for reconnection attempts, or as a permanent
    /// identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `AddressBook` state.
    ///
    /// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
    pub fn get_address_book_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } | InboundDirect { addr } => Some(*addr),
            // TODO: consider using the canonical address of the peer to track
            // outbound proxy connections
            OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
        }
    }

    /// Returns a `PeerSocketAddr` that can be used to temporarily identify a
    /// connection.
    ///
    /// Isolated connections must not change Zebra's peer set or address book
    /// state, so they do not have an identifier.
    ///
    /// # Correctness
    ///
    /// The returned address is only valid while the original connection is
    /// open. It must not be used in the `AddressBook`, for outbound connection
    /// attempts, or as a permanent identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `PeerSet` state.
    pub fn get_transient_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } => Some(*addr),
            InboundDirect { addr } => Some(*addr),
            OutboundProxy {
                transient_local_addr,
                ..
            } => Some(PeerSocketAddr::from(*transient_local_addr)),
            InboundProxy { transient_addr } => Some(PeerSocketAddr::from(*transient_addr)),
            Isolated => None,
        }
    }

    /// Returns the metrics label for this connection's address.
    pub fn get_transient_addr_label(&self) -> String {
        self.get_transient_addr()
            .map_or_else(|| "isolated".to_string(), |addr| addr.to_string())
    }

    /// Returns a short label for the kind of connection.
    pub fn get_short_kind_label(&self) -> &'static str {
        match self {
            OutboundDirect { .. } => "Out",
            InboundDirect { .. } => "In",
            OutboundProxy { .. } => "ProxOut",
            InboundProxy { .. } => "ProxIn",
            Isolated => "Isol",
        }
    }

    /// Returns a list of alternate remote peer addresses, which can be used for
    /// reconnection attempts.
    ///
    /// Uses the connected address, and the remote canonical address.
    ///
    /// Skips duplicates. If this is an outbound connection, also skips the
    /// remote address that we're currently connected to.
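    ///
    /// # Examples
    ///
    /// A sketch of the outbound fixup behaviour, assuming `PeerSocketAddr`
    /// values parsed from hypothetical test addresses:
    ///
    /// ```ignore
    /// let connected = ConnectedAddr::new_outbound_direct("203.0.113.7:8233".parse()?);
    ///
    /// // The peer advertised an unspecified IP, so the connected IP is
    /// // substituted, and the advertised port is kept.
    /// let alternates: Vec<_> = connected
    ///     .get_alternate_addrs("0.0.0.0:8234".parse()?)
    ///     .collect();
    /// assert_eq!(alternates, vec!["203.0.113.7:8234".parse()?]);
    /// ```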
    pub fn get_alternate_addrs(
        &self,
        mut canonical_remote: PeerSocketAddr,
    ) -> impl Iterator<Item = PeerSocketAddr> {
        let addrs = match self {
            OutboundDirect { addr } => {
                // Fixup unspecified addresses and ports using known good data
                if canonical_remote.ip().is_unspecified() {
                    canonical_remote.set_ip(addr.ip());
                }
                if canonical_remote.port() == 0 {
                    canonical_remote.set_port(addr.port());
                }

                // Try the canonical remote address, if it is different from the
                // outbound address (which we already have in our address book)
                if &canonical_remote != addr {
                    vec![canonical_remote]
                } else {
                    // we didn't learn a new address from the handshake:
                    // it's the same as the outbound address, which is already in our address book
                    Vec::new()
                }
            }

            InboundDirect { addr } => {
                // Use the IP from the TCP connection, and the port the peer told us
                let maybe_addr = SocketAddr::new(addr.ip(), canonical_remote.port()).into();

                // Try both addresses, but remove one duplicate if they match
                if canonical_remote != maybe_addr {
                    vec![canonical_remote, maybe_addr]
                } else {
                    vec![canonical_remote]
                }
            }

            // Proxy addresses can't be used for reconnection attempts, but we
            // can try the canonical remote address
            OutboundProxy { .. } | InboundProxy { .. } => vec![canonical_remote],

            // Hide all metadata for isolated connections
            Isolated => Vec::new(),
        };

        addrs.into_iter()
    }

    /// Returns true if the [`ConnectedAddr`] was created for an inbound connection.
    pub fn is_inbound(&self) -> bool {
        matches!(self, InboundDirect { .. } | InboundProxy { .. })
    }
}

impl fmt::Debug for ConnectedAddr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let kind = self.get_short_kind_label();
        let addr = self.get_transient_addr_label();

        if matches!(self, Isolated) {
            f.write_str(kind)
        } else {
            f.debug_tuple(kind).field(&addr).finish()
        }
    }
}

/// A builder for `Handshake`.
pub struct Builder<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Option<Config>,
    our_services: Option<PeerServices>,
    user_agent: Option<String>,
    relay: Option<bool>,

    inbound_service: Option<S>,
    address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
    inv_collector: Option<broadcast::Sender<InventoryChange>>,
    latest_chain_tip: C,
}

impl<S, C> Builder<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    /// Provide a config. Mandatory.
    pub fn with_config(mut self, config: Config) -> Self {
        self.config = Some(config);
        self
    }

    /// Provide a service to handle inbound requests. Mandatory.
    pub fn with_inbound_service(mut self, inbound_service: S) -> Self {
        self.inbound_service = Some(inbound_service);
        self
    }

    /// Provide a channel for registering inventory advertisements. Optional.
    ///
    /// This channel takes transient remote addresses, which the `PeerSet` uses
    /// to look up peers that have specific inventory.
    pub fn with_inventory_collector(
        mut self,
        inv_collector: broadcast::Sender<InventoryChange>,
    ) -> Self {
        self.inv_collector = Some(inv_collector);
        self
    }

    /// Provide a hook for timestamp collection. Optional.
    ///
    /// This channel takes `MetaAddr`s, permanent addresses which can be used to
    /// make outbound connections to peers.
    pub fn with_address_book_updater(
        mut self,
        address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    ) -> Self {
        self.address_book_updater = Some(address_book_updater);
        self
    }

    /// Provide the services this node advertises to other peers. Optional.
    ///
    /// If this is unset, the node will advertise itself as a client.
    pub fn with_advertised_services(mut self, services: PeerServices) -> Self {
        self.our_services = Some(services);
        self
    }

    /// Provide this node's user agent. Optional.
    ///
    /// This must be a valid BIP14 string. If it is unset, the user agent will be empty.
    pub fn with_user_agent(mut self, user_agent: String) -> Self {
        self.user_agent = Some(user_agent);
        self
    }

    /// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
    ///
    /// If this is unset, the minimum accepted protocol version for peer connections is kept
    /// constant over network upgrade activations.
    ///
    /// Use [`NoChainTip`] to explicitly provide no chain tip.
    pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC>
    where
        NewC: ChainTip + Clone + Send + 'static,
    {
        Builder {
            latest_chain_tip,

            // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
            config: self.config,
            inbound_service: self.inbound_service,
            address_book_updater: self.address_book_updater,
            our_services: self.our_services,
            user_agent: self.user_agent,
            relay: self.relay,
            inv_collector: self.inv_collector,
        }
    }

    /// Whether to request that peers relay transactions to our node. Optional.
    ///
    /// If this is unset, the node will not request transactions.
    pub fn want_transactions(mut self, relay: bool) -> Self {
        self.relay = Some(relay);
        self
    }

    /// Consume this builder and produce a [`Handshake`].
    ///
    /// Returns an error only if a mandatory field was unset.
    pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
        let config = self.config.ok_or("did not specify config")?;
        let inbound_service = self
            .inbound_service
            .ok_or("did not specify inbound service")?;
        let inv_collector = self.inv_collector.unwrap_or_else(|| {
            let (tx, _) = broadcast::channel(100);
            tx
        });
        let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
            // No `AddressBookUpdater` for timestamp collection was passed, so create a stub
            // channel. Dropping the receiver means sends will fail, but we don't care.
            let (tx, _rx) = tokio::sync::mpsc::channel(1);
            tx
        });
        let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new()));
        let user_agent = self.user_agent.unwrap_or_default();
        let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
        let relay = self.relay.unwrap_or(false);
        let network = config.network.clone();
        let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, &network);

        Ok(Handshake {
            config,
            user_agent,
            our_services,
            relay,
            inbound_service,
            address_book_updater,
            inv_collector,
            minimum_peer_version,
            nonces,
            parent_span: Span::current(),
        })
    }
}

impl<S> Handshake<S, NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
{
    /// Create a builder that configures a [`Handshake`] service.
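    ///
    /// # Examples
    ///
    /// A minimal configuration sketch, where `inbound_service` is a
    /// hypothetical [`Service`] provided by the caller:
    ///
    /// ```ignore
    /// let handshake = Handshake::builder()
    ///     .with_config(Config::default())
    ///     .with_inbound_service(inbound_service)
    ///     .with_user_agent("/my-node:0.1.0/".to_string())
    ///     .want_transactions(true)
    ///     .finish()
    ///     .expect("config and inbound service are set");
    /// ```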
    pub fn builder() -> Builder<S, NoChainTip> {
        // We don't derive `Default` because the derive inserts a `where S:
        // Default` bound even though `Option<S>` implements `Default` even if
        // `S` does not.
        Builder {
            config: None,
            our_services: None,
            user_agent: None,
            relay: None,
            inbound_service: None,
            address_book_updater: None,
            inv_collector: None,
            latest_chain_tip: NoChainTip,
        }
    }
}

/// Negotiate the Zcash network protocol version with the remote peer at `connected_addr`, using
/// the connection `peer_conn`.
///
/// We split `Handshake` into its components before calling this function, to avoid infectious
/// `Sync` bounds on the returned future.
///
/// Returns the [`VersionMessage`] sent by the remote peer, and the [`Version`] negotiated with the
/// remote peer, inside a [`ConnectionInfo`] struct.
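///
/// The exchange below follows the standard Zcash network handshake:
/// 1. Send our `version` message, then wait for the peer's `version`.
/// 2. Check the peer's nonce for self-connection, and its version for obsolescence.
/// 3. Send our `verack` message, then wait for the peer's `verack`.
///
/// Unexpected messages received while waiting for `version` or `verack` are
/// logged and skipped.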
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version<PeerTransport>(
    peer_conn: &mut Framed<PeerTransport, Codec>,
    connected_addr: &ConnectedAddr,
    config: Config,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,
    mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<Arc<ConnectionInfo>, HandshakeError>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    // Create a random nonce for this connection
    let local_nonce = Nonce::default();

    // Insert the nonce for this handshake into the shared nonce set.
    // Each connection has its own connection state, and handshakes execute concurrently.
    //
    // # Correctness
    //
    // It is ok to wait for the lock here, because handshakes have a short
    // timeout, and the async mutex will be released when the task times
    // out.
    {
        let mut locked_nonces = nonces.lock().await;

        // Duplicate nonces are very rare, because they require a 64-bit random number collision,
        // and the nonce set is limited to a few hundred entries.
        let is_unique_nonce = locked_nonces.insert(local_nonce);
        if !is_unique_nonce {
            return Err(HandshakeError::LocalDuplicateNonce);
        }

        // # Security
        //
        // Limit the amount of memory used for nonces.
        // Nonces can be left in the set if the connection fails or times out between
        // the nonce being inserted, and it being removed.
        //
        // Zebra has strict connection limits, so we limit the number of nonces to
        // the configured connection limit.
        // This is a tradeoff between:
        // - avoiding memory denial of service attacks which make large numbers of connections;
        //   for example, 100 failed inbound connections take 1 second.
        // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces
        // - collision probability: two hundred 64-bit nonces have a very low collision probability
        //   <https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)>
        while locked_nonces.len() > config.peerset_total_connection_limit() {
            locked_nonces.shift_remove_index(0);
        }

        std::mem::drop(locked_nonces);
    }

    // Don't leak our exact clock skew to our peers. On the other hand,
    // we can't deviate too much, or zcashd will get confused.
    // Inspection of the zcashd source code reveals that the timestamp
    // is only ever used at the end of parsing the version message, in
    //
    //     pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
    //
    // AddTimeData is defined in src/timedata.cpp and is a no-op as long
    // as the difference between the specified timestamp and zcashd's
    // local time is less than TIMEDATA_WARNING_THRESHOLD, set to
    // 10 * 60 seconds (10 minutes).
    //
    // nTimeOffset is peer metadata that is never used, except for
    // statistics.
    //
    // To try to stay within the range where zcashd will ignore our clock skew,
    // truncate the timestamp down to the most recent 5-minute boundary.
    let now = Utc::now().timestamp();
    let timestamp = Utc
        .timestamp_opt(now - now.rem_euclid(5 * 60), 0)
        .single()
        .expect("in-range number of seconds and valid nanosecond");
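    // For example, a local clock reading of 1_700_000_000 seconds is sent as
    // 1_699_999_800, the previous 300-second boundary.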

    let (their_addr, our_services, our_listen_addr) = match connected_addr {
        // Version messages require an address, so we use
        // an unspecified address for Isolated connections
        Isolated => {
            let unspec_ipv4 = get_unspecified_ipv4_addr(config.network);
            (unspec_ipv4.into(), PeerServices::empty(), unspec_ipv4)
        }
        _ => {
            let their_addr = connected_addr
                .get_transient_addr()
                .expect("non-Isolated connections have a remote addr");

            // Advertise the configured external address in our version message,
            // if there is one; otherwise, advertise our listen address.
            let advertise_addr = match config.external_addr {
                Some(external_addr) => {
                    info!(?their_addr, ?config.listen_addr, "using external address for Version messages");
                    external_addr
                }
                None => config.listen_addr,
            };

            (their_addr, our_services, advertise_addr)
        }
    };

    let our_version = VersionMessage {
        version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
        services: our_services,
        timestamp,
        address_recv: AddrInVersion::new(their_addr, PeerServices::NODE_NETWORK),
        // TODO: detect external address (#1893)
        address_from: AddrInVersion::new(our_listen_addr, our_services),
        nonce: local_nonce,
        user_agent: user_agent.clone(),
        start_height: minimum_peer_version.chain_tip_height(),
        relay,
    }
    .into();

    debug!(?our_version, "sending initial version message");
    peer_conn.send(our_version).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for the next message if the one we got is not Version
    let remote: VersionMessage = loop {
        match remote_msg {
            Message::Version(version_message) => {
                debug!(?version_message, "got version message from remote peer");
                break version_message;
            }
            _ => {
                // Log the ignored message before it is overwritten by the next one.
                debug!(?remote_msg, "ignoring non-version message from remote peer");
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
            }
        }
    };

    let remote_address_services = remote.address_from.untrusted_services();
    if remote_address_services != remote.services {
        info!(
            ?remote.services,
            ?remote_address_services,
            ?remote.user_agent,
            "peer with inconsistent version services and version address services",
        );
    }

    // Check for nonce reuse, indicating self-connection
    //
    // # Correctness
    //
    // We must wait for the lock before we continue with the connection, to avoid
    // self-connection. If the connection times out, the async lock will be
    // released.
    //
    // # Security
    //
    // We don't remove the nonce here, because peers that observe our network traffic could
    // maliciously remove nonces, and force us to make self-connections.
    let nonce_reuse = nonces.lock().await.contains(&remote.nonce);
    if nonce_reuse {
        info!(?connected_addr, "rejecting self-connection attempt");
        Err(HandshakeError::RemoteNonceReuse)?;
    }

    // # Security
    //
    // Reject connections to peers on old versions, because they might not know about all
    // network upgrades and could lead to chain forks or slower block propagation.
    let min_version = minimum_peer_version.current();

    if remote.version < min_version {
        debug!(
            remote_ip = ?their_addr,
            ?remote.version,
            ?min_version,
            ?remote.user_agent,
            "disconnecting from peer with obsolete network protocol version",
        );

        // the value is the number of rejected handshakes, by peer IP and protocol version
        metrics::counter!(
            "zcash.net.peers.obsolete",
            "remote_ip" => their_addr.to_string(),
            "remote_version" => remote.version.to_string(),
            "min_version" => min_version.to_string(),
            "user_agent" => remote.user_agent.clone(),
        )
        .increment(1);

        // the value is the remote version of the most recent rejected handshake from each peer
        metrics::gauge!(
            "zcash.net.peers.version.obsolete",
            "remote_ip" => their_addr.to_string(),
        )
        .set(remote.version.0 as f64);

        // Disconnect if the peer is using an obsolete version.
        return Err(HandshakeError::ObsoleteVersion(remote.version));
    }

    let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version);
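    // For example, if our protocol version is 170_120 and the peer sent 170_100,
    // both sides use 170_100 for the rest of the connection.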

    // Limit the containing struct's size, and avoid multiple duplicates of 300+ bytes of data.
    let connection_info = Arc::new(ConnectionInfo {
        connected_addr: *connected_addr,
        remote,
        negotiated_version,
    });

    debug!(
        remote_ip = ?their_addr,
        ?connection_info.remote.version,
        ?negotiated_version,
        ?min_version,
        ?connection_info.remote.user_agent,
        "negotiated network protocol version with peer",
    );

    // the value is the number of connected handshakes, by peer IP and protocol version
    metrics::counter!(
        "zcash.net.peers.connected",
        "remote_ip" => their_addr.to_string(),
        "remote_version" => connection_info.remote.version.to_string(),
        "negotiated_version" => negotiated_version.to_string(),
        "min_version" => min_version.to_string(),
        "user_agent" => connection_info.remote.user_agent.clone(),
    )
    .increment(1);

    // the value is the remote version of the most recent connected handshake from each peer
    metrics::gauge!(
        "zcash.net.peers.version.connected",
        "remote_ip" => their_addr.to_string(),
    )
    .set(connection_info.remote.version.0 as f64);

    peer_conn.send(Message::Verack).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for the next message if the one we got is not Verack
    loop {
        match remote_msg {
            Message::Verack => {
                debug!(?remote_msg, "got verack message from remote peer");
                break;
            }
            _ => {
                // Log the ignored message before it is overwritten by the next one.
                debug!(?remote_msg, "ignoring non-verack message from remote peer");
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
            }
        }
    }

    Ok(connection_info)
}

/// A handshake request.
/// Contains the information needed to handshake with the peer.
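///
/// A request is typically constructed like this sketch, where `tcp_stream`,
/// `peer_addr`, and `connection_tracker` are hypothetical values prepared by
/// the caller:
///
/// ```ignore
/// let request = HandshakeRequest {
///     data_stream: tcp_stream,
///     connected_addr: ConnectedAddr::new_outbound_direct(peer_addr),
///     connection_tracker,
/// };
/// ```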
pub struct HandshakeRequest<PeerTransport>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// The tokio [`TcpStream`](tokio::net::TcpStream) or Tor
    /// `arti_client::DataStream` to the peer.
    // Use [`arti_client::DataStream`] when #5492 is done.
    pub data_stream: PeerTransport,

    /// The address of the peer, and other related information.
    pub connected_addr: ConnectedAddr,

    /// A connection tracker that reduces the open connection count when dropped.
    ///
    /// Used to limit the number of open connections in Zebra.
    pub connection_tracker: ConnectionTracker,
}

impl<S, PeerTransport, C> Service<HandshakeRequest<PeerTransport>> for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    type Response = Client;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: HandshakeRequest<PeerTransport>) -> Self::Future {
        let HandshakeRequest {
            data_stream,
            connected_addr,
            connection_tracker,
        } = req;

        let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
        // set the peer connection span's parent to the global span, as it
        // should exist independently of its creation source (inbound
        // connection, crawler, initial peer, ...)
        let connection_span =
            span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr);

        // Clone these upfront, so they can be moved into the future.
        let nonces = self.nonces.clone();
        let inbound_service = self.inbound_service.clone();
        let address_book_updater = self.address_book_updater.clone();
        let inv_collector = self.inv_collector.clone();
        let config = self.config.clone();
        let user_agent = self.user_agent.clone();
        let our_services = self.our_services;
        let relay = self.relay;
        let minimum_peer_version = self.minimum_peer_version.clone();

        // # Security
        //
        // `zebra_network::init()` implements a connection timeout on this future.
        // Any code outside this future does not have a timeout.
        let fut = async move {
            debug!(
                addr = ?connected_addr,
                "negotiating protocol version with remote peer"
            );

            // Start timing the handshake for metrics
            let handshake_start = Instant::now();

            let mut peer_conn = Framed::new(
                data_stream,
                Codec::builder()
                    .for_network(&config.network)
                    .with_metrics_addr_label(connected_addr.get_transient_addr_label())
                    .finish(),
            );

            let connection_info = match negotiate_version(
                &mut peer_conn,
                &connected_addr,
                config,
                nonces,
                user_agent,
                our_services,
                relay,
                minimum_peer_version,
            )
            .await
            {
                Ok(info) => {
                    // Record successful handshake duration
                    let duration = handshake_start.elapsed().as_secs_f64();
                    metrics::histogram!(
                        "zcash.net.peer.handshake.duration_seconds",
                        "result" => "success"
                    )
                    .record(duration);
                    info
                }
                Err(err) => {
                    // Record failed handshake duration and failure reason
                    let duration = handshake_start.elapsed().as_secs_f64();
                    let reason = match &err {
                        HandshakeError::UnexpectedMessage(_) => "unexpected_message",
                        HandshakeError::RemoteNonceReuse => "nonce_reuse",
                        HandshakeError::LocalDuplicateNonce => "duplicate_nonce",
                        HandshakeError::ConnectionClosed => "connection_closed",
                        HandshakeError::Io(_) => "io_error",
                        HandshakeError::Serialization(_) => "serialization",
                        HandshakeError::ObsoleteVersion(_) => "obsolete_version",
                        HandshakeError::Timeout => "timeout",
                    };
                    metrics::histogram!(
                        "zcash.net.peer.handshake.duration_seconds",
                        "result" => "failure"
                    )
                    .record(duration);
                    metrics::counter!(
                        "zcash.net.peer.handshake.failures.total",
                        "reason" => reason
                    )
                    .increment(1);
                    return Err(err);
                }
            };

            let remote_services = connection_info.remote.services;

            // The handshake succeeded: update the peer status from AttemptPending to Responded,
            // and send initial connection info.
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                // the collector doesn't depend on network activity,
                // so this await should not hang
                let _ = address_book_updater
                    .send(MetaAddr::new_connected(
                        book_addr,
                        &remote_services,
                        connected_addr.is_inbound(),
                    ))
                    .await;
            }

            // Reconfigure the codec to use the negotiated version.
            //
            // TODO: The tokio documentation says not to do this while any frames are still being processed.
            // Since we don't know that here, another way might be to release the tcp
            // stream from the unversioned Framed wrapper and construct a new one with a versioned codec.
            let bare_codec = peer_conn.codec_mut();
            bare_codec.reconfigure_version(connection_info.negotiated_version);

            debug!("constructing client, spawning server");

            // These channels communicate between the inbound and outbound halves of the connection,
            // and between the different connection tasks. We create separate tasks and channels
            // for each new connection.
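            //
            // (A `futures` mpsc channel created with a buffer of 0 still reserves
            // one message slot per sender, so every cloned `server_tx` can queue
            // one request; this note assumes the documented
            // `futures::channel::mpsc` capacity rule of `buffer + num_senders`.)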
            let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
            let (shutdown_tx, shutdown_rx) = oneshot::channel();
            let error_slot = ErrorSlot::default();

            let (peer_tx, peer_rx) = peer_conn.split();

            // Instrument the peer's rx and tx streams.

            let inner_conn_span = connection_span.clone();
            let peer_tx = peer_tx.with(move |msg: Message| {
                let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
                // Add a metric for outbound messages.
                metrics::counter!(
                    "zcash.net.out.messages",
                    "command" => msg.command(),
                    "addr" => connected_addr.get_transient_addr_label(),
                )
                .increment(1);
                // We need to use future::ready rather than an async block here,
                // because we need the sink to be Unpin, and the With<Fut, ...>
                // returned by .with is Unpin only if Fut is Unpin, and the
                // futures generated by async blocks are not Unpin.
                future::ready(Ok(msg)).instrument(span)
            });

            // CORRECTNESS
            //
            // Ping/Pong messages and every error must update the peer address state via
            // the inbound_ts_collector.
            //
            // The heartbeat task sends regular Ping/Pong messages,
            // and it ends the connection if the heartbeat times out.
            // So we can just track peer activity based on Ping and Pong.
            // (This significantly improves performance, by reducing the number of
            // system time calls.)
            let inbound_ts_collector = address_book_updater.clone();
            let inbound_inv_collector = inv_collector.clone();
            let ts_inner_conn_span = connection_span.clone();
            let inv_inner_conn_span = connection_span.clone();
            let peer_rx = peer_rx
                .then(move |msg| {
                    // Add a metric for inbound messages and errors.
                    // Fire a timestamp or failure event.
                    let inbound_ts_collector = inbound_ts_collector.clone();
                    let span =
                        debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");

                    async move {
                        match &msg {
                            Ok(msg) => {
                                metrics::counter!(
                                    "zcash.net.in.messages",
                                    "command" => msg.command(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer messages are not rate-limited, so we can't send anything
                                // to a shared channel or do anything expensive here.
                            }
                            Err(err) => {
                                metrics::counter!(
                                    "zebra.net.in.errors",
                                    "error" => err.to_string(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer errors are rate-limited because:
                                // - opening connections is rate-limited
                                // - the number of connections is limited
                                // - after the first error, the peer is disconnected
                                if let Some(book_addr) = connected_addr.get_address_book_addr() {
                                    let _ = inbound_ts_collector
                                        .send(MetaAddr::new_errored(book_addr, remote_services))
                                        .await;
                                }
                            }
                        }
                        msg
                    }
                    .instrument(span)
                })
                .then(move |msg| {
                    let inbound_inv_collector = inbound_inv_collector.clone();
                    let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
                    register_inventory_status(msg, connected_addr, inbound_inv_collector)
                        .instrument(span)
                })
                .boxed();

            // If we've learned potential peer addresses from the inbound connection remote address
            // or the handshake version message, add those addresses to the peer cache for this
            // peer.
            //
            // # Security
            //
            // We can't add these alternate addresses directly to the address book. If we did,
            // malicious peers could interfere with the address book state of other peers by
            // providing their addresses in `Version` messages. Or they could fill the address book
            // with fake addresses.
            //
            // These peer addresses are rate-limited because:
            // - opening connections is rate-limited
            // - these addresses are put in the peer address cache
            // - the peer address cache is only used when Zebra requests addresses from that peer
            let remote_canonical_addr = connection_info.remote.address_from.addr();
            let alternate_addrs = connected_addr
                .get_alternate_addrs(remote_canonical_addr)
                .map(|addr| {
                    // Assume the connecting node is a server node, and it's available now.
                    MetaAddr::new_gossiped_meta_addr(
                        addr,
                        PeerServices::NODE_NETWORK,
                        DateTime32::now(),
                    )
                });

            let server = Connection::new(
                inbound_service,
                server_rx,
                error_slot.clone(),
                peer_tx,
                connection_tracker,
                connection_info.clone(),
                alternate_addrs.collect(),
            );

            let connection_task = tokio::spawn(
                server
                    .run(peer_rx)
                    .instrument(connection_span.clone())
                    .boxed(),
            );

            let heartbeat_task = tokio::spawn(
                send_periodic_heartbeats_with_shutdown_handle(
                    connected_addr,
                    shutdown_rx,
                    server_tx.clone(),
                    address_book_updater.clone(),
                )
                .instrument(tracing::debug_span!(parent: connection_span, "heartbeat"))
                .boxed(),
            );

            let client = Client {
                connection_info,
                shutdown_tx: Some(shutdown_tx),
                server_tx,
                inv_collector,
                error_slot,
                connection_task,
                heartbeat_task,
            };

            Ok(client)
        };

        // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout.
        let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut);

        // Spawn a new task to drive this handshake, forwarding panics to the calling task.
        tokio::spawn(fut.instrument(negotiator_span))
            .map(
                |join_result: Result<
                    Result<Result<Client, HandshakeError>, error::Elapsed>,
                    JoinError,
                >| {
                    match join_result {
                        Ok(Ok(Ok(connection_client))) => Ok(connection_client),
                        Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()),
                        Ok(Err(timeout_error)) => Err(timeout_error.into()),
                        Err(join_error) => match join_error.try_into_panic() {
                            // Forward panics to the calling task
                            Ok(panic_reason) => panic::resume_unwind(panic_reason),
                            Err(join_error) => Err(join_error.into()),
                        },
                    }
                },
            )
            .boxed()
    }
}

/// Register any advertised or missing inventory in `msg` for `connected_addr`.
pub(crate) async fn register_inventory_status(
    msg: Result<Message, SerializationError>,
    connected_addr: ConnectedAddr,
    inv_collector: broadcast::Sender<InventoryChange>,
) -> Result<Message, SerializationError> {
    match (&msg, connected_addr.get_transient_addr()) {
        (Ok(Message::Inv(advertised)), Some(transient_addr)) => {
            // We ignore inventory messages with more than one
            // block, because they are most likely replies to a
            // query, rather than a newly gossiped block.
            //
            // (We process inventory messages with any number of
            // transactions.)
            //
            // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
            //
            // Note: zcashd has a bug where it merges queued inv messages of
            // the same or different types. Zebra compensates by sending `notfound`
            // responses to the inv collector. (#2156, #1768)
            //
            // (We can't split `inv`s, because that fills the inventory registry
            // with useless entries that the whole network has, making it large and slow.)
            match advertised.as_slice() {
                [advertised @ InventoryHash::Block(_)] => {
                    debug!(
                        ?advertised,
                        "registering gossiped advertised block inventory for peer"
                    );

                    // The peer set and inv collector use the peer's remote
                    // address as an identifier
                    // If all receivers have been dropped, `send` returns an error.
                    // When that happens, Zebra is shutting down, so we want to ignore this error.
                    let _ = inv_collector
                        .send(InventoryChange::new_available(*advertised, transient_addr));
                }
                advertised => {
                    let advertised = advertised
                        .iter()
                        .filter(|advertised| advertised.unmined_tx_id().is_some());

                    debug!(
                        ?advertised,
                        "registering advertised unmined transaction inventory for peer",
                    );

                    if let Some(change) =
                        InventoryChange::new_available_multi(advertised, transient_addr)
                    {
                        // Ignore channel errors that should only happen during shutdown.
                        let _ = inv_collector.send(change);
                    }
                }
            }
        }

        (Ok(Message::NotFound(missing)), Some(transient_addr)) => {
            // Ignore Errors and the unsupported FilteredBlock type
            let missing = missing.iter().filter(|missing| {
                missing.unmined_tx_id().is_some() || missing.block_hash().is_some()
            });

            debug!(?missing, "registering missing inventory for peer");

            if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
                let _ = inv_collector.send(change);
            }
        }
        _ => {}
    }

    msg
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// # Correctness
///
/// To prevent hangs:
/// - every await that depends on the network must have a timeout (or interval)
/// - every error/shutdown must update the address book state and return
///
/// The address book state can be updated via `ClientRequest.tx`, or the
/// heartbeat_ts_collector.
///
/// Returning from this function terminates the connection's heartbeat task.
async fn send_periodic_heartbeats_with_shutdown_handle(
    connected_addr: ConnectedAddr,
    shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
    server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    use futures::future::Either;

    let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
        connected_addr,
        server_tx,
        heartbeat_ts_collector.clone(),
    );

    pin_mut!(shutdown_rx);
    pin_mut!(heartbeat_run_loop);

    // CORRECTNESS
    //
    // Currently, select prefers the first future if multiple
    // futures are ready.
    //
    // Starvation is impossible here, because interval has a
    // slow rate, and shutdown is a oneshot. If both futures
    // are ready, we want the shutdown to take priority over
    // sending a useless heartbeat.
    match future::select(shutdown_rx, heartbeat_run_loop).await {
        Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client requested shut down");
            handle_heartbeat_shutdown(
                PeerError::ClientCancelledHeartbeatTask,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client was dropped");
            handle_heartbeat_shutdown(
                PeerError::ClientDropped,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Right((result, _unused_shutdown)) => {
            tracing::trace!("shutting down due to heartbeat failure");
            // heartbeat_timeout() already sent an error on the timestamp collector channel

            result
        }
    }
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
async fn send_periodic_heartbeats_run_loop(
    connected_addr: ConnectedAddr,
    mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    // Don't send the first heartbeat immediately - we've just completed the handshake!
    let mut interval = tokio::time::interval_at(
        Instant::now() + constants::HEARTBEAT_INTERVAL,
        constants::HEARTBEAT_INTERVAL,
    );
    // If the heartbeat is delayed, also delay all future heartbeats.
    // (Shorter heartbeat intervals just add load, without any benefit.)
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut interval_stream = IntervalStream::new(interval);

    while let Some(_instant) = interval_stream.next().await {
        // We've reached another heartbeat interval without
        // shutting down, so do a heartbeat request.
        let ping_sent_at = Instant::now();
        if let Some(book_addr) = connected_addr.get_address_book_addr() {
            let _ = heartbeat_ts_collector
                .send(MetaAddr::new_ping_sent(book_addr, ping_sent_at.into()))
                .await;
        }

        let heartbeat = send_one_heartbeat(&mut server_tx);
        let rtt = heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;

        // # Security
        //
        // Peer heartbeats are rate-limited because:
        // - opening connections is rate-limited
        // - the number of connections is limited
        // - Zebra initiates each heartbeat using a timer
        if let Some(book_addr) = connected_addr.get_address_book_addr() {
            if let Some(rtt) = rtt {
                // the collector doesn't depend on network activity,
                // so this await should not hang
                let _ = heartbeat_ts_collector
                    .send(MetaAddr::new_responded(book_addr, Some(rtt)))
                    .await;
            }
        }
    }

    unreachable!("unexpected IntervalStream termination")
}

/// Send one heartbeat using `server_tx`.
async fn send_one_heartbeat(
    server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
) -> Result<Response, BoxError> {
    // We just reached a heartbeat interval, so start sending
    // a heartbeat.
    let (tx, rx) = oneshot::channel();

    // Try to send the heartbeat request
    let request = Request::Ping(Nonce::default());
    tracing::trace!(?request, "queueing heartbeat request");
    match server_tx.try_send(ClientRequest {
        request,
        tx,
        // we're not requesting inventory, so we don't need to update the registry
        inv_collector: None,
        transient_addr: None,
        span: tracing::Span::current(),
    }) {
        Ok(()) => {}
        Err(e) => {
            if e.is_disconnected() {
                Err(PeerError::ConnectionClosed)?;
            } else if e.is_full() {
                // Send the message when the Client becomes ready.
                // If sending takes too long, the heartbeat timeout will elapse
                // and close the connection, reducing our load on busy peers.
                server_tx.send(e.into_inner()).await?;
            } else {
                // we need to map unexpected error types to PeerErrors
                warn!(?e, "unexpected try_send error");
                Err(e)?;
            };
        }
    }

    // Flush the heartbeat request from the queue
    server_tx.flush().await?;
    tracing::trace!("sent heartbeat request");

    // Heartbeats are checked internally to the
    // connection logic, but we need to wait on the
    // response to avoid canceling the request.
    let response = rx.await??;
    tracing::trace!(?response, "got heartbeat response");

    Ok(response)
}

/// Wrap `fut` in a timeout, handling any inner or outer errors using
/// `handle_heartbeat_error`.
async fn heartbeat_timeout(
    fut: impl Future<Output = Result<Response, BoxError>>,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<Option<Duration>, BoxError> {
    let response = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
        Ok(inner_result) => {
            handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
        }
        Err(elapsed) => {
            handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
        }
    };

    let rtt = match response {
        Response::Pong(rtt) => Some(rtt),
        _ => None,
    };

    Ok(rtt)
}

/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
async fn handle_heartbeat_error<T, E>(
    result: Result<T, E>,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<T, E>
where
    E: std::fmt::Debug,
{
    match result {
        Ok(t) => Ok(t),
        Err(err) => {
            tracing::debug!(?err, "heartbeat error, shutting down");

            // # Security
            //
            // Peer errors and shutdowns are rate-limited because:
            // - opening connections is rate-limited
            // - the number of connections is limited
            // - after the first error or shutdown, the peer is disconnected
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                let _ = address_book_updater
                    .send(MetaAddr::new_errored(book_addr, None))
                    .await;
            }
            Err(err)
        }
    }
}

/// Mark `connected_addr` as shut down using `address_book_updater`.
async fn handle_heartbeat_shutdown(
    peer_error: PeerError,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<(), BoxError> {
    tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");

    if let Some(book_addr) = connected_addr.get_address_book_addr() {
        let _ = address_book_updater
            .send(MetaAddr::new_shutdown(book_addr))
            .await;
    }

    Err(peer_error.into())
}