agave_validator/
bootstrap.rs

1use {
2    agave_snapshots::{
3        paths as snapshot_paths, snapshot_archive_info::SnapshotArchiveInfoGetter as _,
4        SnapshotKind,
5    },
6    itertools::Itertools,
7    log::*,
8    rand::{seq::SliceRandom, thread_rng, Rng},
9    rayon::prelude::*,
10    solana_account::ReadableAccount,
11    solana_clock::Slot,
12    solana_commitment_config::CommitmentConfig,
13    solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
14    solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
15    solana_genesis_utils::download_then_check_genesis_hash,
16    solana_gossip::{
17        cluster_info::ClusterInfo,
18        contact_info::{ContactInfo, Protocol},
19        crds_data,
20        gossip_service::GossipService,
21        node::Node,
22    },
23    solana_hash::Hash,
24    solana_keypair::Keypair,
25    solana_metrics::datapoint_info,
26    solana_pubkey::Pubkey,
27    solana_rpc_client::rpc_client::RpcClient,
28    solana_signer::Signer,
29    solana_streamer::socket::SocketAddrSpace,
30    solana_vote_program::vote_state::VoteStateV4,
31    std::{
32        collections::{hash_map::RandomState, HashMap, HashSet},
33        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
34        path::Path,
35        process::exit,
36        sync::{
37            atomic::{AtomicBool, Ordering},
38            Arc, RwLock,
39        },
40        time::{Duration, Instant},
41    },
42    thiserror::Error,
43};
44
/// When downloading snapshots, wait at most this long for snapshot hashes from
/// _all_ known validators.  Afterwards, wait for snapshot hashes from _any_
/// known validator.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// If we don't have any alternative peers after this long, better off trying
/// blacklisted peers again.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// If we can't find a good snapshot download candidate after this time, just
/// give up.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
/// If we haven't found any RPC peers after this time, just give up.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);

/// Upper bound on the number of RPC peer candidates that `get_rpc_nodes()`
/// returns from a single call when snapshots are being fetched.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// Connection timeout used by `ping()` when probing an RPC peer over TCP.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
61
/// Options that control how `rpc_bootstrap()` obtains the genesis config and
/// snapshots from RPC peers.
#[derive(Debug, PartialEq, Clone)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash()`.  When set together
    /// with `no_snapshot_fetch`, `rpc_bootstrap()` returns right after the
    /// optional port check.
    pub no_genesis_fetch: bool,
    /// When set, `get_rpc_nodes()` returns a single randomly chosen RPC peer
    /// without any snapshot hash.
    pub no_snapshot_fetch: bool,
    /// When set, `get_rpc_peers()` only considers peers that are contained in
    /// `ValidatorConfig::known_validators`.
    pub only_known_rpc: bool,
    /// Forwarded to `download_then_check_genesis_hash()`.
    // NOTE(review): presumably a size limit in bytes -- confirm against that function.
    pub max_genesis_archive_unpacked_size: u64,
    /// When `Some`, the value is used as the URL of the RPC client through
    /// which the vote account is validated once the download has succeeded.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots are taken into account when selecting
    /// peer snapshot hashes.
    pub incremental_snapshot_fetch: bool,
}
71
72fn verify_reachable_ports(
73    node: &Node,
74    cluster_entrypoint: &ContactInfo,
75    validator_config: &ValidatorConfig,
76    socket_addr_space: &SocketAddrSpace,
77) -> bool {
78    let verify_address = |addr: &Option<SocketAddr>| -> bool {
79        addr.as_ref()
80            .map(|addr| socket_addr_space.check(addr))
81            .unwrap_or_default()
82    };
83
84    let mut udp_sockets = vec![&node.sockets.repair];
85    udp_sockets.extend(node.sockets.gossip.iter());
86
87    if verify_address(&node.info.serve_repair(Protocol::UDP)) {
88        udp_sockets.push(&node.sockets.serve_repair);
89    }
90    if verify_address(&node.info.tpu(Protocol::UDP)) {
91        udp_sockets.extend(node.sockets.tpu.iter());
92        udp_sockets.extend(&node.sockets.tpu_quic);
93    }
94    if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
95        udp_sockets.extend(node.sockets.tpu_forwards.iter());
96        udp_sockets.extend(&node.sockets.tpu_forwards_quic);
97    }
98    if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
99        udp_sockets.extend(node.sockets.tpu_vote.iter());
100    }
101    if verify_address(&node.info.tvu(Protocol::UDP)) {
102        udp_sockets.extend(node.sockets.tvu.iter());
103        udp_sockets.extend(node.sockets.broadcast.iter());
104        udp_sockets.extend(node.sockets.retransmit_sockets.iter());
105    }
106    if !solana_net_utils::verify_all_reachable_udp(
107        &cluster_entrypoint.gossip().unwrap(),
108        &udp_sockets,
109    ) {
110        return false;
111    }
112
113    let mut tcp_listeners = vec![];
114    if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
115        for (purpose, bind_addr, public_addr) in &[
116            ("RPC", rpc_addr, node.info.rpc()),
117            ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
118        ] {
119            if verify_address(public_addr) {
120                tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
121                    error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
122                    exit(1);
123                }));
124            }
125        }
126    }
127
128    if let Some(ip_echo) = &node.sockets.ip_echo {
129        let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
130        tcp_listeners.push(ip_echo);
131    }
132
133    solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
134}
135
136fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
137    if let Some(known_validators) = known_validators {
138        known_validators.contains(id)
139    } else {
140        false
141    }
142}
143
144fn start_gossip_node(
145    identity_keypair: Arc<Keypair>,
146    cluster_entrypoints: &[ContactInfo],
147    ledger_path: &Path,
148    gossip_addr: &SocketAddr,
149    gossip_sockets: Arc<[UdpSocket]>,
150    expected_shred_version: u16,
151    gossip_validators: Option<HashSet<Pubkey>>,
152    should_check_duplicate_instance: bool,
153    socket_addr_space: SocketAddrSpace,
154) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
155    let contact_info = ClusterInfo::gossip_contact_info(
156        identity_keypair.pubkey(),
157        *gossip_addr,
158        expected_shred_version,
159    );
160    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
161    cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
162    cluster_info.restore_contact_info(ledger_path, 0);
163    let cluster_info = Arc::new(cluster_info);
164
165    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
166    let gossip_service = GossipService::new(
167        &cluster_info,
168        None,
169        gossip_sockets,
170        gossip_validators,
171        should_check_duplicate_instance,
172        None,
173        gossip_exit_flag.clone(),
174    );
175    (cluster_info, gossip_exit_flag, gossip_service)
176}
177
178fn get_rpc_peers(
179    cluster_info: &ClusterInfo,
180    validator_config: &ValidatorConfig,
181    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
182    blacklist_timeout: &Instant,
183    retry_reason: &mut Option<String>,
184    bootstrap_config: &RpcBootstrapConfig,
185) -> Vec<ContactInfo> {
186    let shred_version = validator_config
187        .expected_shred_version
188        .unwrap_or_else(|| cluster_info.my_shred_version());
189
190    info!(
191        "Searching for an RPC service with shred version {shred_version}{}...",
192        retry_reason
193            .as_ref()
194            .map(|s| format!(" (Retrying: {s})"))
195            .unwrap_or_default()
196    );
197
198    let mut rpc_peers = cluster_info.rpc_peers();
199    if bootstrap_config.only_known_rpc {
200        rpc_peers.retain(|rpc_peer| {
201            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
202        });
203    }
204
205    let rpc_peers_total = rpc_peers.len();
206
207    // Filter out blacklisted nodes
208    let rpc_peers: Vec<_> = rpc_peers
209        .into_iter()
210        .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
211        .collect();
212    let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
213    let rpc_known_peers = rpc_peers
214        .iter()
215        .filter(|rpc_peer| {
216            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
217        })
218        .count();
219
220    info!(
221        "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
222         {rpc_peers_blacklisted} blacklisted"
223    );
224
225    if rpc_peers_blacklisted == rpc_peers_total {
226        *retry_reason = if !blacklisted_rpc_nodes.is_empty()
227            && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
228        {
229            // All nodes are blacklisted and no additional nodes recently discovered.
230            // Remove all nodes from the blacklist and try them again.
231            blacklisted_rpc_nodes.clear();
232            Some("Blacklist timeout expired".to_owned())
233        } else {
234            Some("Wait for known rpc peers".to_owned())
235        };
236        return vec![];
237    }
238    rpc_peers
239}
240
241fn check_vote_account(
242    rpc_client: &RpcClient,
243    identity_pubkey: &Pubkey,
244    vote_account_address: &Pubkey,
245    authorized_voter_pubkeys: &[Pubkey],
246) -> Result<(), String> {
247    let vote_account = rpc_client
248        .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
249        .map_err(|err| format!("failed to fetch vote account: {err}"))?
250        .value
251        .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
252
253    if vote_account.owner != solana_vote_program::id() {
254        return Err(format!(
255            "not a vote account (owned by {}): {}",
256            vote_account.owner, vote_account_address
257        ));
258    }
259
260    let identity_account = rpc_client
261        .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
262        .map_err(|err| format!("failed to fetch identity account: {err}"))?
263        .value
264        .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
265
266    let vote_state = VoteStateV4::deserialize(vote_account.data(), vote_account_address).ok();
267    if let Some(vote_state) = vote_state {
268        if vote_state.authorized_voters.is_empty() {
269            return Err("Vote account not yet initialized".to_string());
270        }
271
272        if vote_state.node_pubkey != *identity_pubkey {
273            return Err(format!(
274                "vote account's identity ({}) does not match the validator's identity {}).",
275                vote_state.node_pubkey, identity_pubkey
276            ));
277        }
278
279        for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters.iter() {
280            if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
281                return Err(format!(
282                    "authorized voter {vote_account_authorized_voter_pubkey} not available"
283                ));
284            }
285        }
286    } else {
287        return Err(format!(
288            "invalid vote account data for {vote_account_address}"
289        ));
290    }
291
292    // Maybe we can calculate minimum voting fee; rather than 1 lamport
293    if identity_account.lamports <= 1 {
294        return Err(format!(
295            "underfunded identity account ({}): only {} lamports available",
296            identity_pubkey, identity_account.lamports
297        ));
298    }
299
300    Ok(())
301}
302
/// Reasons for which `get_rpc_nodes()` gives up looking for RPC peers to bootstrap from.
#[derive(Error, Debug)]
pub enum GetRpcNodeError {
    /// No eligible RPC peer was found for longer than `GET_RPC_PEERS_TIMEOUT`.
    #[error("Unable to find any RPC peers")]
    NoRpcPeersFound,

