// Source file: zebra_network/peer_set/initialize.rs

1//! A peer set whose size is dynamically determined by resource constraints.
2//!
3//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
4//!
5//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
6
7use std::{
8    collections::{BTreeMap, HashMap, HashSet},
9    convert::Infallible,
10    net::{IpAddr, SocketAddr},
11    pin::Pin,
12    sync::Arc,
13    time::Duration,
14};
15
16use futures::{
17    future::{self, FutureExt},
18    sink::SinkExt,
19    stream::{FuturesUnordered, StreamExt},
20    Future, TryFutureExt,
21};
22use indexmap::IndexMap;
23use rand::seq::SliceRandom;
24use tokio::{
25    net::{TcpListener, TcpStream},
26    sync::{broadcast, mpsc, watch},
27    time::{sleep, Instant},
28};
29use tokio_stream::wrappers::IntervalStream;
30use tower::{
31    buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
32};
33use tracing_futures::Instrument;
34
35use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics};
36
37use crate::{
38    address_book_updater::{AddressBookUpdater, MIN_CHANNEL_SIZE},
39    constants,
40    meta_addr::{MetaAddr, MetaAddrChange},
41    peer::{
42        self, address_is_valid_for_inbound_listeners, HandshakeRequest, MinimumPeerVersion,
43        OutboundConnectorRequest, PeerPreference,
44    },
45    peer_cache_updater::peer_cache_updater,
46    peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
47    AddressBook, BoxError, Config, PeerSocketAddr, Request, Response,
48};
49
50#[cfg(test)]
51mod tests;
52
53mod recent_by_ip;
54
55/// A successful outbound peer connection attempt or inbound connection handshake.
56///
57/// The [`Handshake`](peer::Handshake) service returns a [`Result`]. Only successful connections
58/// should be sent on the channel. Errors should be logged or ignored.
59///
60/// We don't allow any errors in this type, because:
61/// - The connection limits don't include failed connections
62/// - tower::Discover interprets an error as stream termination
63type DiscoveredPeer = (PeerSocketAddr, peer::Client);
64
65/// Initialize a peer set, using a network `config`, `inbound_service`,
66/// and `latest_chain_tip`.
67///
68/// The peer set abstracts away peer management to provide a
69/// [`tower::Service`] representing "the network" that load-balances requests
70/// over available peers.  The peer set automatically crawls the network to
71/// find more peer addresses and opportunistically connects to new peers.
72///
73/// Each peer connection's message handling is isolated from other
74/// connections, unlike in `zcashd`.  The peer connection first attempts to
75/// interpret inbound messages as part of a response to a previously-issued
76/// request.  Otherwise, inbound messages are interpreted as requests and sent
77/// to the supplied `inbound_service`.
78///
79/// Wrapping the `inbound_service` in [`tower::load_shed`] middleware will
80/// cause the peer set to shrink when the inbound service is unable to keep up
81/// with the volume of inbound requests.
82///
83/// Use [`NoChainTip`][1] to explicitly provide no chain tip receiver.
84///
85/// In addition to returning a service for outbound requests, this method
86/// returns a shared [`AddressBook`] updated with last-seen timestamps for
87/// connected peers. The shared address book should be accessed using a
88/// [blocking thread](https://docs.rs/tokio/1.15.0/tokio/task/index.html#blocking-and-yielding),
89/// to avoid async task deadlocks.
90///
91/// # Panics
92///
93/// If `config.config.peerset_initial_target_size` is zero.
94/// (zebra-network expects to be able to connect to at least one peer.)
95///
96/// [1]: zebra_chain::chain_tip::NoChainTip
97pub async fn init<S, C>(
98    config: Config,
99    inbound_service: S,
100    latest_chain_tip: C,
101    user_agent: String,
102) -> (
103    Buffer<BoxService<Request, Response, BoxError>, Request>,
104    Arc<std::sync::Mutex<AddressBook>>,
105    mpsc::Sender<(PeerSocketAddr, u32)>,
106)
107where
108    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + Sync + 'static,
109    S::Future: Send + 'static,
110    C: ChainTip + Clone + Send + Sync + 'static,
111{
112    let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
113
114    let (
115        address_book,
116        bans_receiver,
117        address_book_updater,
118        address_metrics,
119        address_book_updater_guard,
120    ) = AddressBookUpdater::spawn(&config, listen_addr);
121
122    let (misbehavior_tx, mut misbehavior_rx) = mpsc::channel(
123        // Leave enough room for a misbehaviour update on every peer connection
124        // before the channel is drained.
125        config
126            .peerset_total_connection_limit()
127            .max(MIN_CHANNEL_SIZE),
128    );
129
130    let misbehaviour_updater = address_book_updater.clone();
131    tokio::spawn(
132        async move {
133            let mut misbehaviors: HashMap<PeerSocketAddr, u32> = HashMap::new();
134            // Batch misbehaviour updates so peers can't keep the address book mutex locked
135            // by repeatedly sending invalid blocks or transactions.
136            let mut flush_timer =
137                IntervalStream::new(tokio::time::interval(Duration::from_secs(30)));
138
139            loop {
140                tokio::select! {
141                    msg = misbehavior_rx.recv() => match msg {
142                        Some((peer_addr, score_increment)) => *misbehaviors
143                            .entry(peer_addr)
144                            .or_default()
145                            += score_increment,
146                        None => break,
147                    },
148
149                    _ = flush_timer.next() => {
150                        for (addr, score_increment) in misbehaviors.drain() {
151                            let _ = misbehaviour_updater
152                                .send(MetaAddr::new_misbehavior(addr, score_increment))
153                                .await;
154                        }
155                    },
156                };
157            }
158
159            tracing::warn!("exiting misbehavior update batch task");
160        }
161        .in_current_span(),
162    );
163
164    // Create a broadcast channel for peer inventory advertisements.
165    // If it reaches capacity, this channel drops older inventory advertisements.
166    //
167    // When Zebra is at the chain tip with an up-to-date mempool,
168    // we expect to have at most 1 new transaction per connected peer,
169    // and 1-2 new blocks across the entire network.
170    // (The block syncer and mempool crawler handle bulk fetches of blocks and transactions.)
171    let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());
172
173    // Construct services that handle inbound handshakes and perform outbound
174    // handshakes. These use the same handshake service internally to detect
175    // self-connection attempts. Both are decorated with a tower TimeoutLayer to
176    // enforce timeouts as specified in the Config.
177    let (listen_handshaker, outbound_connector) = {
178        use tower::timeout::TimeoutLayer;
179        let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
180        use crate::protocol::external::types::PeerServices;
181        let hs = peer::Handshake::builder()
182            .with_config(config.clone())
183            .with_inbound_service(inbound_service)
184            .with_inventory_collector(inv_sender)
185            .with_address_book_updater(address_book_updater.clone())
186            .with_advertised_services(PeerServices::NODE_NETWORK)
187            .with_user_agent(user_agent)
188            .with_latest_chain_tip(latest_chain_tip.clone())
189            .want_transactions(true)
190            .finish()
191            .expect("configured all required parameters");
192        (
193            hs_timeout.layer(hs.clone()),
194            hs_timeout.layer(peer::Connector::new(hs)),
195        )
196    };
197
198    // Create an mpsc channel for peer changes,
199    // based on the maximum number of inbound and outbound peers.
200    //
201    // The connection limit does not apply to errors,
202    // so they need to be handled before sending to this channel.
203    let (peerset_tx, peerset_rx) =
204        futures::channel::mpsc::channel::<DiscoveredPeer>(config.peerset_total_connection_limit());
205
206    let discovered_peers = peerset_rx.map(|(address, client)| {
207        Result::<_, Infallible>::Ok(Change::Insert(address, client.into()))
208    });
209
210    // Create an mpsc channel for peerset demand signaling,
211    // based on the maximum number of outbound peers.
212    let (mut demand_tx, demand_rx) =
213        futures::channel::mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());
214
215    // Create a oneshot to send background task JoinHandles to the peer set
216    let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
217
218    // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
219    let peer_set = PeerSet::new(
220        &config,
221        discovered_peers,
222        demand_tx.clone(),
223        handle_rx,
224        inv_receiver,
225        bans_receiver.clone(),
226        address_metrics,
227        MinimumPeerVersion::new(latest_chain_tip, &config.network),
228        None,
229    );
230    let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
231
232    // Connect peerset_tx to the 3 peer sources:
233    //
234    // 1. Incoming peer connections, via a listener.
235    let listen_fut = accept_inbound_connections(
236        config.clone(),
237        tcp_listener,
238        constants::MIN_INBOUND_PEER_CONNECTION_INTERVAL,
239        listen_handshaker,
240        peerset_tx.clone(),
241        bans_receiver,
242    );
243    let listen_guard = tokio::spawn(listen_fut.in_current_span());
244
245    // 2. Initial peers, specified in the config and cached on disk.
246    let initial_peers_fut = add_initial_peers(
247        config.clone(),
248        outbound_connector.clone(),
249        peerset_tx.clone(),
250        address_book_updater.clone(),
251    );
252    let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span());
253
254    // 3. Outgoing peers we connect to in response to load.
255    let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
256
257    // Wait for the initial seed peer count
258    let mut active_outbound_connections = initial_peers_join
259        .wait_for_panics()
260        .await
261        .expect("unexpected error connecting to initial peers");
262    let active_initial_peer_count = active_outbound_connections.update_count();
263
264    // We need to await candidates.update() here,
265    // because zcashd rate-limits `addr`/`addrv2` messages per connection,
266    // and if we only have one initial peer,
267    // we need to ensure that its `Response::Addr` is used by the crawler.
268    //
269    // TODO: this might not be needed after we added the Connection peer address cache,
270    //       try removing it in a future release?
271    info!(
272        ?active_initial_peer_count,
273        "sending initial request for peers"
274    );
275    let _ = candidates.update_initial(active_initial_peer_count).await;
276
277    // Compute remaining connections to open.
278    let demand_count = config
279        .peerset_initial_target_size
280        .saturating_sub(active_outbound_connections.update_count());
281
282    for _ in 0..demand_count {
283        let _ = demand_tx.try_send(MorePeers);
284    }
285
286    // Start the peer crawler
287    let crawl_fut = crawl_and_dial(
288        config.clone(),
289        demand_tx,
290        demand_rx,
291        candidates,
292        outbound_connector,
293        peerset_tx,
294        active_outbound_connections,
295        address_book_updater,
296    );
297    let crawl_guard = tokio::spawn(crawl_fut.in_current_span());
298
299    // Start the peer disk cache updater
300    let peer_cache_updater_fut = peer_cache_updater(config, address_book.clone());
301    let peer_cache_updater_guard = tokio::spawn(peer_cache_updater_fut.in_current_span());
302
303    handle_tx
304        .send(vec![
305            listen_guard,
306            crawl_guard,
307            address_book_updater_guard,
308            peer_cache_updater_guard,
309        ])
310        .unwrap();
311
312    (peer_set, address_book, misbehavior_tx)
313}
314
315/// Use the provided `outbound_connector` to connect to the configured DNS seeder and
316/// disk cache initial peers, then send the resulting peer connections over `peerset_tx`.
317///
318/// Also sends every initial peer address to the `address_book_updater`.
319#[instrument(skip(config, outbound_connector, peerset_tx, address_book_updater))]
320async fn add_initial_peers<S>(
321    config: Config,
322    outbound_connector: S,
323    mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
324    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
325) -> Result<ActiveConnectionCounter, BoxError>
326where
327    S: Service<
328            OutboundConnectorRequest,
329            Response = (PeerSocketAddr, peer::Client),
330            Error = BoxError,
331        > + Clone
332        + Send
333        + 'static,
334    S::Future: Send + 'static,
335{
336    let initial_peers = limit_initial_peers(&config, address_book_updater).await;
337
338    let mut handshake_success_total: usize = 0;
339    let mut handshake_error_total: usize = 0;
340
341    let mut active_outbound_connections = ActiveConnectionCounter::new_counter_with(
342        config.peerset_outbound_connection_limit(),
343        "Outbound Connections",
344    );
345
346    // TODO: update when we add Tor peers or other kinds of addresses.
347    let ipv4_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv4()).count();
348    let ipv6_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv6()).count();
349    info!(
350        ?ipv4_peer_count,
351        ?ipv6_peer_count,
352        "connecting to initial peer set"
353    );
354
355    // # Security
356    //
357    // Resists distributed denial of service attacks by making sure that
358    // new peer connections are initiated at least `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL` apart.
359    //
360    // # Correctness
361    //
362    // Each `FuturesUnordered` can hold one `Buffer` or `Batch` reservation for
363    // an indefinite period. We can use `FuturesUnordered` without filling
364    // the underlying network buffers, because we immediately drive this
365    // single `FuturesUnordered` to completion, and handshakes have a short timeout.
366    let mut handshakes: FuturesUnordered<_> = initial_peers
367        .into_iter()
368        .enumerate()
369        .map(|(i, addr)| {
370            let connection_tracker = active_outbound_connections.track_connection();
371            let req = OutboundConnectorRequest {
372                addr,
373                connection_tracker,
374            };
375            let outbound_connector = outbound_connector.clone();
376
377            // Spawn a new task to make the outbound connection.
378            tokio::spawn(
379                async move {
380                    // Only spawn one outbound connector per
381                    // `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`,
382                    // by sleeping for the interval multiplied by the peer's index in the list.
383                    sleep(
384                        constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32),
385                    )
386                    .await;
387
388                    // As soon as we create the connector future,
389                    // the handshake starts running as a spawned task.
390                    outbound_connector
391                        .oneshot(req)
392                        .map_err(move |e| (addr, e))
393                        .await
394                }
395                .in_current_span(),
396            )
397            .wait_for_panics()
398        })
399        .collect();
400
401    while let Some(handshake_result) = handshakes.next().await {
402        match handshake_result {
403            Ok(change) => {
404                handshake_success_total += 1;
405                debug!(
406                    ?handshake_success_total,
407                    ?handshake_error_total,
408                    ?change,
409                    "an initial peer handshake succeeded"
410                );
411
412                // The connection limit makes sure this send doesn't block
413                peerset_tx.send(change).await?;
414            }
415            Err((addr, ref e)) => {
416                handshake_error_total += 1;
417
418                // this is verbose, but it's better than just hanging with no output when there are errors
419                let mut expected_error = false;
420                if let Some(io_error) = e.downcast_ref::<tokio::io::Error>() {
421                    // Some systems only have IPv4, or only have IPv6,
422                    // so these errors are not particularly interesting.
423                    if io_error.kind() == tokio::io::ErrorKind::AddrNotAvailable {
424                        expected_error = true;
425                    }
426                }
427
428                if expected_error {
429                    debug!(
430                        successes = ?handshake_success_total,
431                        errors = ?handshake_error_total,
432                        ?addr,
433                        ?e,
434                        "an initial peer connection failed"
435                    );
436                } else {
437                    info!(
438                        successes = ?handshake_success_total,
439                        errors = ?handshake_error_total,
440                        ?addr,
441                        %e,
442                        "an initial peer connection failed"
443                    );
444                }
445            }
446        }
447
448        // Security: Let other tasks run after each connection is processed.
449        //
450        // Avoids remote peers starving other Zebra tasks using initial connection successes or errors.
451        tokio::task::yield_now().await;
452    }
453
454    let outbound_connections = active_outbound_connections.update_count();
455    info!(
456        ?handshake_success_total,
457        ?handshake_error_total,
458        ?outbound_connections,
459        "finished connecting to initial seed and disk cache peers"
460    );
461
462    Ok(active_outbound_connections)
463}
464
465/// Limit the number of `initial_peers` addresses entries to the configured
466/// `peerset_initial_target_size`.
467///
468/// Returns randomly chosen entries from the provided set of addresses,
469/// in a random order.
470///
471/// Also sends every initial peer to the `address_book_updater`.
472async fn limit_initial_peers(
473    config: &Config,
474    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
475) -> HashSet<PeerSocketAddr> {
476    let all_peers: HashSet<PeerSocketAddr> = config.initial_peers().await;
477    let mut preferred_peers: BTreeMap<PeerPreference, Vec<PeerSocketAddr>> = BTreeMap::new();
478
479    let all_peers_count = all_peers.len();
480    if all_peers_count > config.peerset_initial_target_size {
481        info!(
482            "limiting the initial peers list from {} to {}",
483            all_peers_count, config.peerset_initial_target_size,
484        );
485    }
486
487    // Filter out invalid initial peers, and prioritise valid peers for initial connections.
488    // (This treats initial peers the same way we treat gossiped peers.)
489    for peer_addr in all_peers {
490        let preference = PeerPreference::new(peer_addr, config.network.clone());
491
492        match preference {
493            Ok(preference) => preferred_peers
494                .entry(preference)
495                .or_default()
496                .push(peer_addr),
497            Err(error) => info!(
498                ?peer_addr,
499                ?error,
500                "invalid initial peer from DNS seeder, configured IP address, or disk cache",
501            ),
502        }
503    }
504
505    // Send every initial peer to the address book, in preferred order.
506    // (This treats initial peers the same way we treat gossiped peers.)
507    //
508    // # Security
509    //
510    // Initial peers are limited because:
511    // - the number of initial peers is limited
512    // - this code only runs once at startup
513    for peer in preferred_peers.values().flatten() {
514        let peer_addr = MetaAddr::new_initial_peer(*peer);
515        // `send` only waits when the channel is full.
516        // The address book updater runs in its own thread, so we will only wait for a short time.
517        let _ = address_book_updater.send(peer_addr).await;
518    }
519
520    // Split out the `initial_peers` that will be shuffled and returned,
521    // choosing preferred peers first.
522    let mut initial_peers: HashSet<PeerSocketAddr> = HashSet::new();
523    for better_peers in preferred_peers.values() {
524        let mut better_peers = better_peers.clone();
525        let (chosen_peers, _unused_peers) = better_peers.partial_shuffle(
526            &mut rand::thread_rng(),
527            config.peerset_initial_target_size - initial_peers.len(),
528        );
529
530        initial_peers.extend(chosen_peers.iter());
531
532        if initial_peers.len() >= config.peerset_initial_target_size {
533            break;
534        }
535    }
536
537    initial_peers
538}
539
540/// Open a peer connection listener on `config.listen_addr`,
541/// returning the opened [`TcpListener`], and the address it is bound to.
542///
543/// If the listener is configured to use an automatically chosen port (port `0`),
544/// then the returned address will contain the actual port.
545///
546/// # Panics
547///
548/// If opening the listener fails.
549#[instrument(skip(config), fields(addr = ?config.listen_addr))]
550pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
551    // Warn if we're configured using the wrong network port.
552    if let Err(wrong_addr) =
553        address_is_valid_for_inbound_listeners(config.listen_addr, config.network.clone())
554    {
555        warn!(
556            "We are configured with address {} on {:?}, but it could cause network issues. \
557             The default port for {:?} is {}. Error: {wrong_addr:?}",
558            config.listen_addr,
559            config.network,
560            config.network,
561            config.network.default_port(),
562        );
563    }
564
565    info!(
566        "Trying to open Zcash protocol endpoint at {}...",
567        config.listen_addr
568    );
569    let listener_result = TcpListener::bind(config.listen_addr).await;
570
571    let listener = match listener_result {
572        Ok(l) => l,
573        Err(e) => panic!(
574            "Opening Zcash network protocol listener {:?} failed: {e:?}. \
575             Hint: Check if another zebrad or zcashd process is running. \
576             Try changing the network listen_addr in the Zebra config.",
577            config.listen_addr,
578        ),
579    };
580
581    let local_addr = listener
582        .local_addr()
583        .expect("unexpected missing local addr for open listener");
584    info!("Opened Zcash protocol endpoint at {}", local_addr);
585
586    (listener, local_addr)
587}
588
589/// Listens for peer connections on `addr`, then sets up each connection as a
590/// Zcash peer.
591///
592/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
593/// the [`peer::Client`] result over `peerset_tx`.
594///
595/// Limits the number of active inbound connections based on `config`,
596/// and waits `min_inbound_peer_connection_interval` between connections.
597#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
598async fn accept_inbound_connections<S>(
599    config: Config,
600    listener: TcpListener,
601    min_inbound_peer_connection_interval: Duration,
602    handshaker: S,
603    peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
604    bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
605) -> Result<(), BoxError>
606where
607    S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
608        + Clone,
609    S::Future: Send + 'static,
610{
611    let mut recent_inbound_connections =
612        recent_by_ip::RecentByIp::new(None, Some(config.max_connections_per_ip));
613
614    let mut active_inbound_connections = ActiveConnectionCounter::new_counter_with(
615        config.peerset_inbound_connection_limit(),
616        "Inbound Connections",
617    );
618
619    let mut handshakes: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
620        FuturesUnordered::new();
621    // Keeping an unresolved future in the pool means the stream never terminates.
622    handshakes.push(future::pending().boxed());
623
624    loop {
625        // Check for panics in finished tasks, before accepting new connections
626        let inbound_result = tokio::select! {
627            biased;
628            next_handshake_res = handshakes.next() => match next_handshake_res {
629                // The task has already sent the peer change to the peer set.
630                Some(()) => continue,
631                None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
632            },
633
634            // This future must wait until new connections are available: it can't have a timeout.
635            inbound_result = listener.accept() => inbound_result,
636        };
637
638        if let Ok((tcp_stream, addr)) = inbound_result {
639            let addr: PeerSocketAddr = addr.into();
640
641            if bans_receiver.borrow().clone().contains_key(&addr.ip()) {
642                debug!(?addr, "banned inbound connection attempt");
643                std::mem::drop(tcp_stream);
644                continue;
645            }
646
647            if active_inbound_connections.update_count()
648                >= config.peerset_inbound_connection_limit()
649                || recent_inbound_connections.is_past_limit_or_add(addr.ip())
650            {
651                // Too many open inbound connections or pending handshakes already.
652                // Close the connection.
653                std::mem::drop(tcp_stream);
654                // Allow invalid connections to be cleared quickly,
655                // but still put a limit on our CPU and network usage from failed connections.
656                tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
657                continue;
658            }
659
660            // The peer already opened a connection to us.
661            // So we want to increment the connection count as soon as possible.
662            let connection_tracker = active_inbound_connections.track_connection();
663            debug!(
664                inbound_connections = ?active_inbound_connections.update_count(),
665                "handshaking on an open inbound peer connection"
666            );
667
668            let handshake_task = accept_inbound_handshake(
669                addr,
670                handshaker.clone(),
671                tcp_stream,
672                connection_tracker,
673                peerset_tx.clone(),
674            )
675            .await?
676            .wait_for_panics();
677
678            handshakes.push(handshake_task);
679
680            // Rate-limit inbound connection handshakes.
681            // But sleep longer after a successful connection,
682            // so we can clear out failed connections at a higher rate.
683            //
684            // If there is a flood of connections,
685            // this stops Zebra overloading the network with handshake data.
686            //
687            // Zebra can't control how many queued connections are waiting,
688            // but most OSes also limit the number of queued inbound connections on a listener port.
689            tokio::time::sleep(min_inbound_peer_connection_interval).await;
690        } else {
691            // Allow invalid connections to be cleared quickly,
692            // but still put a limit on our CPU and network usage from failed connections.
693            debug!(?inbound_result, "error accepting inbound connection");
694            tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
695        }
696
697        // Security: Let other tasks run after each connection is processed.
698        //
699        // Avoids remote peers starving other Zebra tasks using inbound connection successes or
700        // errors.
701        //
702        // Preventing a denial of service is important in this code, so we want to sleep *and* make
703        // the next connection after other tasks have run. (Sleeps are not guaranteed to do that.)
704        tokio::task::yield_now().await;
705    }
706}
707
708/// Set up a new inbound connection as a Zcash peer.
709///
710/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
711/// the [`peer::Client`] result over `peerset_tx`.
712//
713// TODO: when we support inbound proxies, distinguish between proxied listeners and
714//       direct listeners in the span generated by this instrument macro
715#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
716async fn accept_inbound_handshake<S>(
717    addr: PeerSocketAddr,
718    mut handshaker: S,
719    tcp_stream: TcpStream,
720    connection_tracker: ConnectionTracker,
721    peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
722) -> Result<tokio::task::JoinHandle<()>, BoxError>
723where
724    S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
725        + Clone,
726    S::Future: Send + 'static,
727{
728    let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
729
730    debug!("got incoming connection");
731
732    // # Correctness
733    //
734    // Holding the drop guard returned by Span::enter across .await points will
735    // result in incorrect traces if it yields.
736    //
737    // This await is okay because the handshaker's `poll_ready` method always returns Ready.
738    handshaker.ready().await?;
739
740    // Construct a handshake future but do not drive it yet....
741    let handshake = handshaker.call(HandshakeRequest {
742        data_stream: tcp_stream,
743        connected_addr,
744        connection_tracker,
745    });
746    // ... instead, spawn a new task to handle this connection
747    let mut peerset_tx = peerset_tx.clone();
748
749    let handshake_task = tokio::spawn(
750        async move {
751            let handshake_result = handshake.await;
752
753            if let Ok(client) = handshake_result {
754                // The connection limit makes sure this send doesn't block
755                let _ = peerset_tx.send((addr, client)).await;
756            } else {
757                debug!(?handshake_result, "error handshaking with inbound peer");
758            }
759        }
760        .in_current_span(),
761    );
762
763    Ok(handshake_task)
764}
765
766/// An action that the peer crawler can take.
767enum CrawlerAction {
768    /// Drop the demand signal because there are too many pending handshakes.
769    DemandDrop,
770    /// Initiate a handshake to the next candidate peer in response to demand.
771    ///
772    /// If there are no available candidates, crawl existing peers.
773    DemandHandshakeOrCrawl,
774    /// Crawl existing peers for more peers in response to a timer `tick`.
775    TimerCrawl { tick: Instant },
776    /// Clear a finished handshake.
777    HandshakeFinished,
778    /// Clear a finished demand crawl (DemandHandshakeOrCrawl with no peers).
779    DemandCrawlFinished,
780    /// Clear a finished TimerCrawl.
781    TimerCrawlFinished,
782}
783
784/// Given a channel `demand_rx` that signals a need for new peers, try to find
785/// and connect to new peers, and send the resulting `peer::Client`s through the
786/// `peerset_tx` channel.
787///
788/// Crawl for new peers every `config.crawl_new_peer_interval`.
789/// Also crawl whenever there is demand, but no new peers in `candidates`.
790/// After crawling, try to connect to one new peer using `outbound_connector`.
791///
792/// If a handshake fails, restore the unused demand signal by sending it to
793/// `demand_tx`.
794///
795/// The crawler terminates when `candidates.update()` or `peerset_tx` returns a
796/// permanent internal error. Transient errors and individual peer errors should
797/// be handled within the crawler.
798///
799/// Uses `active_outbound_connections` to limit the number of active outbound connections
800/// across both the initial peers and crawler. The limit is based on `config`.
801#[allow(clippy::too_many_arguments)]
802#[instrument(
803    skip(
804        config,
805        demand_tx,
806        demand_rx,
807        candidates,
808        outbound_connector,
809        peerset_tx,
810        active_outbound_connections,
811        address_book_updater,
812    ),
813    fields(
814        new_peer_interval = ?config.crawl_new_peer_interval,
815    )
816)]
817async fn crawl_and_dial<C, S>(
818    config: Config,
819    demand_tx: futures::channel::mpsc::Sender<MorePeers>,
820    mut demand_rx: futures::channel::mpsc::Receiver<MorePeers>,
821    candidates: CandidateSet<S>,
822    outbound_connector: C,
823    peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
824    mut active_outbound_connections: ActiveConnectionCounter,
825    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
826) -> Result<(), BoxError>
827where
828    C: Service<
829            OutboundConnectorRequest,
830            Response = (PeerSocketAddr, peer::Client),
831            Error = BoxError,
832        > + Clone
833        + Send
834        + 'static,
835    C::Future: Send + 'static,
836    S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
837    S::Future: Send + 'static,
838{
839    use CrawlerAction::*;
840
841    info!(
842        crawl_new_peer_interval = ?config.crawl_new_peer_interval,
843        outbound_connections = ?active_outbound_connections.update_count(),
844        "starting the peer address crawler",
845    );
846
847    // # Concurrency
848    //
849    // Allow tasks using the candidate set to be spawned, so they can run concurrently.
850    // Previously, Zebra has had deadlocks and long hangs caused by running dependent
851    // candidate set futures in the same async task.
852    let candidates = Arc::new(futures::lock::Mutex::new(candidates));
853
854    // This contains both crawl and handshake tasks.
855    let mut handshakes: FuturesUnordered<
856        Pin<Box<dyn Future<Output = Result<CrawlerAction, BoxError>> + Send>>,
857    > = FuturesUnordered::new();
858    // <FuturesUnordered as Stream> returns None when empty.
859    // Keeping an unresolved future in the pool means the stream never terminates.
860    handshakes.push(future::pending().boxed());
861
862    let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval);
863    // If the crawl is delayed, also delay all future crawls.
864    // (Shorter intervals just add load, without any benefit.)
865    crawl_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
866
867    let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick });
868
869    // # Concurrency
870    //
871    // To avoid hangs and starvation, the crawler must spawn a separate task for each crawl
872    // and handshake, so they can make progress independently (and avoid deadlocking each other).
873    loop {
874        metrics::gauge!("crawler.in_flight_handshakes").set(
875            handshakes
876                .len()
877                .checked_sub(1)
878                .expect("the pool always contains an unresolved future") as f64,
879        );
880
881        let crawler_action = tokio::select! {
882            biased;
883            // Check for completed handshakes first, because the rest of the app needs them.
884            // Pending handshakes are limited by the connection limit.
885            next_handshake_res = handshakes.next() => next_handshake_res.expect(
886                "handshakes never terminates, because it contains a future that never resolves"
887            ),
888            // The timer is rate-limited
889            next_timer = crawl_timer.next() => Ok(next_timer.expect("timers never terminate")),
890            // Turn any new demand into an action, based on the crawler's current state.
891            //
892            // # Concurrency
893            //
894            // Demand is potentially unlimited, so it must go last in a biased select!.
895            next_demand = demand_rx.next() => next_demand.ok_or("demand stream closed, is Zebra shutting down?".into()).map(|MorePeers|{
896                if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
897                    // Too many open outbound connections or pending handshakes already
898                    DemandDrop
899                } else {
900                    DemandHandshakeOrCrawl
901                }
902            })
903        };
904
905        match crawler_action {
906            // Dummy actions
907            Ok(DemandDrop) => {
908                // This is set to trace level because when the peerset is
909                // congested it can generate a lot of demand signal very rapidly.
910                trace!("too many open connections or in-flight handshakes, dropping demand signal");
911            }
912
913            // Spawned tasks
914            Ok(DemandHandshakeOrCrawl) => {
915                let candidates = candidates.clone();
916                let outbound_connector = outbound_connector.clone();
917                let peerset_tx = peerset_tx.clone();
918                let address_book_updater = address_book_updater.clone();
919                let demand_tx = demand_tx.clone();
920
921                // Increment the connection count before we spawn the connection.
922                let outbound_connection_tracker = active_outbound_connections.track_connection();
923                let outbound_connections = active_outbound_connections.update_count();
924                debug!(?outbound_connections, "opening an outbound peer connection");
925
926                // Spawn each handshake or crawl into an independent task, so handshakes can make
927                // progress while crawls are running.
928                //
929                // # Concurrency
930                //
931                // The peer crawler must be able to make progress even if some handshakes are
932                // rate-limited. So the async mutex and next peer timeout are awaited inside the
933                // spawned task.
934                let handshake_or_crawl_handle = tokio::spawn(
935                    async move {
936                        // Try to get the next available peer for a handshake.
937                        //
938                        // candidates.next() has a short timeout, and briefly holds the address
939                        // book lock, so it shouldn't hang.
940                        //
941                        // Hold the lock for as short a time as possible.
942                        let candidate = { candidates.lock().await.next().await };
943
944                        if let Some(candidate) = candidate {
945                            // we don't need to spawn here, because there's nothing running concurrently
946                            dial(
947                                candidate,
948                                outbound_connector,
949                                outbound_connection_tracker,
950                                outbound_connections,
951                                peerset_tx,
952                                address_book_updater,
953                                demand_tx,
954                            )
955                            .await?;
956
957                            Ok(HandshakeFinished)
958                        } else {
959                            // There weren't any peers, so try to get more peers.
960                            debug!("demand for peers but no available candidates");
961
962                            crawl(candidates, demand_tx, false).await?;
963
964                            Ok(DemandCrawlFinished)
965                        }
966                    }
967                    .in_current_span(),
968                )
969                .wait_for_panics();
970
971                handshakes.push(handshake_or_crawl_handle);
972            }
973            Ok(TimerCrawl { tick }) => {
974                let candidates = candidates.clone();
975                let demand_tx = demand_tx.clone();
976                let should_always_dial = active_outbound_connections.update_count() == 0;
977
978                let crawl_handle = tokio::spawn(
979                    async move {
980                        debug!(
981                            ?tick,
982                            "crawling for more peers in response to the crawl timer"
983                        );
984
985                        crawl(candidates, demand_tx, should_always_dial).await?;
986
987                        Ok(TimerCrawlFinished)
988                    }
989                    .in_current_span(),
990                )
991                .wait_for_panics();
992
993                handshakes.push(crawl_handle);
994            }
995
996            // Completed spawned tasks
997            Ok(HandshakeFinished) => {
998                // Already logged in dial()
999            }
1000            Ok(DemandCrawlFinished) => {
1001                // This is set to trace level because when the peerset is
1002                // congested it can generate a lot of demand signal very rapidly.
1003                trace!("demand-based crawl finished");
1004            }
1005            Ok(TimerCrawlFinished) => {
1006                debug!("timer-based crawl finished");
1007            }
1008
1009            // Fatal errors and shutdowns
1010            Err(error) => {
1011                info!(?error, "crawler task exiting due to an error");
1012                return Err(error);
1013            }
1014        }
1015
1016        // Security: Let other tasks run after each crawler action is processed.
1017        //
1018        // Avoids remote peers starving other Zebra tasks using outbound connection errors.
1019        tokio::task::yield_now().await;
1020    }
1021}
1022
1023/// Try to get more peers using `candidates`, then queue a connection attempt using `demand_tx`.
1024/// If there were no new peers and `should_always_dial` is false, the connection attempt is skipped.
1025#[instrument(skip(candidates, demand_tx))]
1026async fn crawl<S>(
1027    candidates: Arc<futures::lock::Mutex<CandidateSet<S>>>,
1028    mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
1029    should_always_dial: bool,
1030) -> Result<(), BoxError>
1031where
1032    S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
1033    S::Future: Send + 'static,
1034{
1035    // update() has timeouts, and briefly holds the address book
1036    // lock, so it shouldn't hang.
1037    // Try to get new peers, holding the lock for as short a time as possible.
1038    let result = {
1039        let result = candidates.lock().await.update().await;
1040        std::mem::drop(candidates);
1041        result
1042    };
1043    let more_peers = match result {
1044        Ok(more_peers) => more_peers.or_else(|| should_always_dial.then_some(MorePeers)),
1045        Err(e) => {
1046            info!(
1047                ?e,
1048                "candidate set returned an error, is Zebra shutting down?"
1049            );
1050            return Err(e);
1051        }
1052    };
1053
1054    // If we got more peers, try to connect to a new peer on our next loop.
1055    //
1056    // # Security
1057    //
1058    // Update attempts are rate-limited by the candidate set,
1059    // and we only try peers if there was actually an update.
1060    //
1061    // So if all peers have had a recent attempt, and there was recent update
1062    // with no peers, the channel will drain. This prevents useless update attempt
1063    // loops.
1064    if let Some(more_peers) = more_peers {
1065        if let Err(send_error) = demand_tx.try_send(more_peers) {
1066            if send_error.is_disconnected() {
1067                // Zebra is shutting down
1068                return Err(send_error.into());
1069            }
1070        }
1071    }
1072
1073    Ok(())
1074}
1075
1076/// Try to connect to `candidate` using `outbound_connector`.
1077/// Uses `outbound_connection_tracker` to track the active connection count.
1078///
1079/// On success, sends peers to `peerset_tx`.
1080/// On failure, marks the peer as failed in the address book,
1081/// then re-adds demand to `demand_tx`.
1082#[instrument(skip(
1083    outbound_connector,
1084    outbound_connection_tracker,
1085    outbound_connections,
1086    peerset_tx,
1087    address_book_updater,
1088    demand_tx
1089))]
1090async fn dial<C>(
1091    candidate: MetaAddr,
1092    mut outbound_connector: C,
1093    outbound_connection_tracker: ConnectionTracker,
1094    outbound_connections: usize,
1095    mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
1096    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
1097    mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
1098) -> Result<(), BoxError>
1099where
1100    C: Service<
1101            OutboundConnectorRequest,
1102            Response = (PeerSocketAddr, peer::Client),
1103            Error = BoxError,
1104        > + Clone
1105        + Send
1106        + 'static,
1107    C::Future: Send + 'static,
1108{
1109    // If Zebra only has a few connections, we log connection failures at info level,
1110    // so users can diagnose and fix the problem. This defines the threshold for info logs.
1111    const MAX_CONNECTIONS_FOR_INFO_LOG: usize = 5;
1112
1113    // # Correctness
1114    //
1115    // To avoid hangs, the dialer must only await:
1116    // - functions that return immediately, or
1117    // - functions that have a reasonable timeout
1118
1119    debug!(?candidate.addr, "attempting outbound connection in response to demand");
1120
1121    // the connector is always ready, so this can't hang
1122    let outbound_connector = outbound_connector.ready().await?;
1123
1124    let req = OutboundConnectorRequest {
1125        addr: candidate.addr,
1126        connection_tracker: outbound_connection_tracker,
1127    };
1128
1129    // the handshake has timeouts, so it shouldn't hang
1130    let handshake_result = outbound_connector.call(req).map(Into::into).await;
1131
1132    match handshake_result {
1133        Ok((address, client)) => {
1134            debug!(?candidate.addr, "successfully dialed new peer");
1135
1136            // The connection limit makes sure this send doesn't block.
1137            peerset_tx.send((address, client)).await?;
1138        }
1139        // The connection was never opened, or it failed the handshake and was dropped.
1140        Err(error) => {
1141            // Silence verbose info logs in production, but keep logs if the number of connections is low.
1142            // Also silence them completely in tests.
1143            if outbound_connections <= MAX_CONNECTIONS_FOR_INFO_LOG && !cfg!(test) {
1144                info!(?error, ?candidate.addr, "failed to make outbound connection to peer");
1145            } else {
1146                debug!(?error, ?candidate.addr, "failed to make outbound connection to peer");
1147            }
1148            report_failed(address_book_updater.clone(), candidate).await;
1149
1150            // The demand signal that was taken out of the queue to attempt to connect to the
1151            // failed candidate never turned into a connection, so add it back.
1152            //
1153            // # Security
1154            //
1155            // Handshake failures are rate-limited by peer attempt timeouts.
1156            if let Err(send_error) = demand_tx.try_send(MorePeers) {
1157                if send_error.is_disconnected() {
1158                    // Zebra is shutting down
1159                    return Err(send_error.into());
1160                }
1161            }
1162        }
1163    }
1164
1165    Ok(())
1166}
1167
1168/// Mark `addr` as a failed peer to `address_book_updater`.
1169#[instrument(skip(address_book_updater))]
1170async fn report_failed(
1171    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
1172    addr: MetaAddr,
1173) {
1174    // The connection info is the same as what's already in the address book.
1175    let addr = MetaAddr::new_errored(addr.addr, None);
1176
1177    // Ignore send errors on Zebra shutdown.
1178    let _ = address_book_updater.send(addr).await;
1179}