solana_validator/bootstrap.rs

use {
    itertools::Itertools,
    log::*,
    rand::{seq::SliceRandom, thread_rng, Rng},
    rayon::prelude::*,
    solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
    solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
    solana_genesis_utils::download_then_check_genesis_hash,
    solana_gossip::{
        cluster_info::{ClusterInfo, Node},
        contact_info::Protocol,
        crds_value,
        gossip_service::GossipService,
        legacy_contact_info::LegacyContactInfo as ContactInfo,
    },
    solana_metrics::datapoint_info,
    solana_rpc_client::rpc_client::RpcClient,
    solana_runtime::{
        snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
        snapshot_utils,
    },
    solana_sdk::{
        clock::Slot,
        commitment_config::CommitmentConfig,
        hash::Hash,
        pubkey::Pubkey,
        signature::{Keypair, Signer},
    },
    solana_streamer::socket::SocketAddrSpace,
    std::{
        collections::{hash_map::RandomState, HashMap, HashSet},
        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
        path::Path,
        process::exit,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
        time::{Duration, Instant},
    },
    thiserror::Error,
};

/// When downloading snapshots, wait at most this long for snapshot hashes from
/// _all_ known validators.  Afterwards, wait for snapshot hashes from _any_
/// known validator.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// If we don't have any alternative peers after this long, we're better off
/// retrying the blacklisted peers again.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// If we can't find a good snapshot download candidate after this time, just
/// give up.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
/// If we haven't found any RPC peers after this time, just give up.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);

pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

pub const PING_TIMEOUT: Duration = Duration::from_secs(2);

#[derive(Debug)]
pub struct RpcBootstrapConfig {
    pub no_genesis_fetch: bool,
    pub no_snapshot_fetch: bool,
    pub only_known_rpc: bool,
    pub max_genesis_archive_unpacked_size: u64,
    pub check_vote_account: Option<String>,
    pub incremental_snapshot_fetch: bool,
}

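/// Checks that this node's advertised ports are reachable from the given cluster entrypoint.
///
/// Only addresses the node actually advertises (and that pass the `SocketAddrSpace` check)
/// are verified: gossip and repair are always checked, while serve-repair, TPU, TPU-forwards,
/// TPU-vote, TVU, RPC, and RPC-pubsub are checked only when advertised.  Returns `true` if
/// all checked ports are reachable.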
fn verify_reachable_ports(
    node: &Node,
    cluster_entrypoint: &ContactInfo,
    validator_config: &ValidatorConfig,
    socket_addr_space: &SocketAddrSpace,
) -> bool {
    let verify_address = |addr: &Option<SocketAddr>| -> bool {
        addr.as_ref()
            .map(|addr| socket_addr_space.check(addr))
            .unwrap_or_default()
    };
    let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];

    if verify_address(&node.info.serve_repair(Protocol::UDP).ok()) {
        udp_sockets.push(&node.sockets.serve_repair);
    }
    if verify_address(&node.info.tpu(Protocol::UDP).ok()) {
        udp_sockets.extend(node.sockets.tpu.iter());
        udp_sockets.push(&node.sockets.tpu_quic);
    }
    if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) {
        udp_sockets.extend(node.sockets.tpu_forwards.iter());
        udp_sockets.push(&node.sockets.tpu_forwards_quic);
    }
    if verify_address(&node.info.tpu_vote().ok()) {
        udp_sockets.extend(node.sockets.tpu_vote.iter());
    }
    if verify_address(&node.info.tvu(Protocol::UDP).ok()) {
        udp_sockets.extend(node.sockets.tvu.iter());
        udp_sockets.extend(node.sockets.broadcast.iter());
        udp_sockets.extend(node.sockets.retransmit_sockets.iter());
    }

    let mut tcp_listeners = vec![];
    if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
        for (purpose, bind_addr, public_addr) in &[
            ("RPC", rpc_addr, node.info.rpc()),
            ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
        ] {
            if verify_address(&public_addr.as_ref().ok().copied()) {
                tcp_listeners.push((
                    bind_addr.port(),
                    TcpListener::bind(bind_addr).unwrap_or_else(|err| {
                        error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
                        exit(1);
                    }),
                ));
            }
        }
    }

    if let Some(ip_echo) = &node.sockets.ip_echo {
        let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
        tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo));
    }

    solana_net_utils::verify_reachable_ports(
        &cluster_entrypoint.gossip().unwrap(),
        tcp_listeners,
        &udp_sockets,
    )
}

fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
    if let Some(known_validators) = known_validators {
        known_validators.contains(id)
    } else {
        false
    }
}

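/// Spins up a temporary gossip node used during bootstrap to discover RPC peers and snapshot
/// hashes.  Returns the `ClusterInfo`, an exit flag that stops the service when set, and the
/// `GossipService` handle; see `shutdown_gossip_service()` for the matching teardown.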
fn start_gossip_node(
    identity_keypair: Arc<Keypair>,
    cluster_entrypoints: &[ContactInfo],
    ledger_path: &Path,
    gossip_addr: &SocketAddr,
    gossip_socket: UdpSocket,
    expected_shred_version: Option<u16>,
    gossip_validators: Option<HashSet<Pubkey>>,
    should_check_duplicate_instance: bool,
    socket_addr_space: SocketAddrSpace,
) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
    let contact_info = ClusterInfo::gossip_contact_info(
        identity_keypair.pubkey(),
        *gossip_addr,
        expected_shred_version.unwrap_or(0),
    );
    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
    cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
    cluster_info.restore_contact_info(ledger_path, 0);
    let cluster_info = Arc::new(cluster_info);

    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
    let gossip_service = GossipService::new(
        &cluster_info,
        None,
        gossip_socket,
        gossip_validators,
        should_check_duplicate_instance,
        None,
        gossip_exit_flag.clone(),
    );
    (cluster_info, gossip_exit_flag, gossip_service)
}

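/// Returns the RPC peers currently visible in gossip whose shred version matches the expected
/// one, optionally restricted to known validators (`--only-known-rpc`) and always excluding
/// blacklisted nodes.  If every candidate is blacklisted, the blacklist is cleared once
/// `BLACKLIST_CLEAR_THRESHOLD` has elapsed, and an empty list is returned so the caller retries.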
fn get_rpc_peers(
    cluster_info: &ClusterInfo,
    cluster_entrypoints: &[ContactInfo],
    validator_config: &ValidatorConfig,
    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
    blacklist_timeout: &Instant,
    retry_reason: &mut Option<String>,
    bootstrap_config: &RpcBootstrapConfig,
) -> Vec<ContactInfo> {
    let shred_version = validator_config
        .expected_shred_version
        .unwrap_or_else(|| cluster_info.my_shred_version());
    if shred_version == 0 {
        let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| {
            cluster_entrypoint
                .gossip()
                .ok()
                .and_then(|addr| cluster_info.lookup_contact_info_by_gossip_addr(&addr))
                .map_or(false, |entrypoint| entrypoint.shred_version() == 0)
        });

        if all_zero_shred_versions {
            eprintln!("Entrypoint shred version is zero.  Restart with --expected-shred-version");
            exit(1);
        }
        info!("Waiting to adopt entrypoint shred version...");
        return vec![];
    }

    info!(
        "Searching for an RPC service with shred version {shred_version}{}...",
        retry_reason
            .as_ref()
            .map(|s| format!(" (Retrying: {s})"))
            .unwrap_or_default()
    );

    let mut rpc_peers = cluster_info
        .all_rpc_peers()
        .into_iter()
        .filter(|contact_info| contact_info.shred_version() == shred_version)
        .collect::<Vec<_>>();

    if bootstrap_config.only_known_rpc {
        rpc_peers.retain(|rpc_peer| {
            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
        });
    }

    let rpc_peers_total = rpc_peers.len();

    // Filter out blacklisted nodes
    let rpc_peers: Vec<_> = rpc_peers
        .into_iter()
        .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
        .collect();
    let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
    let rpc_known_peers = rpc_peers
        .iter()
        .filter(|rpc_peer| {
            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
        })
        .count();

    info!("Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, {rpc_peers_blacklisted} blacklisted");

    if rpc_peers_blacklisted == rpc_peers_total {
        *retry_reason = if !blacklisted_rpc_nodes.is_empty()
            && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
        {
            // All nodes are blacklisted and no additional nodes have been discovered
            // recently.  Remove all nodes from the blacklist and try them again.
            blacklisted_rpc_nodes.clear();
            Some("Blacklist timeout expired".to_owned())
        } else {
            Some("Wait for known rpc peers".to_owned())
        };
        return vec![];
    }
    rpc_peers
}

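/// Sanity-checks the configured vote account against the cluster via RPC: the account must
/// exist, be owned by the vote program, be initialized, reference this validator's identity
/// as its node pubkey, and every authorized voter in the vote state must be among the locally
/// available authorized voter keypairs.  Also verifies the identity account holds more than
/// one lamport.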
fn check_vote_account(
    rpc_client: &RpcClient,
    identity_pubkey: &Pubkey,
    vote_account_address: &Pubkey,
    authorized_voter_pubkeys: &[Pubkey],
) -> Result<(), String> {
    let vote_account = rpc_client
        .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
        .map_err(|err| format!("failed to fetch vote account: {err}"))?
        .value
        .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;

    if vote_account.owner != solana_vote_program::id() {
        return Err(format!(
            "not a vote account (owned by {}): {}",
            vote_account.owner, vote_account_address
        ));
    }

    let identity_account = rpc_client
        .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
        .map_err(|err| format!("failed to fetch identity account: {err}"))?
        .value
        .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;

    let vote_state = solana_vote_program::vote_state::from(&vote_account);
    if let Some(vote_state) = vote_state {
        if vote_state.authorized_voters().is_empty() {
            return Err("Vote account not yet initialized".to_string());
        }

        if vote_state.node_pubkey != *identity_pubkey {
            return Err(format!(
                "vote account's identity ({}) does not match the validator's identity ({}).",
                vote_state.node_pubkey, identity_pubkey
            ));
        }

        for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
            if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
                return Err(format!(
                    "authorized voter {vote_account_authorized_voter_pubkey} not available"
                ));
            }
        }
    } else {
        return Err(format!(
            "invalid vote account data for {vote_account_address}"
        ));
    }

    // Maybe we can calculate a minimum voting fee, rather than hard-coding 1 lamport
    if identity_account.lamports <= 1 {
        return Err(format!(
            "underfunded identity account ({}): only {} lamports available",
            identity_pubkey, identity_account.lamports
        ));
    }

    Ok(())
}

#[derive(Error, Debug)]
pub enum GetRpcNodeError {
    #[error("Unable to find any RPC peers")]
    NoRpcPeersFound,

    #[error("Giving up, did not get newer snapshots from the cluster")]
    NoNewerSnapshots,
}

/// Struct to wrap the return value from get_rpc_nodes().  The `rpc_contact_info` is the peer to
/// download from, and `snapshot_hash` is the (optional) snapshot to download, i.e. a full
/// snapshot hash plus an optional incremental snapshot hash.
#[derive(Debug)]
struct GetRpcNodeResult {
    rpc_contact_info: ContactInfo,
    snapshot_hash: Option<SnapshotHash>,
}

/// Struct to wrap the peers & snapshot hashes together.
#[derive(Debug, PartialEq, Eq, Clone)]
struct PeerSnapshotHash {
    rpc_contact_info: ContactInfo,
    snapshot_hash: SnapshotHash,
}

/// A snapshot hash.  In this context (bootstrap *with* incremental snapshots), a snapshot hash
/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
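///
/// For example (illustrative slots and hashes only), a node serving a full snapshot taken at
/// slot 100 and an incremental snapshot built on it at slot 150 would be represented as
/// `SnapshotHash { full: (100, full_hash), incr: Some((150, incr_hash)) }`, while a node with
/// only the full snapshot would have `incr: None`.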
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct SnapshotHash {
    full: (Slot, Hash),
    incr: Option<(Slot, Hash)>,
}

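/// Records a failed RPC node: logs the error and adds the node to the blacklist so it is
/// skipped as a future RPC candidate, unless it is one of the configured known validators.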
pub fn fail_rpc_node(
    err: String,
    known_validators: &Option<HashSet<Pubkey, RandomState>>,
    rpc_id: &Pubkey,
    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
) {
    warn!("{err}");
    if let Some(ref known_validators) = known_validators {
        if known_validators.contains(rpc_id) {
            return;
        }
    }

    info!("Excluding {rpc_id} as a future RPC candidate");
    blacklisted_rpc_nodes.insert(*rpc_id);
}

fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
    let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
    cluster_info.save_contact_info();
    gossip_exit_flag.store(true, Ordering::Relaxed);
    gossip_service.join().unwrap();
}

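/// Runs one bootstrap attempt against a single RPC node: downloads (or verifies) the genesis
/// config, shuts down the temporary gossip service, queries the node's finalized slot, and
/// downloads the full and (optionally) incremental snapshots.  If `check_vote_account`
/// provides an RPC URL, the vote account is also sanity-checked; a failed check exits the
/// process.  Genesis or snapshot failures are returned as an error string so the caller can
/// blacklist the node and try another.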
#[allow(clippy::too_many_arguments)]
pub fn attempt_download_genesis_and_snapshot(
    rpc_contact_info: &ContactInfo,
    ledger_path: &Path,
    validator_config: &mut ValidatorConfig,
    bootstrap_config: &RpcBootstrapConfig,
    use_progress_bar: bool,
    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
    rpc_client: &RpcClient,
    full_snapshot_archives_dir: &Path,
    incremental_snapshot_archives_dir: &Path,
    maximum_local_snapshot_age: Slot,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
    minimal_snapshot_download_speed: f32,
    maximum_snapshot_download_abort: u64,
    download_abort_count: &mut u64,
    snapshot_hash: Option<SnapshotHash>,
    identity_keypair: &Arc<Keypair>,
    vote_account: &Pubkey,
    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
) -> Result<(), String> {
    download_then_check_genesis_hash(
        &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
        ledger_path,
        &mut validator_config.expected_genesis_hash,
        bootstrap_config.max_genesis_archive_unpacked_size,
        bootstrap_config.no_genesis_fetch,
        use_progress_bar,
        rpc_client,
    )?;

    if let Some(gossip) = gossip.take() {
        shutdown_gossip_service(gossip);
    }

    let rpc_client_slot = rpc_client
        .get_slot_with_commitment(CommitmentConfig::finalized())
        .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
    info!("RPC node root slot: {rpc_client_slot}");

    download_snapshots(
        full_snapshot_archives_dir,
        incremental_snapshot_archives_dir,
        validator_config,
        bootstrap_config,
        use_progress_bar,
        maximum_local_snapshot_age,
        start_progress,
        minimal_snapshot_download_speed,
        maximum_snapshot_download_abort,
        download_abort_count,
        snapshot_hash,
        rpc_contact_info,
    )?;

    if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
        let rpc_client = RpcClient::new(url);
        check_vote_account(
            &rpc_client,
            &identity_keypair.pubkey(),
            vote_account,
            &authorized_voter_keypairs
                .read()
                .unwrap()
                .iter()
                .map(|k| k.pubkey())
                .collect::<Vec<_>>(),
        )
        .unwrap_or_else(|err| {
            // Consider failures here to be more likely due to user error (eg,
            // incorrect `solana-validator` command-line arguments) rather than the
            // RPC node failing.
            //
            // Power users can always use the `--no-check-vote-account` option to
            // bypass this check entirely.
            error!("{err}");
            exit(1);
        });
    }
    Ok(())
}

/// Simple ping helper that returns the time taken to establish a TCP connection,
/// or `None` if the connection attempt fails or times out.
fn ping(addr: &SocketAddr) -> Option<Duration> {
    let start = Instant::now();
    match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
        Ok(_) => Some(start.elapsed()),
        Err(_) => None,
    }
}

// Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
// used for downloading latest snapshots and/or the genesis block. Guaranteed to
// find at least one viable node or terminate the process.
fn get_vetted_rpc_nodes(
    vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
    cluster_info: &Arc<ClusterInfo>,
    cluster_entrypoints: &[ContactInfo],
    validator_config: &ValidatorConfig,
    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
    bootstrap_config: &RpcBootstrapConfig,
) {
    while vetted_rpc_nodes.is_empty() {
        let rpc_node_details = match get_rpc_nodes(
            cluster_info,
            cluster_entrypoints,
            validator_config,
            blacklisted_rpc_nodes,
            bootstrap_config,
        ) {
            Ok(rpc_node_details) => rpc_node_details,
            Err(err) => {
                error!(
                    "Failed to get RPC nodes: {err}. Consider checking system \
                    clock, removing `--no-port-check`, or adjusting \
                    `--known-validator ...` arguments as applicable"
                );
                exit(1);
            }
        };

        let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
        vetted_rpc_nodes.extend(
            rpc_node_details
                .into_par_iter()
                .filter_map(|rpc_node_details| {
                    let GetRpcNodeResult {
                        rpc_contact_info,
                        snapshot_hash,
                    } = rpc_node_details;

                    info!(
                        "Using RPC service from node {}: {:?}",
                        rpc_contact_info.pubkey(),
                        rpc_contact_info.rpc()
                    );

                    let rpc_addr = rpc_contact_info.rpc().ok()?;
                    let ping_time = ping(&rpc_addr);

                    let rpc_client =
                        RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));

                    Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
                })
                .filter(
                    |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
                        .get_version()
                    {
                        Ok(rpc_version) => {
                            if let Some(ping_time) = ping_time {
                                info!(
                                    "RPC node version: {} Ping: {}ms",
                                    rpc_version.solana_core,
                                    ping_time.as_millis()
                                );
                                true
                            } else {
                                fail_rpc_node(
                                    "Failed to ping RPC".to_string(),
                                    &validator_config.known_validators,
                                    rpc_contact_info.pubkey(),
                                    &mut newly_blacklisted_rpc_nodes.write().unwrap(),
                                );
                                false
                            }
                        }
                        Err(err) => {
                            fail_rpc_node(
                                format!("Failed to get RPC node version: {err}"),
                                &validator_config.known_validators,
                                rpc_contact_info.pubkey(),
                                &mut newly_blacklisted_rpc_nodes.write().unwrap(),
                            );
                            false
                        }
                    },
                )
                .collect::<Vec<(
                    ContactInfo,
                    Option<SnapshotHash>,
                    RpcClient,
                    Option<Duration>,
                )>>()
                .into_iter()
                .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
                .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
                    (rpc_contact_info, snapshot_hash, rpc_client)
                })
                .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
        );
        blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
    }
}

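/// Top-level bootstrap entry point.  Optionally verifies that the node's ports are reachable
/// from at least one entrypoint, then (unless both genesis and snapshot fetch are disabled)
/// repeatedly: starts a temporary gossip node, finds and vets RPC peers, and attempts to
/// download the genesis config and snapshots from the best candidate, blacklisting nodes that
/// fail, until one attempt succeeds.  Finally shuts down the temporary gossip service and
/// reports download metrics.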
#[allow(clippy::too_many_arguments)]
pub fn rpc_bootstrap(
    node: &Node,
    identity_keypair: &Arc<Keypair>,
    ledger_path: &Path,
    full_snapshot_archives_dir: &Path,
    incremental_snapshot_archives_dir: &Path,
    vote_account: &Pubkey,
    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
    cluster_entrypoints: &[ContactInfo],
    validator_config: &mut ValidatorConfig,
    bootstrap_config: RpcBootstrapConfig,
    do_port_check: bool,
    use_progress_bar: bool,
    maximum_local_snapshot_age: Slot,
    should_check_duplicate_instance: bool,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
    minimal_snapshot_download_speed: f32,
    maximum_snapshot_download_abort: u64,
    socket_addr_space: SocketAddrSpace,
) {
    if do_port_check {
        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
        order.shuffle(&mut thread_rng());
        if order.into_iter().all(|i| {
            !verify_reachable_ports(
                node,
                &cluster_entrypoints[i],
                validator_config,
                &socket_addr_space,
            )
        }) {
            exit(1);
        }
    }

    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
        return;
    }

    let total_snapshot_download_time = Instant::now();
    let mut get_rpc_nodes_time = Duration::new(0, 0);
    let mut snapshot_download_time = Duration::new(0, 0);
    let mut blacklisted_rpc_nodes = HashSet::new();
    let mut gossip = None;
    let mut vetted_rpc_nodes = vec![];
    let mut download_abort_count = 0;
    loop {
        if gossip.is_none() {
            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;

            gossip = Some(start_gossip_node(
                identity_keypair.clone(),
                cluster_entrypoints,
                ledger_path,
                &node
                    .info
                    .gossip()
                    .expect("Operator must spin up node with valid gossip address"),
                node.sockets.gossip.try_clone().unwrap(),
                validator_config.expected_shred_version,
                validator_config.gossip_validators.clone(),
                should_check_duplicate_instance,
                socket_addr_space,
            ));
        }

        let get_rpc_nodes_start = Instant::now();
        get_vetted_rpc_nodes(
            &mut vetted_rpc_nodes,
            &gossip.as_ref().unwrap().0,
            cluster_entrypoints,
            validator_config,
            &mut blacklisted_rpc_nodes,
            &bootstrap_config,
        );
        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
        get_rpc_nodes_time += get_rpc_nodes_start.elapsed();

        let snapshot_download_start = Instant::now();
        let download_result = attempt_download_genesis_and_snapshot(
            &rpc_contact_info,
            ledger_path,
            validator_config,
            &bootstrap_config,
            use_progress_bar,
            &mut gossip,
            &rpc_client,
            full_snapshot_archives_dir,
            incremental_snapshot_archives_dir,
            maximum_local_snapshot_age,
            start_progress,
            minimal_snapshot_download_speed,
            maximum_snapshot_download_abort,
            &mut download_abort_count,
            snapshot_hash,
            identity_keypair,
            vote_account,
            authorized_voter_keypairs.clone(),
        );
        snapshot_download_time += snapshot_download_start.elapsed();
        match download_result {
            Ok(()) => break,
            Err(err) => {
                fail_rpc_node(
                    err,
                    &validator_config.known_validators,
                    rpc_contact_info.pubkey(),
                    &mut blacklisted_rpc_nodes,
                );
            }
        }
    }

    if let Some(gossip) = gossip.take() {
        shutdown_gossip_service(gossip);
    }

    datapoint_info!(
        "bootstrap-snapshot-download",
        (
            "total_time_secs",
            total_snapshot_download_time.elapsed().as_secs(),
            i64
        ),
        ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
        (
            "snapshot_download_time_secs",
            snapshot_download_time.as_secs(),
            i64
        ),
        ("download_abort_count", download_abort_count, i64),
        ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
    );
}

/// Get RPC peer node candidates to download from.
///
/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
fn get_rpc_nodes(
    cluster_info: &ClusterInfo,
    cluster_entrypoints: &[ContactInfo],
    validator_config: &ValidatorConfig,
    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
    bootstrap_config: &RpcBootstrapConfig,
) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
    let mut blacklist_timeout = Instant::now();
    let mut get_rpc_peers_timeout = Instant::now();
    let mut newer_cluster_snapshot_timeout = None;
    let mut retry_reason = None;
    loop {
        // Give gossip some time to populate and not spin on grabbing the crds lock
        std::thread::sleep(Duration::from_secs(1));
        info!("\n{}", cluster_info.rpc_info_trace());

        let rpc_peers = get_rpc_peers(
            cluster_info,
            cluster_entrypoints,
            validator_config,
            blacklisted_rpc_nodes,
            &blacklist_timeout,
            &mut retry_reason,
            bootstrap_config,
        );
        if rpc_peers.is_empty() {
            if get_rpc_peers_timeout.elapsed() > GET_RPC_PEERS_TIMEOUT {
                return Err(GetRpcNodeError::NoRpcPeersFound);
            }
            continue;
        }

        // Reset timeouts if we found any viable RPC peers.
        blacklist_timeout = Instant::now();
        get_rpc_peers_timeout = Instant::now();
        if bootstrap_config.no_snapshot_fetch {
            let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
            return Ok(vec![GetRpcNodeResult {
                rpc_contact_info: random_peer.clone(),
                snapshot_hash: None,
            }]);
        }

        let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
            .as_ref()
            .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
            .unwrap_or(true)
        {
            KnownValidatorsToWaitFor::All
        } else {
            KnownValidatorsToWaitFor::Any
        };
        let peer_snapshot_hashes = get_peer_snapshot_hashes(
            cluster_info,
            &rpc_peers,
            validator_config.known_validators.as_ref(),
            known_validators_to_wait_for,
            bootstrap_config.incremental_snapshot_fetch,
        );
        if peer_snapshot_hashes.is_empty() {
            match newer_cluster_snapshot_timeout {
                None => newer_cluster_snapshot_timeout = Some(Instant::now()),
                Some(newer_cluster_snapshot_timeout) => {
                    if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
                        return Err(GetRpcNodeError::NoNewerSnapshots);
                    }
                }
            }
            retry_reason = Some("No snapshots available".to_owned());
            continue;
        } else {
            let rpc_peers = peer_snapshot_hashes
                .iter()
                .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
                .collect::<Vec<_>>();
            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
            info!(
                "Highest available snapshot slot is {}, available from {} node{}: {:?}",
                final_snapshot_hash
                    .incr
                    .map(|(slot, _hash)| slot)
                    .unwrap_or(final_snapshot_hash.full.0),
                rpc_peers.len(),
                if rpc_peers.len() > 1 { "s" } else { "" },
                rpc_peers,
            );
            let rpc_node_results = peer_snapshot_hashes
                .iter()
                .map(|peer_snapshot_hash| GetRpcNodeResult {
                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
                })
                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
                .collect();
            return Ok(rpc_node_results);
        }
    }
}

/// Get the Slot and Hash of the local snapshot with the highest slot.  Can be either a full
/// snapshot or an incremental snapshot.
fn get_highest_local_snapshot_hash(
    full_snapshot_archives_dir: impl AsRef<Path>,
    incremental_snapshot_archives_dir: impl AsRef<Path>,
    incremental_snapshot_fetch: bool,
) -> Option<(Slot, Hash)> {
    snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
        .and_then(|full_snapshot_info| {
            if incremental_snapshot_fetch {
                snapshot_utils::get_highest_incremental_snapshot_archive_info(
                    incremental_snapshot_archives_dir,
                    full_snapshot_info.slot(),
                )
                .map(|incremental_snapshot_info| {
                    (
                        incremental_snapshot_info.slot(),
                        *incremental_snapshot_info.hash(),
                    )
                })
            } else {
                None
            }
            .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
        })
        .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
}

/// Get peer snapshot hashes
///
/// The result is a vector of peers with snapshot hashes that:
/// 1. match a snapshot hash from the known validators
/// 2. have the highest incremental snapshot slot
/// 3. have the highest full snapshot slot of (2)
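///
/// As an illustration (hypothetical slots only): given peers advertising
/// (full 100, incr 160), (full 100, incr 150), and (full 150, no incr), and with incremental
/// snapshot fetch enabled, only the peers with incremental slot 160 are kept, and of those,
/// only the ones whose full snapshot slot is the highest.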
fn get_peer_snapshot_hashes(
    cluster_info: &ClusterInfo,
    rpc_peers: &[ContactInfo],
    known_validators: Option<&HashSet<Pubkey>>,
    known_validators_to_wait_for: KnownValidatorsToWaitFor,
    incremental_snapshot_fetch: bool,
) -> Vec<PeerSnapshotHash> {
    let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
    if let Some(known_validators) = known_validators {
        let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
            cluster_info,
            known_validators,
            known_validators_to_wait_for,
        );
        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
            &known_snapshot_hashes,
            &mut peer_snapshot_hashes,
        );
    }
    if incremental_snapshot_fetch {
        // Only filter by highest incremental snapshot slot if we're actually going to download an
        // incremental snapshot.  Otherwise this could remove higher full snapshot slots from
        // being selected.  For example, if there are two peer snapshot hashes:
        // (A) full snapshot slot: 100, incremental snapshot slot: 160
        // (B) full snapshot slot: 150, incremental snapshot slot: None
        // Then (A) has the highest overall snapshot slot.  But if we're not downloading an
        // incremental snapshot, (B) should be selected since its full snapshot slot of 150 is
        // the highest.
        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
            &mut peer_snapshot_hashes,
        );
    }
    retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);

    peer_snapshot_hashes
}

/// Map full snapshot hashes to a set of incremental snapshot hashes.  Each full snapshot hash
/// is treated as the base for its set of incremental snapshot hashes.
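///
/// For example (illustrative slots only), a full snapshot at slot 100 with incremental
/// snapshots at slots 110 and 120, plus a full snapshot at slot 150 with no incrementals yet,
/// would be stored as `{(100, h1): {(110, h2), (120, h3)}, (150, h4): {}}`.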
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;

/// Get the snapshot hashes from known validators.
///
/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
/// snapshot hashes.  This map will be used as the "known snapshot hashes"; when peers are
/// queried for their individual snapshot hashes, their results will be checked against this
/// map to verify correctness.
///
/// NOTE: Only a single snapshot hash is allowed per slot.  If somehow two known validators have
/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
/// This applies to both full and incremental snapshot hashes.
fn get_snapshot_hashes_from_known_validators(
    cluster_info: &ClusterInfo,
    known_validators: &HashSet<Pubkey>,
    known_validators_to_wait_for: KnownValidatorsToWaitFor,
) -> KnownSnapshotHashes {
    // Get the snapshot hashes for a node from CRDS
    let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);

    if !do_known_validators_have_all_snapshot_hashes(
        known_validators,
        known_validators_to_wait_for,
        get_snapshot_hashes_for_node,
    ) {
        debug!(
            "Snapshot hashes have not been discovered from known validators. \
            This likely means the gossip tables are not fully populated. \
            We will sleep and retry..."
        );
        return KnownSnapshotHashes::default();
    }

    build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
}

/// Check if we can discover snapshot hashes for the known validators.
///
/// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
/// process will download snapshots.
///
/// This function will return false if we do not yet have snapshot hashes from known validators,
/// and true otherwise.  Whether we require snapshot hashes from *all* or from *any* of the known
/// validators is determined by the `KnownValidatorsToWaitFor` parameter.
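///
/// For example, with three known validators and `KnownValidatorsToWaitFor::All`, this returns
/// true only once all three have published snapshot hashes to gossip; with
/// `KnownValidatorsToWaitFor::Any`, a single published snapshot hash is enough.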
fn do_known_validators_have_all_snapshot_hashes<'a>(
    known_validators: impl IntoIterator<Item = &'a Pubkey>,
    known_validators_to_wait_for: KnownValidatorsToWaitFor,
    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
) -> bool {
    let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();

    match known_validators_to_wait_for {
        KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
        KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
    }
}

/// When waiting for snapshot hashes from the known validators, should we wait for *all* or *any*
/// of them?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    All,
    Any,
}

/// Build the known snapshot hashes from a set of nodes.
///
/// The `get_snapshot_hashes_for_node` parameter is a function that maps a pubkey to its snapshot
/// hashes.  This parameter exists to provide a way to test the inner algorithm without needing
/// runtime information such as the ClusterInfo or ValidatorConfig.
fn build_known_snapshot_hashes<'a>(
    nodes: impl IntoIterator<Item = &'a Pubkey>,
    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
) -> KnownSnapshotHashes {
    let mut known_snapshot_hashes = KnownSnapshotHashes::new();

    /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
    /// but *different* hash as the needle.
    fn is_any_same_slot_and_different_hash<'a>(
        needle: &(Slot, Hash),
        haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
    ) -> bool {
        haystack
            .into_iter()
            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
    }

    'to_next_node: for node in nodes {
        let Some(SnapshotHash {
            full: full_snapshot_hash,
            incr: incremental_snapshot_hash,
        }) = get_snapshot_hashes_for_node(node)
        else {
            continue 'to_next_node;
        };

        // Do not add this snapshot hash if there's already a full snapshot hash with the
        // same slot but with a _different_ hash.
        // NOTE: Nodes should not produce snapshots at the same slot with _different_
        // hashes.  So if it happens, keep the first and ignore the rest.
        if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
            warn!(
                "Ignoring all snapshot hashes from node {node} since we've seen a different full snapshot hash with this slot.\
                \nfull snapshot hash: {full_snapshot_hash:?}"
            );
            debug!(
                "known full snapshot hashes: {:#?}",
                known_snapshot_hashes.keys(),
            );
            continue 'to_next_node;
        }

        // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
        // doesn't already exist.  This is to ensure we don't overwrite existing
        // incremental snapshot hashes that may be present for this full snapshot hash.
        let known_incremental_snapshot_hashes =
            known_snapshot_hashes.entry(full_snapshot_hash).or_default();

        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
            // Do not add this snapshot hash if there's already an incremental snapshot
            // hash with the same slot, but with a _different_ hash.
            // NOTE: Nodes should not produce snapshots at the same slot with _different_
            // hashes.  So if it happens, keep the first and ignore the rest.
            if is_any_same_slot_and_different_hash(
                &incremental_snapshot_hash,
                known_incremental_snapshot_hashes.iter(),
            ) {
                warn!(
                    "Ignoring incremental snapshot hash from node {node} since we've seen a different incremental snapshot hash with this slot.\
                    \nfull snapshot hash: {full_snapshot_hash:?}\
                    \nincremental snapshot hash: {incremental_snapshot_hash:?}"
                );
                debug!(
                    "known incremental snapshot hashes based on this slot: {:#?}",
                    known_incremental_snapshot_hashes.iter(),
                );
                continue 'to_next_node;
            }

            known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
        };
    }

    trace!("known snapshot hashes: {known_snapshot_hashes:?}");
    known_snapshot_hashes
}

/// Get snapshot hashes from all eligible peers.
///
/// This fn will get only one snapshot hash per peer (the one with the highest slot).
/// This may be just a full snapshot hash, or a combo full snapshot hash and
/// incremental snapshot hash.
fn get_eligible_peer_snapshot_hashes(
    cluster_info: &ClusterInfo,
    rpc_peers: &[ContactInfo],
) -> Vec<PeerSnapshotHash> {
    let peer_snapshot_hashes = rpc_peers
        .iter()
        .flat_map(|rpc_peer| {
            get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
                PeerSnapshotHash {
                    rpc_contact_info: rpc_peer.clone(),
                    snapshot_hash,
                }
            })
        })
        .collect();

    trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
    peer_snapshot_hashes
}

/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
    known_snapshot_hashes: &KnownSnapshotHashes,
    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
) {
    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
        known_snapshot_hashes
            .get(&peer_snapshot_hash.snapshot_hash.full)
            .map(|known_incremental_hashes| {
                if peer_snapshot_hash.snapshot_hash.incr.is_none() {
                    // If the peer's full snapshot hash matches but it doesn't have any
                    // incremental snapshots, that's fine; keep it!
                    true
                } else {
                    known_incremental_hashes
                        .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
                }
            })
            .unwrap_or(false)
    });

    trace!(
        "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
    );
}

/// Retain the peer snapshot hashes with the highest full snapshot slot
fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
) {
    let highest_full_snapshot_hash = peer_snapshot_hashes
        .iter()
        .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
        .max_by_key(|(slot, _hash)| *slot);
    let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
        // `max_by_key` will only be `None` if the input `peer_snapshot_hashes` is empty.
        // In that case there's nothing to do (additionally, without a valid 'max' value, there
        // will be nothing to compare against within the `retain()` predicate).
        return;
    };

    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
        peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
    });

    trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
}

/// Retain the peer snapshot hashes with the highest incremental snapshot slot
fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
) {
    let highest_incremental_snapshot_hash = peer_snapshot_hashes
        .iter()
        .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
        .max_by_key(|(slot, _hash)| *slot);

    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
        peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
    });

    trace!("retain peer snapshot hashes with highest incremental snapshot slot: {peer_snapshot_hashes:?}");
}

/// Check to see if we can use our local snapshots, otherwise download newer ones.
#[allow(clippy::too_many_arguments)]
fn download_snapshots(
    full_snapshot_archives_dir: &Path,
    incremental_snapshot_archives_dir: &Path,
    validator_config: &ValidatorConfig,
    bootstrap_config: &RpcBootstrapConfig,
    use_progress_bar: bool,
    maximum_local_snapshot_age: Slot,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
    minimal_snapshot_download_speed: f32,
    maximum_snapshot_download_abort: u64,
    download_abort_count: &mut u64,
    snapshot_hash: Option<SnapshotHash>,
    rpc_contact_info: &ContactInfo,
) -> Result<(), String> {
    if snapshot_hash.is_none() {
        return Ok(());
    }
    let SnapshotHash {
        full: full_snapshot_hash,
        incr: incremental_snapshot_hash,
    } = snapshot_hash.unwrap();

    // If the local snapshots are new enough, then use 'em; no need to download new snapshots
    if should_use_local_snapshot(
        full_snapshot_archives_dir,
        incremental_snapshot_archives_dir,
        maximum_local_snapshot_age,
        full_snapshot_hash,
        incremental_snapshot_hash,
        bootstrap_config.incremental_snapshot_fetch,
    ) {
        return Ok(());
    }

    // Check and see if we've already got the full snapshot; if not, download it
    if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
        .into_iter()
        .any(|snapshot_archive| {
            snapshot_archive.slot() == full_snapshot_hash.0
                && snapshot_archive.hash().0 == full_snapshot_hash.1
        })
    {
        info!(
            "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
            full_snapshot_hash.0, full_snapshot_hash.1
        );
    } else {
        download_snapshot(
            full_snapshot_archives_dir,
            incremental_snapshot_archives_dir,
            validator_config,
            bootstrap_config,
            use_progress_bar,
            start_progress,
            minimal_snapshot_download_speed,
            maximum_snapshot_download_abort,
            download_abort_count,
            rpc_contact_info,
            full_snapshot_hash,
            SnapshotKind::FullSnapshot,
        )?;
    }

    if bootstrap_config.incremental_snapshot_fetch {
        // Check and see if we've already got the incremental snapshot; if not, download it
        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
            if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
                .into_iter()
                .any(|snapshot_archive| {
                    snapshot_archive.slot() == incremental_snapshot_hash.0
                        && snapshot_archive.hash().0 == incremental_snapshot_hash.1
                        && snapshot_archive.base_slot() == full_snapshot_hash.0
                })
            {
                info!(
                    "Incremental snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
                    incremental_snapshot_hash.0, incremental_snapshot_hash.1
                );
            } else {
                download_snapshot(
                    full_snapshot_archives_dir,
                    incremental_snapshot_archives_dir,
                    validator_config,
                    bootstrap_config,
                    use_progress_bar,
                    start_progress,
                    minimal_snapshot_download_speed,
                    maximum_snapshot_download_abort,
                    download_abort_count,
                    rpc_contact_info,
                    incremental_snapshot_hash,
                    SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
                )?;
            }
        }
    }

    Ok(())
}

/// Download a snapshot
#[allow(clippy::too_many_arguments)]
fn download_snapshot(
    full_snapshot_archives_dir: &Path,
    incremental_snapshot_archives_dir: &Path,
    validator_config: &ValidatorConfig,
    bootstrap_config: &RpcBootstrapConfig,
    use_progress_bar: bool,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
    minimal_snapshot_download_speed: f32,
    maximum_snapshot_download_abort: u64,
    download_abort_count: &mut u64,
    rpc_contact_info: &ContactInfo,
    desired_snapshot_hash: (Slot, Hash),
    snapshot_kind: SnapshotKind,
) -> Result<(), String> {
    let maximum_full_snapshot_archives_to_retain = validator_config
        .snapshot_config
        .maximum_full_snapshot_archives_to_retain;
    let maximum_incremental_snapshot_archives_to_retain = validator_config
        .snapshot_config
        .maximum_incremental_snapshot_archives_to_retain;

    *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
        slot: desired_snapshot_hash.0,
        rpc_addr: rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
    };
    let desired_snapshot_hash = (
        desired_snapshot_hash.0,
        solana_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
    );
    download_snapshot_archive(
        &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
        full_snapshot_archives_dir,
        incremental_snapshot_archives_dir,
        desired_snapshot_hash,
        snapshot_kind,
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
        use_progress_bar,
        &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
            debug!("Download progress: {download_progress:?}");
            if download_progress.last_throughput < minimal_snapshot_download_speed
                && download_progress.notification_count <= 1
                && download_progress.percentage_done <= 2_f32
                && download_progress.estimated_remaining_time > 60_f32
                && *download_abort_count < maximum_snapshot_download_abort
            {
                if let Some(ref known_validators) = validator_config.known_validators {
                    if known_validators.contains(rpc_contact_info.pubkey())
                        && known_validators.len() == 1
                        && bootstrap_config.only_known_rpc
                    {
                        warn!(
                            "The snapshot download is too slow, throughput: {} < min speed {} \
                            bytes/sec, but will NOT abort and try a different node as it is the \
                            only known validator and the --only-known-rpc flag is set. \
                            Abort count: {}, Progress detail: {:?}",
                            download_progress.last_throughput,
                            minimal_snapshot_download_speed,
                            download_abort_count,
                            download_progress,
                        );
                        return true; // Do not abort download from the one-and-only known validator
                    }
                }
                warn!(
                    "The snapshot download is too slow, throughput: {} < min speed {} \
                    bytes/sec, will abort and try a different node. \
                    Abort count: {}, Progress detail: {:?}",
                    download_progress.last_throughput,
                    minimal_snapshot_download_speed,
                    download_abort_count,
                    download_progress,
                );
                *download_abort_count += 1;
                false
            } else {
                true
            }
        })),
    )
}

/// Check to see if bootstrap should load from its local snapshots or not.  If not, then snapshots
/// will be downloaded.
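///
/// For example (illustrative numbers), with `maximum_local_snapshot_age` of 2,500 slots and a
/// cluster snapshot at slot 400,000, a local snapshot at slot 398,000 would be reused, while
/// one at slot 390,000 would be considered too old and trigger a download.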
1306fn should_use_local_snapshot(
1307    full_snapshot_archives_dir: &Path,
1308    incremental_snapshot_archives_dir: &Path,
1309    maximum_local_snapshot_age: Slot,
1310    full_snapshot_hash: (Slot, Hash),
1311    incremental_snapshot_hash: Option<(Slot, Hash)>,
1312    incremental_snapshot_fetch: bool,
1313) -> bool {
1314    let cluster_snapshot_slot = incremental_snapshot_hash
1315        .map(|(slot, _)| slot)
1316        .unwrap_or(full_snapshot_hash.0);
1317
1318    match get_highest_local_snapshot_hash(
1319        full_snapshot_archives_dir,
1320        incremental_snapshot_archives_dir,
1321        incremental_snapshot_fetch,
1322    ) {
1323        None => {
1324            info!("Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a local snapshot.");
1325            false
1326        }
1327        Some((local_snapshot_slot, _)) => {
1328            if local_snapshot_slot
1329                >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1330            {
1331                info!("Reusing local snapshot at slot {local_snapshot_slot} instead of downloading a snapshot for slot {cluster_snapshot_slot}.");
1332                true
1333            } else {
1334                info!("Local snapshot from slot {local_snapshot_slot} is too old. Downloading a newer snapshot for slot {cluster_snapshot_slot}.");
1335                false
1336            }
1337        }
1338    }
1339}
1340
1341/// Get the node's highest snapshot hashes from CRDS
1342fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1343    cluster_info.get_snapshot_hashes_for_node(node).map(
1344        |crds_value::SnapshotHashes {
1345             full, incremental, ..
1346         }| {
1347            let highest_incremental_snapshot_hash = incremental.into_iter().max();
1348            SnapshotHash {
1349                full,
1350                incr: highest_incremental_snapshot_hash,
1351            }
1352        },
1353    )
1354}
1355
1356#[cfg(test)]
1357mod tests {
1358    use super::*;
1359
1360    impl PeerSnapshotHash {
1361        fn new(
1362            rpc_contact_info: ContactInfo,
1363            full_snapshot_hash: (Slot, Hash),
1364            incremental_snapshot_hash: Option<(Slot, Hash)>,
1365        ) -> Self {
1366            Self {
1367                rpc_contact_info,
1368                snapshot_hash: SnapshotHash {
1369                    full: full_snapshot_hash,
1370                    incr: incremental_snapshot_hash,
1371                },
1372            }
1373        }
1374    }
1375
1376    fn default_contact_info_for_tests() -> ContactInfo {
1377        ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
1378    }
1379
1380    #[test]
1381    fn test_build_known_snapshot_hashes() {
1382        solana_logger::setup();
1383        let full_snapshot_hash1 = (400_000, Hash::new_unique());
1384        let full_snapshot_hash2 = (400_000, Hash::new_unique());
1385
1386        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1387        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1388
1389        // simulate a set of known validators with various snapshot hashes
1390        let oracle = {
1391            let mut oracle = HashMap::new();
1392
1393            for (full, incr) in [
1394                // only a full snapshot
1395                (full_snapshot_hash1, None),
1396                // full and incremental snapshots
1397                (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1398                // full and incremental snapshots, with different incremental hash
1399                (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1400                // ...and now with different full hashes
1401                (full_snapshot_hash2, None),
1402                (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1403                (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1404            ] {
1405                // also simulate multiple known validators having the same snapshot hashes
1406                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1407                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1408                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1409            }
1410
1411            // no snapshots at all
1412            oracle.insert(Pubkey::new_unique(), None);
1413            oracle.insert(Pubkey::new_unique(), None);
1414            oracle.insert(Pubkey::new_unique(), None);
1415
1416            oracle
1417        };
1418
1419        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1420
1421        let known_snapshot_hashes =
1422            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1423
1424        // ensure there's only one full snapshot hash, since they all used the same slot and there
1425        // can be only one snapshot hash per slot
1426        let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1427        assert_eq!(known_full_snapshot_hashes.len(), 1);
1428        let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1429
1430        // and for the same reasons, ensure there is only one incremental snapshot hash
1431        let known_incremental_snapshot_hashes =
1432            known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1433        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1434        let known_incremental_snapshot_hash =
1435            known_incremental_snapshot_hashes.iter().next().unwrap();
1436
1437        // The resulting `known_snapshot_hashes` can differ from run to run due to the order in
1438        // which `oracle.keys()` returns nodes during iteration.  Because of that, we cannot just
1439        // assert the full and incremental snapshot hashes are `full_snapshot_hash1` and
1440        // `incremental_snapshot_hash1`.  Instead, we assert that each hash is exactly one of its
1441        // two candidates, since which one wins depends on which nodes are seen "first" when
1442        // building the known snapshot hashes.
1443        assert!(
1444            known_full_snapshot_hash == &full_snapshot_hash1
1445                || known_full_snapshot_hash == &full_snapshot_hash2
1446        );
1447        assert!(
1448            known_incremental_snapshot_hash == &incremental_snapshot_hash1
1449                || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1450        );
1451    }
1452
1453    #[test]
1454    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1455        let known_snapshot_hashes: KnownSnapshotHashes = [
1456            (
1457                (200_000, Hash::new_unique()),
1458                [
1459                    (200_200, Hash::new_unique()),
1460                    (200_400, Hash::new_unique()),
1461                    (200_600, Hash::new_unique()),
1462                    (200_800, Hash::new_unique()),
1463                ]
1464                .iter()
1465                .cloned()
1466                .collect(),
1467            ),
1468            (
1469                (300_000, Hash::new_unique()),
1470                [
1471                    (300_200, Hash::new_unique()),
1472                    (300_400, Hash::new_unique()),
1473                    (300_600, Hash::new_unique()),
1474                ]
1475                .iter()
1476                .cloned()
1477                .collect(),
1478            ),
1479        ]
1480        .iter()
1481        .cloned()
1482        .collect();
1483
1484        let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1485        let known_full_snapshot_hash = known_snapshot_hash.0;
1486        let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1487
1488        let contact_info = default_contact_info_for_tests();
1489        let peer_snapshot_hashes = vec![
1490            // bad full snapshot hash, no incremental snapshot hash
1491            PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1492            // bad everything
1493            PeerSnapshotHash::new(
1494                contact_info.clone(),
1495                (111_000, Hash::default()),
1496                Some((111_111, Hash::default())),
1497            ),
1498            // good full snapshot hash, no incremental snapshot hash
1499            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1500            // bad full snapshot hash, good incremental snapshot hash (a combination that cannot actually occur)
1501            PeerSnapshotHash::new(
1502                contact_info.clone(),
1503                (111_000, Hash::default()),
1504                Some(*known_incremental_snapshot_hash),
1505            ),
1506            // good full snapshot hash, bad incremental snapshot hash
1507            PeerSnapshotHash::new(
1508                contact_info.clone(),
1509                *known_full_snapshot_hash,
1510                Some((111_111, Hash::default())),
1511            ),
1512            // good everything
1513            PeerSnapshotHash::new(
1514                contact_info.clone(),
1515                *known_full_snapshot_hash,
1516                Some(*known_incremental_snapshot_hash),
1517            ),
1518        ];
1519
1520        let expected = vec![
1521            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1522            PeerSnapshotHash::new(
1523                contact_info,
1524                *known_full_snapshot_hash,
1525                Some(*known_incremental_snapshot_hash),
1526            ),
1527        ];
1528        let mut actual = peer_snapshot_hashes;
1529        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1530            &known_snapshot_hashes,
1531            &mut actual,
1532        );
1533        assert_eq!(expected, actual);
1534    }
1535
1536    #[test]
1537    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1538        let contact_info = default_contact_info_for_tests();
1539        let peer_snapshot_hashes = vec![
1540            // old
1541            PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1542            PeerSnapshotHash::new(
1543                contact_info.clone(),
1544                (100_000, Hash::default()),
1545                Some((100_100, Hash::default())),
1546            ),
1547            PeerSnapshotHash::new(
1548                contact_info.clone(),
1549                (100_000, Hash::default()),
1550                Some((100_200, Hash::default())),
1551            ),
1552            PeerSnapshotHash::new(
1553                contact_info.clone(),
1554                (100_000, Hash::default()),
1555                Some((100_300, Hash::default())),
1556            ),
1557            // new
1558            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1559            PeerSnapshotHash::new(
1560                contact_info.clone(),
1561                (200_000, Hash::default()),
1562                Some((200_100, Hash::default())),
1563            ),
1564            PeerSnapshotHash::new(
1565                contact_info.clone(),
1566                (200_000, Hash::default()),
1567                Some((200_200, Hash::default())),
1568            ),
1569            PeerSnapshotHash::new(
1570                contact_info.clone(),
1571                (200_000, Hash::default()),
1572                Some((200_300, Hash::default())),
1573            ),
1574        ];
1575
1576        let expected = vec![
1577            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1578            PeerSnapshotHash::new(
1579                contact_info.clone(),
1580                (200_000, Hash::default()),
1581                Some((200_100, Hash::default())),
1582            ),
1583            PeerSnapshotHash::new(
1584                contact_info.clone(),
1585                (200_000, Hash::default()),
1586                Some((200_200, Hash::default())),
1587            ),
1588            PeerSnapshotHash::new(
1589                contact_info,
1590                (200_000, Hash::default()),
1591                Some((200_300, Hash::default())),
1592            ),
1593        ];
1594        let mut actual = peer_snapshot_hashes;
1595        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1596        assert_eq!(expected, actual);
1597    }
1598
1599    #[test]
1600    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1601        let contact_info = default_contact_info_for_tests();
1602        let peer_snapshot_hashes = vec![
1603            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1604            PeerSnapshotHash::new(
1605                contact_info.clone(),
1606                (200_000, Hash::default()),
1607                Some((200_100, Hash::default())),
1608            ),
1609            PeerSnapshotHash::new(
1610                contact_info.clone(),
1611                (200_000, Hash::default()),
1612                Some((200_200, Hash::default())),
1613            ),
1614            PeerSnapshotHash::new(
1615                contact_info.clone(),
1616                (200_000, Hash::default()),
1617                Some((200_300, Hash::default())),
1618            ),
1619            PeerSnapshotHash::new(
1620                contact_info.clone(),
1621                (200_000, Hash::default()),
1622                Some((200_010, Hash::default())),
1623            ),
1624            PeerSnapshotHash::new(
1625                contact_info.clone(),
1626                (200_000, Hash::default()),
1627                Some((200_020, Hash::default())),
1628            ),
1629            PeerSnapshotHash::new(
1630                contact_info.clone(),
1631                (200_000, Hash::default()),
1632                Some((200_030, Hash::default())),
1633            ),
1634        ];
1635
1636        let expected = vec![PeerSnapshotHash::new(
1637            contact_info,
1638            (200_000, Hash::default()),
1639            Some((200_300, Hash::default())),
1640        )];
1641        let mut actual = peer_snapshot_hashes;
1642        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1643        assert_eq!(expected, actual);
1644    }
1645
1646    /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
1647    /// there are *zero* peers with incremental snapshots.
1648    #[test]
1649    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1650        let contact_info = default_contact_info_for_tests();
1651        let peer_snapshot_hashes = vec![
1652            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1653            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1654            PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1655        ];
1656
1657        let expected = peer_snapshot_hashes.clone();
1658        let mut actual = peer_snapshot_hashes;
1659        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1660        assert_eq!(expected, actual);
1661    }
1662
1663    /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
1664    /// peer snapshot hashes input is empty.
1665    #[test]
1666    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1667        {
1668            let mut actual = vec![];
1669            let expected = actual.clone();
1670            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1671            assert_eq!(expected, actual);
1672        }
1673        {
1674            let mut actual = vec![];
1675            let expected = actual.clone();
1676            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1677            assert_eq!(expected, actual);
1678        }
1679    }
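
    /// A minimal sketch of the `should_use_local_snapshot` "no local snapshot" path,
    /// using empty throw-away directories.  The slot, age, and directory-name values
    /// here are illustrative assumptions, not values used elsewhere in this file.
    #[test]
    fn test_should_use_local_snapshot_without_local_archives() {
        // std-only temp directories, so no extra dev-dependencies are assumed
        let base = std::env::temp_dir().join(format!(
            "bootstrap_should_use_local_snapshot_{}",
            Pubkey::new_unique()
        ));
        let full_snapshot_archives_dir = base.join("full");
        let incremental_snapshot_archives_dir = base.join("incremental");
        std::fs::create_dir_all(&full_snapshot_archives_dir).unwrap();
        std::fs::create_dir_all(&incremental_snapshot_archives_dir).unwrap();

        // With no local snapshot archives present, bootstrap should decide to download.
        assert!(!should_use_local_snapshot(
            &full_snapshot_archives_dir,
            &incremental_snapshot_archives_dir,
            /*maximum_local_snapshot_age:*/ 2_500,
            /*full_snapshot_hash:*/ (400_000, Hash::new_unique()),
            /*incremental_snapshot_hash:*/ Some((400_800, Hash::new_unique())),
            /*incremental_snapshot_fetch:*/ true,
        ));

        std::fs::remove_dir_all(&base).ok();
    }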
1680}