    /// No peer snapshot hashes were available for longer than `NEWER_SNAPSHOT_THRESHOLD`.
    #[error("Giving up, did not get newer snapshots from the cluster")]
    NoNewerSnapshots,
}
311
/// Struct to wrap the return value from get_rpc_nodes().  The `rpc_contact_info` is the peer to
/// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
/// snapshots to download.
#[derive(Debug)]
struct GetRpcNodeResult {
    /// Contact info of the RPC peer to download from.
    rpc_contact_info: ContactInfo,
    /// Snapshot hash to download from the peer; `None` when snapshot fetching is disabled.
    snapshot_hash: Option<SnapshotHash>,
}
320
/// Struct to wrap the peers & snapshot hashes together.
#[derive(Debug, PartialEq, Eq, Clone)]
struct PeerSnapshotHash {
    /// Contact info of the RPC peer.
    rpc_contact_info: ContactInfo,
    /// Snapshot hash associated with that peer.
    snapshot_hash: SnapshotHash,
}
327
/// A snapshot hash.  In this context (bootstrap *with* incremental snapshots), a snapshot hash
/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
///
/// The derived ordering compares `full` first and `incr` second.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct SnapshotHash {
    /// Slot and hash of the full snapshot.
    full: (Slot, Hash),
    /// Slot and hash of the incremental snapshot, if there is one.
    incr: Option<(Slot, Hash)>,
}
335
336pub fn fail_rpc_node(
337    err: String,
338    known_validators: &Option<HashSet<Pubkey, RandomState>>,
339    rpc_id: &Pubkey,
340    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
341) {
342    warn!("{err}");
343    if let Some(ref known_validators) = known_validators {
344        if known_validators.contains(rpc_id) {
345            return;
346        }
347    }
348
349    info!("Excluding {rpc_id} as a future RPC candidate");
350    blacklisted_rpc_nodes.insert(*rpc_id);
351}
352
353fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
354    let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
355    cluster_info.save_contact_info();
356    gossip_exit_flag.store(true, Ordering::Relaxed);
357    gossip_service.join().unwrap();
358}
359
360#[allow(clippy::too_many_arguments)]
361pub fn attempt_download_genesis_and_snapshot(
362    rpc_contact_info: &ContactInfo,
363    ledger_path: &Path,
364    validator_config: &mut ValidatorConfig,
365    bootstrap_config: &RpcBootstrapConfig,
366    use_progress_bar: bool,
367    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
368    rpc_client: &RpcClient,
369    maximum_local_snapshot_age: Slot,
370    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
371    minimal_snapshot_download_speed: f32,
372    maximum_snapshot_download_abort: u64,
373    download_abort_count: &mut u64,
374    snapshot_hash: Option<SnapshotHash>,
375    identity_keypair: &Arc<Keypair>,
376    vote_account: &Pubkey,
377    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
378) -> Result<(), String> {
379    download_then_check_genesis_hash(
380        &rpc_contact_info
381            .rpc()
382            .ok_or_else(|| String::from("Invalid RPC address"))?,
383        ledger_path,
384        &mut validator_config.expected_genesis_hash,
385        bootstrap_config.max_genesis_archive_unpacked_size,
386        bootstrap_config.no_genesis_fetch,
387        use_progress_bar,
388        rpc_client,
389    )?;
390
391    if let Some(gossip) = gossip.take() {
392        shutdown_gossip_service(gossip);
393    }
394
395    let rpc_client_slot = rpc_client
396        .get_slot_with_commitment(CommitmentConfig::finalized())
397        .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
398    info!("RPC node root slot: {rpc_client_slot}");
399
400    download_snapshots(
401        validator_config,
402        bootstrap_config,
403        use_progress_bar,
404        maximum_local_snapshot_age,
405        start_progress,
406        minimal_snapshot_download_speed,
407        maximum_snapshot_download_abort,
408        download_abort_count,
409        snapshot_hash,
410        rpc_contact_info,
411    )?;
412
413    if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
414        let rpc_client = RpcClient::new(url);
415        check_vote_account(
416            &rpc_client,
417            &identity_keypair.pubkey(),
418            vote_account,
419            &authorized_voter_keypairs
420                .read()
421                .unwrap()
422                .iter()
423                .map(|k| k.pubkey())
424                .collect::<Vec<_>>(),
425        )
426        .unwrap_or_else(|err| {
427            // Consider failures here to be more likely due to user error (eg,
428            // incorrect `agave-validator` command-line arguments) rather than the
429            // RPC node failing.
430            //
431            // Power users can always use the `--no-check-vote-account` option to
432            // bypass this check entirely
433            error!("{err}");
434            exit(1);
435        });
436    }
437    Ok(())
438}
439
440/// simple ping helper function which returns the time to connect
441fn ping(addr: &SocketAddr) -> Option<Duration> {
442    let start = Instant::now();
443    match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
444        Ok(_) => Some(start.elapsed()),
445        Err(_) => None,
446    }
447}
448
449// Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
450// used for downloading latest snapshots and/or the genesis block. Guaranteed to
451// find at least one viable node or terminate the process.
452fn get_vetted_rpc_nodes(
453    vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
454    cluster_info: &Arc<ClusterInfo>,
455    validator_config: &ValidatorConfig,
456    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
457    bootstrap_config: &RpcBootstrapConfig,
458) {
459    while vetted_rpc_nodes.is_empty() {
460        let rpc_node_details = match get_rpc_nodes(
461            cluster_info,
462            validator_config,
463            blacklisted_rpc_nodes,
464            bootstrap_config,
465        ) {
466            Ok(rpc_node_details) => rpc_node_details,
467            Err(err) => {
468                error!(
469                    "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
470                     `--no-port-check`, or adjusting `--known-validator ...` arguments as \
471                     applicable"
472                );
473                exit(1);
474            }
475        };
476
477        let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
478        vetted_rpc_nodes.extend(
479            rpc_node_details
480                .into_par_iter()
481                .filter_map(|rpc_node_details| {
482                    let GetRpcNodeResult {
483                        rpc_contact_info,
484                        snapshot_hash,
485                    } = rpc_node_details;
486
487                    info!(
488                        "Using RPC service from node {}: {:?}",
489                        rpc_contact_info.pubkey(),
490                        rpc_contact_info.rpc()
491                    );
492
493                    let rpc_addr = rpc_contact_info.rpc()?;
494                    let ping_time = ping(&rpc_addr);
495
496                    let rpc_client =
497                        RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
498
499                    Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
500                })
501                .filter(
502                    |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
503                        .get_version()
504                    {
505                        Ok(rpc_version) => {
506                            if let Some(ping_time) = ping_time {
507                                info!(
508                                    "RPC node version: {} Ping: {}ms",
509                                    rpc_version.solana_core,
510                                    ping_time.as_millis()
511                                );
512                                true
513                            } else {
514                                fail_rpc_node(
515                                    "Failed to ping RPC".to_string(),
516                                    &validator_config.known_validators,
517                                    rpc_contact_info.pubkey(),
518                                    &mut newly_blacklisted_rpc_nodes.write().unwrap(),
519                                );
520                                false
521                            }
522                        }
523                        Err(err) => {
524                            fail_rpc_node(
525                                format!("Failed to get RPC node version: {err}"),
526                                &validator_config.known_validators,
527                                rpc_contact_info.pubkey(),
528                                &mut newly_blacklisted_rpc_nodes.write().unwrap(),
529                            );
530                            false
531                        }
532                    },
533                )
534                .collect::<Vec<(
535                    ContactInfo,
536                    Option<SnapshotHash>,
537                    RpcClient,
538                    Option<Duration>,
539                )>>()
540                .into_iter()
541                .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
542                .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
543                    (rpc_contact_info, snapshot_hash, rpc_client)
544                })
545                .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
546        );
547        blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
548    }
549}
550
551#[allow(clippy::too_many_arguments)]
552pub fn rpc_bootstrap(
553    node: &Node,
554    identity_keypair: &Arc<Keypair>,
555    ledger_path: &Path,
556    vote_account: &Pubkey,
557    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
558    cluster_entrypoints: &[ContactInfo],
559    validator_config: &mut ValidatorConfig,
560    bootstrap_config: RpcBootstrapConfig,
561    do_port_check: bool,
562    use_progress_bar: bool,
563    maximum_local_snapshot_age: Slot,
564    should_check_duplicate_instance: bool,
565    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
566    minimal_snapshot_download_speed: f32,
567    maximum_snapshot_download_abort: u64,
568    socket_addr_space: SocketAddrSpace,
569) {
570    if do_port_check {
571        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
572        order.shuffle(&mut thread_rng());
573        if order.into_iter().all(|i| {
574            !verify_reachable_ports(
575                node,
576                &cluster_entrypoints[i],
577                validator_config,
578                &socket_addr_space,
579            )
580        }) {
581            exit(1);
582        }
583    }
584
585    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
586        return;
587    }
588
589    let total_snapshot_download_time = Instant::now();
590    let mut get_rpc_nodes_time = Duration::new(0, 0);
591    let mut snapshot_download_time = Duration::new(0, 0);
592    let mut blacklisted_rpc_nodes = HashSet::new();
593    let mut gossip = None;
594    let mut vetted_rpc_nodes = vec![];
595    let mut download_abort_count = 0;
596    loop {
597        if gossip.is_none() {
598            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
599
600            gossip = Some(start_gossip_node(
601                identity_keypair.clone(),
602                cluster_entrypoints,
603                ledger_path,
604                &node
605                    .info
606                    .gossip()
607                    .expect("Operator must spin up node with valid gossip address"),
608                node.sockets.gossip.clone(),
609                validator_config
610                    .expected_shred_version
611                    .expect("expected_shred_version should not be None"),
612                validator_config.gossip_validators.clone(),
613                should_check_duplicate_instance,
614                socket_addr_space,
615            ));
616        }
617
618        let get_rpc_nodes_start = Instant::now();
619        get_vetted_rpc_nodes(
620            &mut vetted_rpc_nodes,
621            &gossip.as_ref().unwrap().0,
622            validator_config,
623            &mut blacklisted_rpc_nodes,
624            &bootstrap_config,
625        );
626        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
627        get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
628
629        let snapshot_download_start = Instant::now();
630        let download_result = attempt_download_genesis_and_snapshot(
631            &rpc_contact_info,
632            ledger_path,
633            validator_config,
634            &bootstrap_config,
635            use_progress_bar,
636            &mut gossip,
637            &rpc_client,
638            maximum_local_snapshot_age,
639            start_progress,
640            minimal_snapshot_download_speed,
641            maximum_snapshot_download_abort,
642            &mut download_abort_count,
643            snapshot_hash,
644            identity_keypair,
645            vote_account,
646            authorized_voter_keypairs.clone(),
647        );
648        snapshot_download_time += snapshot_download_start.elapsed();
649        match download_result {
650            Ok(()) => break,
651            Err(err) => {
652                fail_rpc_node(
653                    err,
654                    &validator_config.known_validators,
655                    rpc_contact_info.pubkey(),
656                    &mut blacklisted_rpc_nodes,
657                );
658            }
659        }
660    }
661
662    if let Some(gossip) = gossip.take() {
663        shutdown_gossip_service(gossip);
664    }
665
666    datapoint_info!(
667        "bootstrap-snapshot-download",
668        (
669            "total_time_secs",
670            total_snapshot_download_time.elapsed().as_secs(),
671            i64
672        ),
673        ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
674        (
675            "snapshot_download_time_secs",
676            snapshot_download_time.as_secs(),
677            i64
678        ),
679        ("download_abort_count", download_abort_count, i64),
680        ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
681    );
682}
683
684/// Get RPC peer node candidates to download from.
685///
686/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
687fn get_rpc_nodes(
688    cluster_info: &ClusterInfo,
689    validator_config: &ValidatorConfig,
690    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
691    bootstrap_config: &RpcBootstrapConfig,
692) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
693    let mut blacklist_timeout = Instant::now();
694    let mut get_rpc_peers_timout = Instant::now();
695    let mut newer_cluster_snapshot_timeout = None;
696    let mut retry_reason = None;
697    loop {
698        // Give gossip some time to populate and not spin on grabbing the crds lock
699        std::thread::sleep(Duration::from_secs(1));
700        info!("\n{}", cluster_info.rpc_info_trace());
701
702        let rpc_peers = get_rpc_peers(
703            cluster_info,
704            validator_config,
705            blacklisted_rpc_nodes,
706            &blacklist_timeout,
707            &mut retry_reason,
708            bootstrap_config,
709        );
710        if rpc_peers.is_empty() {
711            if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
712                return Err(GetRpcNodeError::NoRpcPeersFound);
713            }
714            continue;
715        }
716
717        // Reset timeouts if we found any viable RPC peers.
718        blacklist_timeout = Instant::now();
719        get_rpc_peers_timout = Instant::now();
720        if bootstrap_config.no_snapshot_fetch {
721            let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
722            return Ok(vec![GetRpcNodeResult {
723                rpc_contact_info: random_peer.clone(),
724                snapshot_hash: None,
725            }]);
726        }
727
728        let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
729            .as_ref()
730            .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
731            .unwrap_or(true)
732        {
733            KnownValidatorsToWaitFor::All
734        } else {
735            KnownValidatorsToWaitFor::Any
736        };
737        let peer_snapshot_hashes = get_peer_snapshot_hashes(
738            cluster_info,
739            &rpc_peers,
740            validator_config.known_validators.as_ref(),
741            known_validators_to_wait_for,
742            bootstrap_config.incremental_snapshot_fetch,
743        );
744        if peer_snapshot_hashes.is_empty() {
745            match newer_cluster_snapshot_timeout {
746                None => newer_cluster_snapshot_timeout = Some(Instant::now()),
747                Some(newer_cluster_snapshot_timeout) => {
748                    if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
749                        return Err(GetRpcNodeError::NoNewerSnapshots);
750                    }
751                }
752            }
753            retry_reason = Some("No snapshots available".to_owned());
754            continue;
755        } else {
756            let rpc_peers = peer_snapshot_hashes
757                .iter()
758                .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
759                .collect::<Vec<_>>();
760            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
761            info!(
762                "Highest available snapshot slot is {}, available from {} node{}: {:?}",
763                final_snapshot_hash
764                    .incr
765                    .map(|(slot, _hash)| slot)
766                    .unwrap_or(final_snapshot_hash.full.0),
767                rpc_peers.len(),
768                if rpc_peers.len() > 1 { "s" } else { "" },
769                rpc_peers,
770            );
771            let rpc_node_results = peer_snapshot_hashes
772                .iter()
773                .map(|peer_snapshot_hash| GetRpcNodeResult {
774                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
775                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
776                })
777                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
778                .collect();
779            return Ok(rpc_node_results);
780        }
781    }
782}
783
784/// Get the Slot and Hash of the local snapshot with the highest slot.  Can be either a full
785/// snapshot or an incremental snapshot.
786fn get_highest_local_snapshot_hash(
787    full_snapshot_archives_dir: impl AsRef<Path>,
788    incremental_snapshot_archives_dir: impl AsRef<Path>,
789    incremental_snapshot_fetch: bool,
790) -> Option<(Slot, Hash)> {
791    snapshot_paths::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
792        .and_then(|full_snapshot_info| {
793            if incremental_snapshot_fetch {
794                snapshot_paths::get_highest_incremental_snapshot_archive_info(
795                    incremental_snapshot_archives_dir,
796                    full_snapshot_info.slot(),
797                )
798                .map(|incremental_snapshot_info| {
799                    (
800                        incremental_snapshot_info.slot(),
801                        *incremental_snapshot_info.hash(),
802                    )
803                })
804            } else {
805                None
806            }
807            .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
808        })
809        .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
810}
811
812/// Get peer snapshot hashes
813///
814/// The result is a vector of peers with snapshot hashes that:
815/// 1. match a snapshot hash from the known validators
816/// 2. have the highest incremental snapshot slot
817/// 3. have the highest full snapshot slot of (2)
818fn get_peer_snapshot_hashes(
819    cluster_info: &ClusterInfo,
820    rpc_peers: &[ContactInfo],
821    known_validators: Option<&HashSet<Pubkey>>,
822    known_validators_to_wait_for: KnownValidatorsToWaitFor,
823    incremental_snapshot_fetch: bool,
824) -> Vec<PeerSnapshotHash> {
825    let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
826    if let Some(known_validators) = known_validators {
827        let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
828            cluster_info,
829            known_validators,
830            known_validators_to_wait_for,
831        );
832        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
833            &known_snapshot_hashes,
834            &mut peer_snapshot_hashes,
835        );
836    }
837    if incremental_snapshot_fetch {
838        // Only filter by highest incremental snapshot slot if we're actually going to download an
839        // incremental snapshot.  Otherwise this could remove higher full snapshot slots from
840        // being selected.  For example, if there are two peer snapshot hashes:
841        // (A) full snapshot slot: 100, incremental snapshot slot: 160
842        // (B) full snapshot slot: 150, incremental snapshot slot: None
843        // Then (A) has the highest overall snapshot slot.  But if we're not downloading and
844        // incremental snapshot, (B) should be selected since it's full snapshot of 150 is highest.
845        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
846            &mut peer_snapshot_hashes,
847        );
848    }
849    retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
850
851    peer_snapshot_hashes
852}
853
/// Map full snapshot hashes to a set of incremental snapshot hashes.  Each full snapshot hash
/// is treated as the base for its set of incremental snapshot hashes.
///
/// Both the keys and the set elements are `(Slot, Hash)` pairs.  A full snapshot hash may map
/// to an empty set when no incremental snapshot hashes have been recorded for it.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
857
858/// Get the snapshot hashes from known validators.
859///
860/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
861/// snapshot hashes.  This map will be used as the "known snapshot hashes"; when peers are
862/// queried for their individual snapshot hashes, their results will be checked against this
863/// map to verify correctness.
864///
865/// NOTE: Only a single snashot hash is allowed per slot.  If somehow two known validators have
866/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
867/// This applies to both full and incremental snapshot hashes.
868fn get_snapshot_hashes_from_known_validators(
869    cluster_info: &ClusterInfo,
870    known_validators: &HashSet<Pubkey>,
871    known_validators_to_wait_for: KnownValidatorsToWaitFor,
872) -> KnownSnapshotHashes {
873    // Get the snapshot hashes for a node from CRDS
874    let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
875
876    if !do_known_validators_have_all_snapshot_hashes(
877        known_validators,
878        known_validators_to_wait_for,
879        get_snapshot_hashes_for_node,
880    ) {
881        debug!(
882            "Snapshot hashes have not been discovered from known validators. This likely means \
883             the gossip tables are not fully populated. We will sleep and retry..."
884        );
885        return KnownSnapshotHashes::default();
886    }
887
888    build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
889}
890
891/// Check if we can discover snapshot hashes for the known validators.
892///
893/// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
894/// process will download snapshots.
895///
896/// This function will return false if we do not yet have snapshot hashes from known validators;
897/// and true otherwise.  Either require snapshot hashes from *all* or *any* of the known validators
898/// based on the `KnownValidatorsToWaitFor` parameter.
899fn do_known_validators_have_all_snapshot_hashes<'a>(
900    known_validators: impl IntoIterator<Item = &'a Pubkey>,
901    known_validators_to_wait_for: KnownValidatorsToWaitFor,
902    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
903) -> bool {
904    let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
905
906    match known_validators_to_wait_for {
907        KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
908        KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
909    }
910}
911
/// When waiting for snapshot hashes from the known validators, should we wait for *all* or *any*
/// of them?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    /// Every known validator must have snapshot hashes available.
    All,
    /// At least one known validator must have snapshot hashes available.
    Any,
}
919
920/// Build the known snapshot hashes from a set of nodes.
921///
922/// The `get_snapshot_hashes_for_node` parameter is a function that map a pubkey to its snapshot
923/// hashes.  This parameter exist to provide a way to test the inner algorithm without needing
924/// runtime information such as the ClusterInfo or ValidatorConfig.
925fn build_known_snapshot_hashes<'a>(
926    nodes: impl IntoIterator<Item = &'a Pubkey>,
927    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
928) -> KnownSnapshotHashes {
929    let mut known_snapshot_hashes = KnownSnapshotHashes::new();
930
931    /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
932    /// but *different* hash as the needle.
933    fn is_any_same_slot_and_different_hash<'a>(
934        needle: &(Slot, Hash),
935        haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
936    ) -> bool {
937        haystack
938            .into_iter()
939            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
940    }
941
942    'to_next_node: for node in nodes {
943        let Some(SnapshotHash {
944            full: full_snapshot_hash,
945            incr: incremental_snapshot_hash,
946        }) = get_snapshot_hashes_for_node(node)
947        else {
948            continue 'to_next_node;
949        };
950
951        // Do not add this snapshot hash if there's already a full snapshot hash with the
952        // same slot but with a _different_ hash.
953        // NOTE: Nodes should not produce snapshots at the same slot with _different_
954        // hashes.  So if it happens, keep the first and ignore the rest.
955        if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
956            warn!(
957                "Ignoring all snapshot hashes from node {node} since we've seen a different full \
958                 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
959            );
960            debug!(
961                "known full snapshot hashes: {:#?}",
962                known_snapshot_hashes.keys(),
963            );
964            continue 'to_next_node;
965        }
966
967        // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
968        // doesn't already exist.  This is to ensure we don't overwrite existing
969        // incremental snapshot hashes that may be present for this full snapshot hash.
970        let known_incremental_snapshot_hashes =
971            known_snapshot_hashes.entry(full_snapshot_hash).or_default();
972
973        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
974            // Do not add this snapshot hash if there's already an incremental snapshot
975            // hash with the same slot, but with a _different_ hash.
976            // NOTE: Nodes should not produce snapshots at the same slot with _different_
977            // hashes.  So if it happens, keep the first and ignore the rest.
978            if is_any_same_slot_and_different_hash(
979                &incremental_snapshot_hash,
980                known_incremental_snapshot_hashes.iter(),
981            ) {
982                warn!(
983                    "Ignoring incremental snapshot hash from node {node} since we've seen a \
984                     different incremental snapshot hash with this slot. full snapshot hash: \
985                     {full_snapshot_hash:?}, incremental snapshot hash: \
986                     {incremental_snapshot_hash:?}"
987                );
988                debug!(
989                    "known incremental snapshot hashes based on this slot: {:#?}",
990                    known_incremental_snapshot_hashes.iter(),
991                );
992                continue 'to_next_node;
993            }
994
995            known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
996        };
997    }
998
999    trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1000    known_snapshot_hashes
1001}
1002
1003/// Get snapshot hashes from all eligible peers.
1004///
1005/// This fn will get only one snapshot hash per peer (the one with the highest slot).
1006/// This may be just a full snapshot hash, or a combo full snapshot hash and
1007/// incremental snapshot hash.
1008fn get_eligible_peer_snapshot_hashes(
1009    cluster_info: &ClusterInfo,
1010    rpc_peers: &[ContactInfo],
1011) -> Vec<PeerSnapshotHash> {
1012    let peer_snapshot_hashes = rpc_peers
1013        .iter()
1014        .flat_map(|rpc_peer| {
1015            get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1016                PeerSnapshotHash {
1017                    rpc_contact_info: rpc_peer.clone(),
1018                    snapshot_hash,
1019                }
1020            })
1021        })
1022        .collect();
1023
1024    trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1025    peer_snapshot_hashes
1026}
1027
1028/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
1029fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1030    known_snapshot_hashes: &KnownSnapshotHashes,
1031    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1032) {
1033    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1034        known_snapshot_hashes
1035            .get(&peer_snapshot_hash.snapshot_hash.full)
1036            .map(|known_incremental_hashes| {
1037                if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1038                    // If the peer's full snapshot hashes match, but doesn't have any
1039                    // incremental snapshots, that's fine; keep 'em!
1040                    true
1041                } else {
1042                    known_incremental_hashes
1043                        .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1044                }
1045            })
1046            .unwrap_or(false)
1047    });
1048
1049    trace!(
1050        "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1051    );
1052}
1053
1054/// Retain the peer snapshot hashes with the highest full snapshot slot
1055fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1056    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1057) {
1058    let highest_full_snapshot_hash = peer_snapshot_hashes
1059        .iter()
1060        .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1061        .max_by_key(|(slot, _hash)| *slot);
1062    let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1063        // `max_by_key` will only be `None` IFF the input `peer_snapshot_hashes` is empty.
1064        // In that case there's nothing to do (additionally, without a valid 'max' value, there
1065        // will be nothing to compare against within the `retain()` predicate).
1066        return;
1067    };
1068
1069    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1070        peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1071    });
1072
1073    trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1074}
1075
1076/// Retain the peer snapshot hashes with the highest incremental snapshot slot
1077fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1078    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1079) {
1080    let highest_incremental_snapshot_hash = peer_snapshot_hashes
1081        .iter()
1082        .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1083        .max_by_key(|(slot, _hash)| *slot);
1084
1085    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1086        peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1087    });
1088
1089    trace!(
1090        "retain peer snapshot hashes with highest incremental snapshot slot: \
1091         {peer_snapshot_hashes:?}"
1092    );
1093}
1094
1095/// Check to see if we can use our local snapshots, otherwise download newer ones.
1096#[allow(clippy::too_many_arguments)]
1097fn download_snapshots(
1098    validator_config: &ValidatorConfig,
1099    bootstrap_config: &RpcBootstrapConfig,
1100    use_progress_bar: bool,
1101    maximum_local_snapshot_age: Slot,
1102    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1103    minimal_snapshot_download_speed: f32,
1104    maximum_snapshot_download_abort: u64,
1105    download_abort_count: &mut u64,
1106    snapshot_hash: Option<SnapshotHash>,
1107    rpc_contact_info: &ContactInfo,
1108) -> Result<(), String> {
1109    if snapshot_hash.is_none() {
1110        return Ok(());
1111    }
1112    let SnapshotHash {
1113        full: full_snapshot_hash,
1114        incr: incremental_snapshot_hash,
1115    } = snapshot_hash.unwrap();
1116    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1117    let incremental_snapshot_archives_dir = &validator_config
1118        .snapshot_config
1119        .incremental_snapshot_archives_dir;
1120
1121    // If the local snapshots are new enough, then use 'em; no need to download new snapshots
1122    if should_use_local_snapshot(
1123        full_snapshot_archives_dir,
1124        incremental_snapshot_archives_dir,
1125        maximum_local_snapshot_age,
1126        full_snapshot_hash,
1127        incremental_snapshot_hash,
1128        bootstrap_config.incremental_snapshot_fetch,
1129    ) {
1130        return Ok(());
1131    }
1132
1133    // Check and see if we've already got the full snapshot; if not, download it
1134    if snapshot_paths::get_full_snapshot_archives(full_snapshot_archives_dir)
1135        .into_iter()
1136        .any(|snapshot_archive| {
1137            snapshot_archive.slot() == full_snapshot_hash.0
1138                && snapshot_archive.hash().0 == full_snapshot_hash.1
1139        })
1140    {
1141        info!(
1142            "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1143            full_snapshot_hash.0, full_snapshot_hash.1
1144        );
1145    } else {
1146        download_snapshot(
1147            validator_config,
1148            bootstrap_config,
1149            use_progress_bar,
1150            start_progress,
1151            minimal_snapshot_download_speed,
1152            maximum_snapshot_download_abort,
1153            download_abort_count,
1154            rpc_contact_info,
1155            full_snapshot_hash,
1156            SnapshotKind::FullSnapshot,
1157        )?;
1158    }
1159
1160    if bootstrap_config.incremental_snapshot_fetch {
1161        // Check and see if we've already got the incremental snapshot; if not, download it
1162        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1163            if snapshot_paths::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1164                .into_iter()
1165                .any(|snapshot_archive| {
1166                    snapshot_archive.slot() == incremental_snapshot_hash.0
1167                        && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1168                        && snapshot_archive.base_slot() == full_snapshot_hash.0
1169                })
1170            {
1171                info!(
1172                    "Incremental snapshot archive already exists locally. Skipping download. \
1173                     slot: {}, hash: {}",
1174                    incremental_snapshot_hash.0, incremental_snapshot_hash.1
1175                );
1176            } else {
1177                download_snapshot(
1178                    validator_config,
1179                    bootstrap_config,
1180                    use_progress_bar,
1181                    start_progress,
1182                    minimal_snapshot_download_speed,
1183                    maximum_snapshot_download_abort,
1184                    download_abort_count,
1185                    rpc_contact_info,
1186                    incremental_snapshot_hash,
1187                    SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1188                )?;
1189            }
1190        }
1191    }
1192
1193    Ok(())
1194}
1195
1196/// Download a snapshot
1197#[allow(clippy::too_many_arguments)]
1198fn download_snapshot(
1199    validator_config: &ValidatorConfig,
1200    bootstrap_config: &RpcBootstrapConfig,
1201    use_progress_bar: bool,
1202    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1203    minimal_snapshot_download_speed: f32,
1204    maximum_snapshot_download_abort: u64,
1205    download_abort_count: &mut u64,
1206    rpc_contact_info: &ContactInfo,
1207    desired_snapshot_hash: (Slot, Hash),
1208    snapshot_kind: SnapshotKind,
1209) -> Result<(), String> {
1210    let maximum_full_snapshot_archives_to_retain = validator_config
1211        .snapshot_config
1212        .maximum_full_snapshot_archives_to_retain;
1213    let maximum_incremental_snapshot_archives_to_retain = validator_config
1214        .snapshot_config
1215        .maximum_incremental_snapshot_archives_to_retain;
1216    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1217    let incremental_snapshot_archives_dir = &validator_config
1218        .snapshot_config
1219        .incremental_snapshot_archives_dir;
1220
1221    *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1222        slot: desired_snapshot_hash.0,
1223        rpc_addr: rpc_contact_info
1224            .rpc()
1225            .ok_or_else(|| String::from("Invalid RPC address"))?,
1226    };
1227    let desired_snapshot_hash = (
1228        desired_snapshot_hash.0,
1229        agave_snapshots::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1230    );
1231    download_snapshot_archive(
1232        &rpc_contact_info
1233            .rpc()
1234            .ok_or_else(|| String::from("Invalid RPC address"))?,
1235        full_snapshot_archives_dir,
1236        incremental_snapshot_archives_dir,
1237        desired_snapshot_hash,
1238        snapshot_kind,
1239        maximum_full_snapshot_archives_to_retain,
1240        maximum_incremental_snapshot_archives_to_retain,
1241        use_progress_bar,
1242        &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1243            debug!("Download progress: {download_progress:?}");
1244            if download_progress.last_throughput < minimal_snapshot_download_speed
1245                && download_progress.notification_count <= 1
1246                && download_progress.percentage_done <= 2_f32
1247                && download_progress.estimated_remaining_time > 60_f32
1248                && *download_abort_count < maximum_snapshot_download_abort
1249            {
1250                if let Some(ref known_validators) = validator_config.known_validators {
1251                    if known_validators.contains(rpc_contact_info.pubkey())
1252                        && known_validators.len() == 1
1253                        && bootstrap_config.only_known_rpc
1254                    {
1255                        warn!(
1256                            "The snapshot download is too slow, throughput: {} < min speed {} \
1257                             bytes/sec, but will NOT abort and try a different node as it is the \
1258                             only known validator and the --only-known-rpc flag is set. Abort \
1259                             count: {}, Progress detail: {:?}",
1260                            download_progress.last_throughput,
1261                            minimal_snapshot_download_speed,
1262                            download_abort_count,
1263                            download_progress,
1264                        );
1265                        return true; // Do not abort download from the one-and-only known validator
1266                    }
1267                }
1268                warn!(
1269                    "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1270                     will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1271                    download_progress.last_throughput,
1272                    minimal_snapshot_download_speed,
1273                    download_abort_count,
1274                    download_progress,
1275                );
1276                *download_abort_count += 1;
1277                false
1278            } else {
1279                true
1280            }
1281        })),
1282    )
1283}
1284
1285/// Check to see if bootstrap should load from its local snapshots or not.  If not, then snapshots
1286/// will be downloaded.
1287fn should_use_local_snapshot(
1288    full_snapshot_archives_dir: &Path,
1289    incremental_snapshot_archives_dir: &Path,
1290    maximum_local_snapshot_age: Slot,
1291    full_snapshot_hash: (Slot, Hash),
1292    incremental_snapshot_hash: Option<(Slot, Hash)>,
1293    incremental_snapshot_fetch: bool,
1294) -> bool {
1295    let cluster_snapshot_slot = incremental_snapshot_hash
1296        .map(|(slot, _)| slot)
1297        .unwrap_or(full_snapshot_hash.0);
1298
1299    match get_highest_local_snapshot_hash(
1300        full_snapshot_archives_dir,
1301        incremental_snapshot_archives_dir,
1302        incremental_snapshot_fetch,
1303    ) {
1304        None => {
1305            info!(
1306                "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1307                 local snapshot."
1308            );
1309            false
1310        }
1311        Some((local_snapshot_slot, _)) => {
1312            if local_snapshot_slot
1313                >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1314            {
1315                info!(
1316                    "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1317                     a snapshot for slot {cluster_snapshot_slot}."
1318                );
1319                true
1320            } else {
1321                info!(
1322                    "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1323                     newer snapshot for slot {cluster_snapshot_slot}."
1324                );
1325                false
1326            }
1327        }
1328    }
1329}
1330
1331/// Get the node's highest snapshot hashes from CRDS
1332fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1333    cluster_info.get_snapshot_hashes_for_node(node).map(
1334        |crds_data::SnapshotHashes {
1335             full, incremental, ..
1336         }| {
1337            let highest_incremental_snapshot_hash = incremental.into_iter().max();
1338            SnapshotHash {
1339                full,
1340                incr: highest_incremental_snapshot_hash,
1341            }
1342        },
1343    )
1344}
1345
1346#[cfg(test)]
1347mod tests {
1348    use super::*;
1349
    /// Test-only convenience constructor.
    impl PeerSnapshotHash {
        /// Build a `PeerSnapshotHash` from a contact info, a full snapshot hash, and an
        /// optional incremental snapshot hash.
        fn new(
            rpc_contact_info: ContactInfo,
            full_snapshot_hash: (Slot, Hash),
            incremental_snapshot_hash: Option<(Slot, Hash)>,
        ) -> Self {
            Self {
                rpc_contact_info,
                snapshot_hash: SnapshotHash {
                    full: full_snapshot_hash,
                    incr: incremental_snapshot_hash,
                },
            }
        }
    }
1365
    /// Build a localhost `ContactInfo` for tests, using the default pubkey and a fixed `now`
    /// value so that every call is constructed from identical inputs.
    fn default_contact_info_for_tests() -> ContactInfo {
        ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
    }
1369
1370    #[test]
1371    fn test_build_known_snapshot_hashes() {
1372        agave_logger::setup();
1373        let full_snapshot_hash1 = (400_000, Hash::new_unique());
1374        let full_snapshot_hash2 = (400_000, Hash::new_unique());
1375
1376        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1377        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1378
1379        // simulate a set of known validators with various snapshot hashes
1380        let oracle = {
1381            let mut oracle = HashMap::new();
1382
1383            for (full, incr) in [
1384                // only a full snapshot
1385                (full_snapshot_hash1, None),
1386                // full and incremental snapshots
1387                (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1388                // full and incremental snapshots, with different incremental hash
1389                (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1390                // ...and now with different full hashes
1391                (full_snapshot_hash2, None),
1392                (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1393                (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1394            ] {
1395                // also simulate multiple known validators having the same snapshot hashes
1396                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1397                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1398                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1399            }
1400
1401            // no snapshots at all
1402            oracle.insert(Pubkey::new_unique(), None);
1403            oracle.insert(Pubkey::new_unique(), None);
1404            oracle.insert(Pubkey::new_unique(), None);
1405
1406            oracle
1407        };
1408
1409        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1410
1411        let known_snapshot_hashes =
1412            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1413
1414        // ensure there's only one full snapshot hash, since they all used the same slot and there
1415        // can be only one snapshot hash per slot
1416        let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1417        assert_eq!(known_full_snapshot_hashes.len(), 1);
1418        let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1419
1420        // and for the same reasons, ensure there is only one incremental snapshot hash
1421        let known_incremental_snapshot_hashes =
1422            known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1423        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1424        let known_incremental_snapshot_hash =
1425            known_incremental_snapshot_hashes.iter().next().unwrap();
1426
1427        // The resulting `known_snapshot_hashes` can be different from run-to-run due to how
1428        // `oracle.keys()` returns nodes during iteration.  Because of that, we cannot just assert
1429        // the full and incremental snapshot hashes are `full_snapshot_hash1` and
1430        // `incremental_snapshot_hash1`.  Instead, we assert that the full and incremental
1431        // snapshot hashes are exactly one or the other, since it depends on which nodes are seen
1432        // "first" when building the known snapshot hashes.
1433        assert!(
1434            known_full_snapshot_hash == &full_snapshot_hash1
1435                || known_full_snapshot_hash == &full_snapshot_hash2
1436        );
1437        assert!(
1438            known_incremental_snapshot_hash == &incremental_snapshot_hash1
1439                || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1440        );
1441    }
1442
1443    #[test]
1444    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1445        let known_snapshot_hashes: KnownSnapshotHashes = [
1446            (
1447                (200_000, Hash::new_unique()),
1448                [
1449                    (200_200, Hash::new_unique()),
1450                    (200_400, Hash::new_unique()),
1451                    (200_600, Hash::new_unique()),
1452                    (200_800, Hash::new_unique()),
1453                ]
1454                .iter()
1455                .cloned()
1456                .collect(),
1457            ),
1458            (
1459                (300_000, Hash::new_unique()),
1460                [
1461                    (300_200, Hash::new_unique()),
1462                    (300_400, Hash::new_unique()),
1463                    (300_600, Hash::new_unique()),
1464                ]
1465                .iter()
1466                .cloned()
1467                .collect(),
1468            ),
1469        ]
1470        .iter()
1471        .cloned()
1472        .collect();
1473
1474        let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1475        let known_full_snapshot_hash = known_snapshot_hash.0;
1476        let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1477
1478        let contact_info = default_contact_info_for_tests();
1479        let peer_snapshot_hashes = vec![
1480            // bad full snapshot hash, no incremental snapshot hash
1481            PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1482            // bad everything
1483            PeerSnapshotHash::new(
1484                contact_info.clone(),
1485                (111_000, Hash::default()),
1486                Some((111_111, Hash::default())),
1487            ),
1488            // good full snapshot hash, no incremental snapshot hash
1489            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1490            // bad full snapshot hash, good (not possible) incremental snapshot hash
1491            PeerSnapshotHash::new(
1492                contact_info.clone(),
1493                (111_000, Hash::default()),
1494                Some(*known_incremental_snapshot_hash),
1495            ),
1496            // good full snapshot hash, bad incremental snapshot hash
1497            PeerSnapshotHash::new(
1498                contact_info.clone(),
1499                *known_full_snapshot_hash,
1500                Some((111_111, Hash::default())),
1501            ),
1502            // good everything
1503            PeerSnapshotHash::new(
1504                contact_info.clone(),
1505                *known_full_snapshot_hash,
1506                Some(*known_incremental_snapshot_hash),
1507            ),
1508        ];
1509
1510        let expected = vec![
1511            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1512            PeerSnapshotHash::new(
1513                contact_info,
1514                *known_full_snapshot_hash,
1515                Some(*known_incremental_snapshot_hash),
1516            ),
1517        ];
1518        let mut actual = peer_snapshot_hashes;
1519        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1520            &known_snapshot_hashes,
1521            &mut actual,
1522        );
1523        assert_eq!(expected, actual);
1524    }
1525
1526    #[test]
1527    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1528        let contact_info = default_contact_info_for_tests();
1529        let peer_snapshot_hashes = vec![
1530            // old
1531            PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1532            PeerSnapshotHash::new(
1533                contact_info.clone(),
1534                (100_000, Hash::default()),
1535                Some((100_100, Hash::default())),
1536            ),
1537            PeerSnapshotHash::new(
1538                contact_info.clone(),
1539                (100_000, Hash::default()),
1540                Some((100_200, Hash::default())),
1541            ),
1542            PeerSnapshotHash::new(
1543                contact_info.clone(),
1544                (100_000, Hash::default()),
1545                Some((100_300, Hash::default())),
1546            ),
1547            // new
1548            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1549            PeerSnapshotHash::new(
1550                contact_info.clone(),
1551                (200_000, Hash::default()),
1552                Some((200_100, Hash::default())),
1553            ),
1554            PeerSnapshotHash::new(
1555                contact_info.clone(),
1556                (200_000, Hash::default()),
1557                Some((200_200, Hash::default())),
1558            ),
1559            PeerSnapshotHash::new(
1560                contact_info.clone(),
1561                (200_000, Hash::default()),
1562                Some((200_300, Hash::default())),
1563            ),
1564        ];
1565
1566        let expected = vec![
1567            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1568            PeerSnapshotHash::new(
1569                contact_info.clone(),
1570                (200_000, Hash::default()),
1571                Some((200_100, Hash::default())),
1572            ),
1573            PeerSnapshotHash::new(
1574                contact_info.clone(),
1575                (200_000, Hash::default()),
1576                Some((200_200, Hash::default())),
1577            ),
1578            PeerSnapshotHash::new(
1579                contact_info,
1580                (200_000, Hash::default()),
1581                Some((200_300, Hash::default())),
1582            ),
1583        ];
1584        let mut actual = peer_snapshot_hashes;
1585        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1586        assert_eq!(expected, actual);
1587    }
1588
1589    #[test]
1590    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1591        let contact_info = default_contact_info_for_tests();
1592        let peer_snapshot_hashes = vec![
1593            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1594            PeerSnapshotHash::new(
1595                contact_info.clone(),
1596                (200_000, Hash::default()),
1597                Some((200_100, Hash::default())),
1598            ),
1599            PeerSnapshotHash::new(
1600                contact_info.clone(),
1601                (200_000, Hash::default()),
1602                Some((200_200, Hash::default())),
1603            ),
1604            PeerSnapshotHash::new(
1605                contact_info.clone(),
1606                (200_000, Hash::default()),
1607                Some((200_300, Hash::default())),
1608            ),
1609            PeerSnapshotHash::new(
1610                contact_info.clone(),
1611                (200_000, Hash::default()),
1612                Some((200_010, Hash::default())),
1613            ),
1614            PeerSnapshotHash::new(
1615                contact_info.clone(),
1616                (200_000, Hash::default()),
1617                Some((200_020, Hash::default())),
1618            ),
1619            PeerSnapshotHash::new(
1620                contact_info.clone(),
1621                (200_000, Hash::default()),
1622                Some((200_030, Hash::default())),
1623            ),
1624        ];
1625
1626        let expected = vec![PeerSnapshotHash::new(
1627            contact_info,
1628            (200_000, Hash::default()),
1629            Some((200_300, Hash::default())),
1630        )];
1631        let mut actual = peer_snapshot_hashes;
1632        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1633        assert_eq!(expected, actual);
1634    }
1635
1636    /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
1637    /// there are *zero* peers with incremental snapshots.
1638    #[test]
1639    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1640        let contact_info = default_contact_info_for_tests();
1641        let peer_snapshot_hashes = vec![
1642            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1643            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1644            PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1645        ];
1646
1647        let expected = peer_snapshot_hashes.clone();
1648        let mut actual = peer_snapshot_hashes;
1649        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1650        assert_eq!(expected, actual);
1651    }
1652
1653    /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
1654    /// peer snapshot hashes input is empty.
1655    #[test]
1656    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1657        {
1658            let mut actual = vec![];
1659            let expected = actual.clone();
1660            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1661            assert_eq!(expected, actual);
1662        }
1663        {
1664            let mut actual = vec![];
1665            let expected = actual.clone();
1666            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1667            assert_eq!(expected, actual);
1668        }
1669    }
1670}