// agave_validator/bootstrap.rs

1use {
2    itertools::Itertools,
3    log::*,
4    rand::{seq::SliceRandom, thread_rng, Rng},
5    rayon::prelude::*,
6    solana_clock::Slot,
7    solana_commitment_config::CommitmentConfig,
8    solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
9    solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
10    solana_genesis_utils::download_then_check_genesis_hash,
11    solana_gossip::{
12        cluster_info::ClusterInfo,
13        contact_info::{ContactInfo, Protocol},
14        crds_data,
15        gossip_service::GossipService,
16        node::Node,
17    },
18    solana_hash::Hash,
19    solana_keypair::Keypair,
20    solana_metrics::datapoint_info,
21    solana_pubkey::Pubkey,
22    solana_rpc_client::rpc_client::RpcClient,
23    solana_runtime::{
24        snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
25        snapshot_utils,
26    },
27    solana_signer::Signer,
28    solana_streamer::socket::SocketAddrSpace,
29    std::{
30        collections::{hash_map::RandomState, HashMap, HashSet},
31        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
32        path::Path,
33        process::exit,
34        sync::{
35            atomic::{AtomicBool, Ordering},
36            Arc, RwLock,
37        },
38        time::{Duration, Instant},
39    },
40    thiserror::Error,
41};
42
/// Upper bound on how long snapshot discovery waits for snapshot hashes from
/// _every_ known validator.  Once it has elapsed, hashes from _any_ known
/// validator are accepted instead.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// When every RPC peer is blacklisted and no new peer has turned up for this
/// long, the blacklist is cleared so those peers can be retried.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// Give up once no suitable snapshot download candidate has been found for
/// this long.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(3 * 60);
/// Give up once no RPC peers at all have been found for this long.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Maximum number of RPC peer candidates returned by a single round of peer
/// discovery.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// Connect timeout used when timing a TCP connection to an RPC peer.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
59
/// Options controlling how [`rpc_bootstrap`] fetches genesis and snapshots from
/// RPC peers.  Bootstrapping is skipped entirely when both `no_genesis_fetch`
/// and `no_snapshot_fetch` are set.
#[derive(Debug, PartialEq, Clone)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash` to skip fetching genesis.
    pub no_genesis_fetch: bool,
    /// Skip snapshot discovery; a random RPC peer is chosen without a snapshot hash.
    pub no_snapshot_fetch: bool,
    /// Only consider RPC peers that are listed as known validators.
    pub only_known_rpc: bool,
    /// Size limit forwarded to `download_then_check_genesis_hash` for the unpacked
    /// genesis archive.
    pub max_genesis_archive_unpacked_size: u64,
    /// RPC URL used to validate the vote account after bootstrapping; `None`
    /// disables the check.
    pub check_vote_account: Option<String>,
    /// Take incremental snapshot slots into account when choosing a snapshot peer.
    pub incremental_snapshot_fetch: bool,
}
69
70fn verify_reachable_ports(
71    node: &Node,
72    cluster_entrypoint: &ContactInfo,
73    validator_config: &ValidatorConfig,
74    socket_addr_space: &SocketAddrSpace,
75) -> bool {
76    let verify_address = |addr: &Option<SocketAddr>| -> bool {
77        addr.as_ref()
78            .map(|addr| socket_addr_space.check(addr))
79            .unwrap_or_default()
80    };
81
82    let mut udp_sockets = vec![&node.sockets.repair];
83    udp_sockets.extend(node.sockets.gossip.iter());
84
85    if verify_address(&node.info.serve_repair(Protocol::UDP)) {
86        udp_sockets.push(&node.sockets.serve_repair);
87    }
88    if verify_address(&node.info.tpu(Protocol::UDP)) {
89        udp_sockets.extend(node.sockets.tpu.iter());
90        udp_sockets.extend(&node.sockets.tpu_quic);
91    }
92    if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
93        udp_sockets.extend(node.sockets.tpu_forwards.iter());
94        udp_sockets.extend(&node.sockets.tpu_forwards_quic);
95    }
96    if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
97        udp_sockets.extend(node.sockets.tpu_vote.iter());
98    }
99    if verify_address(&node.info.tvu(Protocol::UDP)) {
100        udp_sockets.extend(node.sockets.tvu.iter());
101        udp_sockets.extend(node.sockets.broadcast.iter());
102        udp_sockets.extend(node.sockets.retransmit_sockets.iter());
103    }
104    if !solana_net_utils::verify_all_reachable_udp(
105        &cluster_entrypoint.gossip().unwrap(),
106        &udp_sockets,
107    ) {
108        return false;
109    }
110
111    let mut tcp_listeners = vec![];
112    if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
113        for (purpose, bind_addr, public_addr) in &[
114            ("RPC", rpc_addr, node.info.rpc()),
115            ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
116        ] {
117            if verify_address(public_addr) {
118                tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
119                    error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
120                    exit(1);
121                }));
122            }
123        }
124    }
125
126    if let Some(ip_echo) = &node.sockets.ip_echo {
127        let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
128        tcp_listeners.push(ip_echo);
129    }
130
131    solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
132}
133
134fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
135    if let Some(known_validators) = known_validators {
136        known_validators.contains(id)
137    } else {
138        false
139    }
140}
141
142fn start_gossip_node(
143    identity_keypair: Arc<Keypair>,
144    cluster_entrypoints: &[ContactInfo],
145    ledger_path: &Path,
146    gossip_addr: &SocketAddr,
147    gossip_sockets: Arc<[UdpSocket]>,
148    expected_shred_version: u16,
149    gossip_validators: Option<HashSet<Pubkey>>,
150    should_check_duplicate_instance: bool,
151    socket_addr_space: SocketAddrSpace,
152) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
153    let contact_info = ClusterInfo::gossip_contact_info(
154        identity_keypair.pubkey(),
155        *gossip_addr,
156        expected_shred_version,
157    );
158    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
159    cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
160    cluster_info.restore_contact_info(ledger_path, 0);
161    let cluster_info = Arc::new(cluster_info);
162
163    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
164    let gossip_service = GossipService::new(
165        &cluster_info,
166        None,
167        gossip_sockets,
168        gossip_validators,
169        should_check_duplicate_instance,
170        None,
171        gossip_exit_flag.clone(),
172    );
173    (cluster_info, gossip_exit_flag, gossip_service)
174}
175
176fn get_rpc_peers(
177    cluster_info: &ClusterInfo,
178    validator_config: &ValidatorConfig,
179    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
180    blacklist_timeout: &Instant,
181    retry_reason: &mut Option<String>,
182    bootstrap_config: &RpcBootstrapConfig,
183) -> Vec<ContactInfo> {
184    let shred_version = validator_config
185        .expected_shred_version
186        .unwrap_or_else(|| cluster_info.my_shred_version());
187
188    info!(
189        "Searching for an RPC service with shred version {shred_version}{}...",
190        retry_reason
191            .as_ref()
192            .map(|s| format!(" (Retrying: {s})"))
193            .unwrap_or_default()
194    );
195
196    let mut rpc_peers = cluster_info.rpc_peers();
197    if bootstrap_config.only_known_rpc {
198        rpc_peers.retain(|rpc_peer| {
199            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
200        });
201    }
202
203    let rpc_peers_total = rpc_peers.len();
204
205    // Filter out blacklisted nodes
206    let rpc_peers: Vec<_> = rpc_peers
207        .into_iter()
208        .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
209        .collect();
210    let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
211    let rpc_known_peers = rpc_peers
212        .iter()
213        .filter(|rpc_peer| {
214            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
215        })
216        .count();
217
218    info!(
219        "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
220         {rpc_peers_blacklisted} blacklisted"
221    );
222
223    if rpc_peers_blacklisted == rpc_peers_total {
224        *retry_reason = if !blacklisted_rpc_nodes.is_empty()
225            && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
226        {
227            // All nodes are blacklisted and no additional nodes recently discovered.
228            // Remove all nodes from the blacklist and try them again.
229            blacklisted_rpc_nodes.clear();
230            Some("Blacklist timeout expired".to_owned())
231        } else {
232            Some("Wait for known rpc peers".to_owned())
233        };
234        return vec![];
235    }
236    rpc_peers
237}
238
239fn check_vote_account(
240    rpc_client: &RpcClient,
241    identity_pubkey: &Pubkey,
242    vote_account_address: &Pubkey,
243    authorized_voter_pubkeys: &[Pubkey],
244) -> Result<(), String> {
245    let vote_account = rpc_client
246        .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
247        .map_err(|err| format!("failed to fetch vote account: {err}"))?
248        .value
249        .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
250
251    if vote_account.owner != solana_vote_program::id() {
252        return Err(format!(
253            "not a vote account (owned by {}): {}",
254            vote_account.owner, vote_account_address
255        ));
256    }
257
258    let identity_account = rpc_client
259        .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
260        .map_err(|err| format!("failed to fetch identity account: {err}"))?
261        .value
262        .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
263
264    let vote_state = solana_vote_program::vote_state::from(&vote_account);
265    if let Some(vote_state) = vote_state {
266        if vote_state.authorized_voters().is_empty() {
267            return Err("Vote account not yet initialized".to_string());
268        }
269
270        if vote_state.node_pubkey != *identity_pubkey {
271            return Err(format!(
272                "vote account's identity ({}) does not match the validator's identity {}).",
273                vote_state.node_pubkey, identity_pubkey
274            ));
275        }
276
277        for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
278            if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
279                return Err(format!(
280                    "authorized voter {vote_account_authorized_voter_pubkey} not available"
281                ));
282            }
283        }
284    } else {
285        return Err(format!(
286            "invalid vote account data for {vote_account_address}"
287        ));
288    }
289
290    // Maybe we can calculate minimum voting fee; rather than 1 lamport
291    if identity_account.lamports <= 1 {
292        return Err(format!(
293            "underfunded identity account ({}): only {} lamports available",
294            identity_pubkey, identity_account.lamports
295        ));
296    }
297
298    Ok(())
299}
300
301#[derive(Error, Debug)]
302pub enum GetRpcNodeError {
303    #[error("Unable to find any RPC peers")]
304    NoRpcPeersFound,
305
306    #[error("Giving up, did not get newer snapshots from the cluster")]
307    NoNewerSnapshots,
308}
309
310/// Struct to wrap the return value from get_rpc_nodes().  The `rpc_contact_info` is the peer to
311/// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
312/// snapshots to download.
313#[derive(Debug)]
314struct GetRpcNodeResult {
315    rpc_contact_info: ContactInfo,
316    snapshot_hash: Option<SnapshotHash>,
317}
318
319/// Struct to wrap the peers & snapshot hashes together.
320#[derive(Debug, PartialEq, Eq, Clone)]
321struct PeerSnapshotHash {
322    rpc_contact_info: ContactInfo,
323    snapshot_hash: SnapshotHash,
324}
325
326/// A snapshot hash.  In this context (bootstrap *with* incremental snapshots), a snapshot hash
327/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
328#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
329pub struct SnapshotHash {
330    full: (Slot, Hash),
331    incr: Option<(Slot, Hash)>,
332}
333
334pub fn fail_rpc_node(
335    err: String,
336    known_validators: &Option<HashSet<Pubkey, RandomState>>,
337    rpc_id: &Pubkey,
338    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
339) {
340    warn!("{err}");
341    if let Some(ref known_validators) = known_validators {
342        if known_validators.contains(rpc_id) {
343            return;
344        }
345    }
346
347    info!("Excluding {rpc_id} as a future RPC candidate");
348    blacklisted_rpc_nodes.insert(*rpc_id);
349}
350
351fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
352    let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
353    cluster_info.save_contact_info();
354    gossip_exit_flag.store(true, Ordering::Relaxed);
355    gossip_service.join().unwrap();
356}
357
358#[allow(clippy::too_many_arguments)]
359pub fn attempt_download_genesis_and_snapshot(
360    rpc_contact_info: &ContactInfo,
361    ledger_path: &Path,
362    validator_config: &mut ValidatorConfig,
363    bootstrap_config: &RpcBootstrapConfig,
364    use_progress_bar: bool,
365    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
366    rpc_client: &RpcClient,
367    maximum_local_snapshot_age: Slot,
368    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
369    minimal_snapshot_download_speed: f32,
370    maximum_snapshot_download_abort: u64,
371    download_abort_count: &mut u64,
372    snapshot_hash: Option<SnapshotHash>,
373    identity_keypair: &Arc<Keypair>,
374    vote_account: &Pubkey,
375    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
376) -> Result<(), String> {
377    download_then_check_genesis_hash(
378        &rpc_contact_info
379            .rpc()
380            .ok_or_else(|| String::from("Invalid RPC address"))?,
381        ledger_path,
382        &mut validator_config.expected_genesis_hash,
383        bootstrap_config.max_genesis_archive_unpacked_size,
384        bootstrap_config.no_genesis_fetch,
385        use_progress_bar,
386        rpc_client,
387    )?;
388
389    if let Some(gossip) = gossip.take() {
390        shutdown_gossip_service(gossip);
391    }
392
393    let rpc_client_slot = rpc_client
394        .get_slot_with_commitment(CommitmentConfig::finalized())
395        .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
396    info!("RPC node root slot: {rpc_client_slot}");
397
398    download_snapshots(
399        validator_config,
400        bootstrap_config,
401        use_progress_bar,
402        maximum_local_snapshot_age,
403        start_progress,
404        minimal_snapshot_download_speed,
405        maximum_snapshot_download_abort,
406        download_abort_count,
407        snapshot_hash,
408        rpc_contact_info,
409    )?;
410
411    if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
412        let rpc_client = RpcClient::new(url);
413        check_vote_account(
414            &rpc_client,
415            &identity_keypair.pubkey(),
416            vote_account,
417            &authorized_voter_keypairs
418                .read()
419                .unwrap()
420                .iter()
421                .map(|k| k.pubkey())
422                .collect::<Vec<_>>(),
423        )
424        .unwrap_or_else(|err| {
425            // Consider failures here to be more likely due to user error (eg,
426            // incorrect `agave-validator` command-line arguments) rather than the
427            // RPC node failing.
428            //
429            // Power users can always use the `--no-check-vote-account` option to
430            // bypass this check entirely
431            error!("{err}");
432            exit(1);
433        });
434    }
435    Ok(())
436}
437
438/// simple ping helper function which returns the time to connect
439fn ping(addr: &SocketAddr) -> Option<Duration> {
440    let start = Instant::now();
441    match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
442        Ok(_) => Some(start.elapsed()),
443        Err(_) => None,
444    }
445}
446
447// Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
448// used for downloading latest snapshots and/or the genesis block. Guaranteed to
449// find at least one viable node or terminate the process.
450fn get_vetted_rpc_nodes(
451    vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
452    cluster_info: &Arc<ClusterInfo>,
453    validator_config: &ValidatorConfig,
454    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
455    bootstrap_config: &RpcBootstrapConfig,
456) {
457    while vetted_rpc_nodes.is_empty() {
458        let rpc_node_details = match get_rpc_nodes(
459            cluster_info,
460            validator_config,
461            blacklisted_rpc_nodes,
462            bootstrap_config,
463        ) {
464            Ok(rpc_node_details) => rpc_node_details,
465            Err(err) => {
466                error!(
467                    "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
468                     `--no-port-check`, or adjusting `--known-validator ...` arguments as \
469                     applicable"
470                );
471                exit(1);
472            }
473        };
474
475        let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
476        vetted_rpc_nodes.extend(
477            rpc_node_details
478                .into_par_iter()
479                .filter_map(|rpc_node_details| {
480                    let GetRpcNodeResult {
481                        rpc_contact_info,
482                        snapshot_hash,
483                    } = rpc_node_details;
484
485                    info!(
486                        "Using RPC service from node {}: {:?}",
487                        rpc_contact_info.pubkey(),
488                        rpc_contact_info.rpc()
489                    );
490
491                    let rpc_addr = rpc_contact_info.rpc()?;
492                    let ping_time = ping(&rpc_addr);
493
494                    let rpc_client =
495                        RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
496
497                    Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
498                })
499                .filter(
500                    |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
501                        .get_version()
502                    {
503                        Ok(rpc_version) => {
504                            if let Some(ping_time) = ping_time {
505                                info!(
506                                    "RPC node version: {} Ping: {}ms",
507                                    rpc_version.solana_core,
508                                    ping_time.as_millis()
509                                );
510                                true
511                            } else {
512                                fail_rpc_node(
513                                    "Failed to ping RPC".to_string(),
514                                    &validator_config.known_validators,
515                                    rpc_contact_info.pubkey(),
516                                    &mut newly_blacklisted_rpc_nodes.write().unwrap(),
517                                );
518                                false
519                            }
520                        }
521                        Err(err) => {
522                            fail_rpc_node(
523                                format!("Failed to get RPC node version: {err}"),
524                                &validator_config.known_validators,
525                                rpc_contact_info.pubkey(),
526                                &mut newly_blacklisted_rpc_nodes.write().unwrap(),
527                            );
528                            false
529                        }
530                    },
531                )
532                .collect::<Vec<(
533                    ContactInfo,
534                    Option<SnapshotHash>,
535                    RpcClient,
536                    Option<Duration>,
537                )>>()
538                .into_iter()
539                .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
540                .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
541                    (rpc_contact_info, snapshot_hash, rpc_client)
542                })
543                .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
544        );
545        blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
546    }
547}
548
549#[allow(clippy::too_many_arguments)]
550pub fn rpc_bootstrap(
551    node: &Node,
552    identity_keypair: &Arc<Keypair>,
553    ledger_path: &Path,
554    vote_account: &Pubkey,
555    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
556    cluster_entrypoints: &[ContactInfo],
557    validator_config: &mut ValidatorConfig,
558    bootstrap_config: RpcBootstrapConfig,
559    do_port_check: bool,
560    use_progress_bar: bool,
561    maximum_local_snapshot_age: Slot,
562    should_check_duplicate_instance: bool,
563    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
564    minimal_snapshot_download_speed: f32,
565    maximum_snapshot_download_abort: u64,
566    socket_addr_space: SocketAddrSpace,
567) {
568    if do_port_check {
569        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
570        order.shuffle(&mut thread_rng());
571        if order.into_iter().all(|i| {
572            !verify_reachable_ports(
573                node,
574                &cluster_entrypoints[i],
575                validator_config,
576                &socket_addr_space,
577            )
578        }) {
579            exit(1);
580        }
581    }
582
583    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
584        return;
585    }
586
587    let total_snapshot_download_time = Instant::now();
588    let mut get_rpc_nodes_time = Duration::new(0, 0);
589    let mut snapshot_download_time = Duration::new(0, 0);
590    let mut blacklisted_rpc_nodes = HashSet::new();
591    let mut gossip = None;
592    let mut vetted_rpc_nodes = vec![];
593    let mut download_abort_count = 0;
594    loop {
595        if gossip.is_none() {
596            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
597
598            gossip = Some(start_gossip_node(
599                identity_keypair.clone(),
600                cluster_entrypoints,
601                ledger_path,
602                &node
603                    .info
604                    .gossip()
605                    .expect("Operator must spin up node with valid gossip address"),
606                node.sockets.gossip.clone(),
607                validator_config
608                    .expected_shred_version
609                    .expect("expected_shred_version should not be None"),
610                validator_config.gossip_validators.clone(),
611                should_check_duplicate_instance,
612                socket_addr_space,
613            ));
614        }
615
616        let get_rpc_nodes_start = Instant::now();
617        get_vetted_rpc_nodes(
618            &mut vetted_rpc_nodes,
619            &gossip.as_ref().unwrap().0,
620            validator_config,
621            &mut blacklisted_rpc_nodes,
622            &bootstrap_config,
623        );
624        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
625        get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
626
627        let snapshot_download_start = Instant::now();
628        let download_result = attempt_download_genesis_and_snapshot(
629            &rpc_contact_info,
630            ledger_path,
631            validator_config,
632            &bootstrap_config,
633            use_progress_bar,
634            &mut gossip,
635            &rpc_client,
636            maximum_local_snapshot_age,
637            start_progress,
638            minimal_snapshot_download_speed,
639            maximum_snapshot_download_abort,
640            &mut download_abort_count,
641            snapshot_hash,
642            identity_keypair,
643            vote_account,
644            authorized_voter_keypairs.clone(),
645        );
646        snapshot_download_time += snapshot_download_start.elapsed();
647        match download_result {
648            Ok(()) => break,
649            Err(err) => {
650                fail_rpc_node(
651                    err,
652                    &validator_config.known_validators,
653                    rpc_contact_info.pubkey(),
654                    &mut blacklisted_rpc_nodes,
655                );
656            }
657        }
658    }
659
660    if let Some(gossip) = gossip.take() {
661        shutdown_gossip_service(gossip);
662    }
663
664    datapoint_info!(
665        "bootstrap-snapshot-download",
666        (
667            "total_time_secs",
668            total_snapshot_download_time.elapsed().as_secs(),
669            i64
670        ),
671        ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
672        (
673            "snapshot_download_time_secs",
674            snapshot_download_time.as_secs(),
675            i64
676        ),
677        ("download_abort_count", download_abort_count, i64),
678        ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
679    );
680}
681
682/// Get RPC peer node candidates to download from.
683///
684/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
685fn get_rpc_nodes(
686    cluster_info: &ClusterInfo,
687    validator_config: &ValidatorConfig,
688    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
689    bootstrap_config: &RpcBootstrapConfig,
690) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
691    let mut blacklist_timeout = Instant::now();
692    let mut get_rpc_peers_timout = Instant::now();
693    let mut newer_cluster_snapshot_timeout = None;
694    let mut retry_reason = None;
695    loop {
696        // Give gossip some time to populate and not spin on grabbing the crds lock
697        std::thread::sleep(Duration::from_secs(1));
698        info!("\n{}", cluster_info.rpc_info_trace());
699
700        let rpc_peers = get_rpc_peers(
701            cluster_info,
702            validator_config,
703            blacklisted_rpc_nodes,
704            &blacklist_timeout,
705            &mut retry_reason,
706            bootstrap_config,
707        );
708        if rpc_peers.is_empty() {
709            if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
710                return Err(GetRpcNodeError::NoRpcPeersFound);
711            }
712            continue;
713        }
714
715        // Reset timeouts if we found any viable RPC peers.
716        blacklist_timeout = Instant::now();
717        get_rpc_peers_timout = Instant::now();
718        if bootstrap_config.no_snapshot_fetch {
719            let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
720            return Ok(vec![GetRpcNodeResult {
721                rpc_contact_info: random_peer.clone(),
722                snapshot_hash: None,
723            }]);
724        }
725
726        let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
727            .as_ref()
728            .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
729            .unwrap_or(true)
730        {
731            KnownValidatorsToWaitFor::All
732        } else {
733            KnownValidatorsToWaitFor::Any
734        };
735        let peer_snapshot_hashes = get_peer_snapshot_hashes(
736            cluster_info,
737            &rpc_peers,
738            validator_config.known_validators.as_ref(),
739            known_validators_to_wait_for,
740            bootstrap_config.incremental_snapshot_fetch,
741        );
742        if peer_snapshot_hashes.is_empty() {
743            match newer_cluster_snapshot_timeout {
744                None => newer_cluster_snapshot_timeout = Some(Instant::now()),
745                Some(newer_cluster_snapshot_timeout) => {
746                    if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
747                        return Err(GetRpcNodeError::NoNewerSnapshots);
748                    }
749                }
750            }
751            retry_reason = Some("No snapshots available".to_owned());
752            continue;
753        } else {
754            let rpc_peers = peer_snapshot_hashes
755                .iter()
756                .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
757                .collect::<Vec<_>>();
758            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
759            info!(
760                "Highest available snapshot slot is {}, available from {} node{}: {:?}",
761                final_snapshot_hash
762                    .incr
763                    .map(|(slot, _hash)| slot)
764                    .unwrap_or(final_snapshot_hash.full.0),
765                rpc_peers.len(),
766                if rpc_peers.len() > 1 { "s" } else { "" },
767                rpc_peers,
768            );
769            let rpc_node_results = peer_snapshot_hashes
770                .iter()
771                .map(|peer_snapshot_hash| GetRpcNodeResult {
772                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
773                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
774                })
775                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
776                .collect();
777            return Ok(rpc_node_results);
778        }
779    }
780}
781
782/// Get the Slot and Hash of the local snapshot with the highest slot.  Can be either a full
783/// snapshot or an incremental snapshot.
784fn get_highest_local_snapshot_hash(
785    full_snapshot_archives_dir: impl AsRef<Path>,
786    incremental_snapshot_archives_dir: impl AsRef<Path>,
787    incremental_snapshot_fetch: bool,
788) -> Option<(Slot, Hash)> {
789    snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
790        .and_then(|full_snapshot_info| {
791            if incremental_snapshot_fetch {
792                snapshot_utils::get_highest_incremental_snapshot_archive_info(
793                    incremental_snapshot_archives_dir,
794                    full_snapshot_info.slot(),
795                )
796                .map(|incremental_snapshot_info| {
797                    (
798                        incremental_snapshot_info.slot(),
799                        *incremental_snapshot_info.hash(),
800                    )
801                })
802            } else {
803                None
804            }
805            .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
806        })
807        .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
808}
809
810/// Get peer snapshot hashes
811///
812/// The result is a vector of peers with snapshot hashes that:
813/// 1. match a snapshot hash from the known validators
814/// 2. have the highest incremental snapshot slot
815/// 3. have the highest full snapshot slot of (2)
816fn get_peer_snapshot_hashes(
817    cluster_info: &ClusterInfo,
818    rpc_peers: &[ContactInfo],
819    known_validators: Option<&HashSet<Pubkey>>,
820    known_validators_to_wait_for: KnownValidatorsToWaitFor,
821    incremental_snapshot_fetch: bool,
822) -> Vec<PeerSnapshotHash> {
823    let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
824    if let Some(known_validators) = known_validators {
825        let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
826            cluster_info,
827            known_validators,
828            known_validators_to_wait_for,
829        );
830        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
831            &known_snapshot_hashes,
832            &mut peer_snapshot_hashes,
833        );
834    }
835    if incremental_snapshot_fetch {
836        // Only filter by highest incremental snapshot slot if we're actually going to download an
837        // incremental snapshot.  Otherwise this could remove higher full snapshot slots from
838        // being selected.  For example, if there are two peer snapshot hashes:
839        // (A) full snapshot slot: 100, incremental snapshot slot: 160
840        // (B) full snapshot slot: 150, incremental snapshot slot: None
841        // Then (A) has the highest overall snapshot slot.  But if we're not downloading and
842        // incremental snapshot, (B) should be selected since it's full snapshot of 150 is highest.
843        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
844            &mut peer_snapshot_hashes,
845        );
846    }
847    retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
848
849    peer_snapshot_hashes
850}
851
/// Maps full snapshot hashes to a set of incremental snapshot hashes.  Each full snapshot hash
/// is treated as the base for its set of incremental snapshot hashes.
///
/// Both the keys and the set members are `(slot, hash)` pairs.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
855
856/// Get the snapshot hashes from known validators.
857///
858/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
859/// snapshot hashes.  This map will be used as the "known snapshot hashes"; when peers are
860/// queried for their individual snapshot hashes, their results will be checked against this
861/// map to verify correctness.
862///
863/// NOTE: Only a single snashot hash is allowed per slot.  If somehow two known validators have
864/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
865/// This applies to both full and incremental snapshot hashes.
866fn get_snapshot_hashes_from_known_validators(
867    cluster_info: &ClusterInfo,
868    known_validators: &HashSet<Pubkey>,
869    known_validators_to_wait_for: KnownValidatorsToWaitFor,
870) -> KnownSnapshotHashes {
871    // Get the snapshot hashes for a node from CRDS
872    let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
873
874    if !do_known_validators_have_all_snapshot_hashes(
875        known_validators,
876        known_validators_to_wait_for,
877        get_snapshot_hashes_for_node,
878    ) {
879        debug!(
880            "Snapshot hashes have not been discovered from known validators. This likely means \
881             the gossip tables are not fully populated. We will sleep and retry..."
882        );
883        return KnownSnapshotHashes::default();
884    }
885
886    build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
887}
888
889/// Check if we can discover snapshot hashes for the known validators.
890///
891/// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
892/// process will download snapshots.
893///
894/// This function will return false if we do not yet have snapshot hashes from known validators;
895/// and true otherwise.  Either require snapshot hashes from *all* or *any* of the known validators
896/// based on the `KnownValidatorsToWaitFor` parameter.
897fn do_known_validators_have_all_snapshot_hashes<'a>(
898    known_validators: impl IntoIterator<Item = &'a Pubkey>,
899    known_validators_to_wait_for: KnownValidatorsToWaitFor,
900    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
901) -> bool {
902    let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
903
904    match known_validators_to_wait_for {
905        KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
906        KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
907    }
908}
909
/// How many known validators must have published snapshot hashes before we stop waiting:
/// *all* of them or *any* of them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum KnownValidatorsToWaitFor {
    /// Wait until every known validator has snapshot hashes.
    All,
    /// Wait until at least one known validator has snapshot hashes.
    Any,
}
917
918/// Build the known snapshot hashes from a set of nodes.
919///
920/// The `get_snapshot_hashes_for_node` parameter is a function that map a pubkey to its snapshot
921/// hashes.  This parameter exist to provide a way to test the inner algorithm without needing
922/// runtime information such as the ClusterInfo or ValidatorConfig.
923fn build_known_snapshot_hashes<'a>(
924    nodes: impl IntoIterator<Item = &'a Pubkey>,
925    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
926) -> KnownSnapshotHashes {
927    let mut known_snapshot_hashes = KnownSnapshotHashes::new();
928
929    /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
930    /// but *different* hash as the needle.
931    fn is_any_same_slot_and_different_hash<'a>(
932        needle: &(Slot, Hash),
933        haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
934    ) -> bool {
935        haystack
936            .into_iter()
937            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
938    }
939
940    'to_next_node: for node in nodes {
941        let Some(SnapshotHash {
942            full: full_snapshot_hash,
943            incr: incremental_snapshot_hash,
944        }) = get_snapshot_hashes_for_node(node)
945        else {
946            continue 'to_next_node;
947        };
948
949        // Do not add this snapshot hash if there's already a full snapshot hash with the
950        // same slot but with a _different_ hash.
951        // NOTE: Nodes should not produce snapshots at the same slot with _different_
952        // hashes.  So if it happens, keep the first and ignore the rest.
953        if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
954            warn!(
955                "Ignoring all snapshot hashes from node {node} since we've seen a different full \
956                 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
957            );
958            debug!(
959                "known full snapshot hashes: {:#?}",
960                known_snapshot_hashes.keys(),
961            );
962            continue 'to_next_node;
963        }
964
965        // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
966        // doesn't already exist.  This is to ensure we don't overwrite existing
967        // incremental snapshot hashes that may be present for this full snapshot hash.
968        let known_incremental_snapshot_hashes =
969            known_snapshot_hashes.entry(full_snapshot_hash).or_default();
970
971        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
972            // Do not add this snapshot hash if there's already an incremental snapshot
973            // hash with the same slot, but with a _different_ hash.
974            // NOTE: Nodes should not produce snapshots at the same slot with _different_
975            // hashes.  So if it happens, keep the first and ignore the rest.
976            if is_any_same_slot_and_different_hash(
977                &incremental_snapshot_hash,
978                known_incremental_snapshot_hashes.iter(),
979            ) {
980                warn!(
981                    "Ignoring incremental snapshot hash from node {node} since we've seen a \
982                     different incremental snapshot hash with this slot. full snapshot hash: \
983                     {full_snapshot_hash:?}, incremental snapshot hash: \
984                     {incremental_snapshot_hash:?}"
985                );
986                debug!(
987                    "known incremental snapshot hashes based on this slot: {:#?}",
988                    known_incremental_snapshot_hashes.iter(),
989                );
990                continue 'to_next_node;
991            }
992
993            known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
994        };
995    }
996
997    trace!("known snapshot hashes: {known_snapshot_hashes:?}");
998    known_snapshot_hashes
999}
1000
1001/// Get snapshot hashes from all eligible peers.
1002///
1003/// This fn will get only one snapshot hash per peer (the one with the highest slot).
1004/// This may be just a full snapshot hash, or a combo full snapshot hash and
1005/// incremental snapshot hash.
1006fn get_eligible_peer_snapshot_hashes(
1007    cluster_info: &ClusterInfo,
1008    rpc_peers: &[ContactInfo],
1009) -> Vec<PeerSnapshotHash> {
1010    let peer_snapshot_hashes = rpc_peers
1011        .iter()
1012        .flat_map(|rpc_peer| {
1013            get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1014                PeerSnapshotHash {
1015                    rpc_contact_info: rpc_peer.clone(),
1016                    snapshot_hash,
1017                }
1018            })
1019        })
1020        .collect();
1021
1022    trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1023    peer_snapshot_hashes
1024}
1025
1026/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
1027fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1028    known_snapshot_hashes: &KnownSnapshotHashes,
1029    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1030) {
1031    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1032        known_snapshot_hashes
1033            .get(&peer_snapshot_hash.snapshot_hash.full)
1034            .map(|known_incremental_hashes| {
1035                if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1036                    // If the peer's full snapshot hashes match, but doesn't have any
1037                    // incremental snapshots, that's fine; keep 'em!
1038                    true
1039                } else {
1040                    known_incremental_hashes
1041                        .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1042                }
1043            })
1044            .unwrap_or(false)
1045    });
1046
1047    trace!(
1048        "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1049    );
1050}
1051
1052/// Retain the peer snapshot hashes with the highest full snapshot slot
1053fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1054    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1055) {
1056    let highest_full_snapshot_hash = peer_snapshot_hashes
1057        .iter()
1058        .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1059        .max_by_key(|(slot, _hash)| *slot);
1060    let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1061        // `max_by_key` will only be `None` IFF the input `peer_snapshot_hashes` is empty.
1062        // In that case there's nothing to do (additionally, without a valid 'max' value, there
1063        // will be nothing to compare against within the `retain()` predicate).
1064        return;
1065    };
1066
1067    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1068        peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1069    });
1070
1071    trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1072}
1073
1074/// Retain the peer snapshot hashes with the highest incremental snapshot slot
1075fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1076    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1077) {
1078    let highest_incremental_snapshot_hash = peer_snapshot_hashes
1079        .iter()
1080        .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1081        .max_by_key(|(slot, _hash)| *slot);
1082
1083    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1084        peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1085    });
1086
1087    trace!(
1088        "retain peer snapshot hashes with highest incremental snapshot slot: \
1089         {peer_snapshot_hashes:?}"
1090    );
1091}
1092
1093/// Check to see if we can use our local snapshots, otherwise download newer ones.
1094#[allow(clippy::too_many_arguments)]
1095fn download_snapshots(
1096    validator_config: &ValidatorConfig,
1097    bootstrap_config: &RpcBootstrapConfig,
1098    use_progress_bar: bool,
1099    maximum_local_snapshot_age: Slot,
1100    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1101    minimal_snapshot_download_speed: f32,
1102    maximum_snapshot_download_abort: u64,
1103    download_abort_count: &mut u64,
1104    snapshot_hash: Option<SnapshotHash>,
1105    rpc_contact_info: &ContactInfo,
1106) -> Result<(), String> {
1107    if snapshot_hash.is_none() {
1108        return Ok(());
1109    }
1110    let SnapshotHash {
1111        full: full_snapshot_hash,
1112        incr: incremental_snapshot_hash,
1113    } = snapshot_hash.unwrap();
1114    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1115    let incremental_snapshot_archives_dir = &validator_config
1116        .snapshot_config
1117        .incremental_snapshot_archives_dir;
1118
1119    // If the local snapshots are new enough, then use 'em; no need to download new snapshots
1120    if should_use_local_snapshot(
1121        full_snapshot_archives_dir,
1122        incremental_snapshot_archives_dir,
1123        maximum_local_snapshot_age,
1124        full_snapshot_hash,
1125        incremental_snapshot_hash,
1126        bootstrap_config.incremental_snapshot_fetch,
1127    ) {
1128        return Ok(());
1129    }
1130
1131    // Check and see if we've already got the full snapshot; if not, download it
1132    if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
1133        .into_iter()
1134        .any(|snapshot_archive| {
1135            snapshot_archive.slot() == full_snapshot_hash.0
1136                && snapshot_archive.hash().0 == full_snapshot_hash.1
1137        })
1138    {
1139        info!(
1140            "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1141            full_snapshot_hash.0, full_snapshot_hash.1
1142        );
1143    } else {
1144        download_snapshot(
1145            validator_config,
1146            bootstrap_config,
1147            use_progress_bar,
1148            start_progress,
1149            minimal_snapshot_download_speed,
1150            maximum_snapshot_download_abort,
1151            download_abort_count,
1152            rpc_contact_info,
1153            full_snapshot_hash,
1154            SnapshotKind::FullSnapshot,
1155        )?;
1156    }
1157
1158    if bootstrap_config.incremental_snapshot_fetch {
1159        // Check and see if we've already got the incremental snapshot; if not, download it
1160        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1161            if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1162                .into_iter()
1163                .any(|snapshot_archive| {
1164                    snapshot_archive.slot() == incremental_snapshot_hash.0
1165                        && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1166                        && snapshot_archive.base_slot() == full_snapshot_hash.0
1167                })
1168            {
1169                info!(
1170                    "Incremental snapshot archive already exists locally. Skipping download. \
1171                     slot: {}, hash: {}",
1172                    incremental_snapshot_hash.0, incremental_snapshot_hash.1
1173                );
1174            } else {
1175                download_snapshot(
1176                    validator_config,
1177                    bootstrap_config,
1178                    use_progress_bar,
1179                    start_progress,
1180                    minimal_snapshot_download_speed,
1181                    maximum_snapshot_download_abort,
1182                    download_abort_count,
1183                    rpc_contact_info,
1184                    incremental_snapshot_hash,
1185                    SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1186                )?;
1187            }
1188        }
1189    }
1190
1191    Ok(())
1192}
1193
1194/// Download a snapshot
1195#[allow(clippy::too_many_arguments)]
1196fn download_snapshot(
1197    validator_config: &ValidatorConfig,
1198    bootstrap_config: &RpcBootstrapConfig,
1199    use_progress_bar: bool,
1200    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1201    minimal_snapshot_download_speed: f32,
1202    maximum_snapshot_download_abort: u64,
1203    download_abort_count: &mut u64,
1204    rpc_contact_info: &ContactInfo,
1205    desired_snapshot_hash: (Slot, Hash),
1206    snapshot_kind: SnapshotKind,
1207) -> Result<(), String> {
1208    let maximum_full_snapshot_archives_to_retain = validator_config
1209        .snapshot_config
1210        .maximum_full_snapshot_archives_to_retain;
1211    let maximum_incremental_snapshot_archives_to_retain = validator_config
1212        .snapshot_config
1213        .maximum_incremental_snapshot_archives_to_retain;
1214    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1215    let incremental_snapshot_archives_dir = &validator_config
1216        .snapshot_config
1217        .incremental_snapshot_archives_dir;
1218
1219    *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1220        slot: desired_snapshot_hash.0,
1221        rpc_addr: rpc_contact_info
1222            .rpc()
1223            .ok_or_else(|| String::from("Invalid RPC address"))?,
1224    };
1225    let desired_snapshot_hash = (
1226        desired_snapshot_hash.0,
1227        solana_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1228    );
1229    download_snapshot_archive(
1230        &rpc_contact_info
1231            .rpc()
1232            .ok_or_else(|| String::from("Invalid RPC address"))?,
1233        full_snapshot_archives_dir,
1234        incremental_snapshot_archives_dir,
1235        desired_snapshot_hash,
1236        snapshot_kind,
1237        maximum_full_snapshot_archives_to_retain,
1238        maximum_incremental_snapshot_archives_to_retain,
1239        use_progress_bar,
1240        &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1241            debug!("Download progress: {download_progress:?}");
1242            if download_progress.last_throughput < minimal_snapshot_download_speed
1243                && download_progress.notification_count <= 1
1244                && download_progress.percentage_done <= 2_f32
1245                && download_progress.estimated_remaining_time > 60_f32
1246                && *download_abort_count < maximum_snapshot_download_abort
1247            {
1248                if let Some(ref known_validators) = validator_config.known_validators {
1249                    if known_validators.contains(rpc_contact_info.pubkey())
1250                        && known_validators.len() == 1
1251                        && bootstrap_config.only_known_rpc
1252                    {
1253                        warn!(
1254                            "The snapshot download is too slow, throughput: {} < min speed {} \
1255                             bytes/sec, but will NOT abort and try a different node as it is the \
1256                             only known validator and the --only-known-rpc flag is set. Abort \
1257                             count: {}, Progress detail: {:?}",
1258                            download_progress.last_throughput,
1259                            minimal_snapshot_download_speed,
1260                            download_abort_count,
1261                            download_progress,
1262                        );
1263                        return true; // Do not abort download from the one-and-only known validator
1264                    }
1265                }
1266                warn!(
1267                    "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1268                     will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1269                    download_progress.last_throughput,
1270                    minimal_snapshot_download_speed,
1271                    download_abort_count,
1272                    download_progress,
1273                );
1274                *download_abort_count += 1;
1275                false
1276            } else {
1277                true
1278            }
1279        })),
1280    )
1281}
1282
1283/// Check to see if bootstrap should load from its local snapshots or not.  If not, then snapshots
1284/// will be downloaded.
1285fn should_use_local_snapshot(
1286    full_snapshot_archives_dir: &Path,
1287    incremental_snapshot_archives_dir: &Path,
1288    maximum_local_snapshot_age: Slot,
1289    full_snapshot_hash: (Slot, Hash),
1290    incremental_snapshot_hash: Option<(Slot, Hash)>,
1291    incremental_snapshot_fetch: bool,
1292) -> bool {
1293    let cluster_snapshot_slot = incremental_snapshot_hash
1294        .map(|(slot, _)| slot)
1295        .unwrap_or(full_snapshot_hash.0);
1296
1297    match get_highest_local_snapshot_hash(
1298        full_snapshot_archives_dir,
1299        incremental_snapshot_archives_dir,
1300        incremental_snapshot_fetch,
1301    ) {
1302        None => {
1303            info!(
1304                "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1305                 local snapshot."
1306            );
1307            false
1308        }
1309        Some((local_snapshot_slot, _)) => {
1310            if local_snapshot_slot
1311                >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1312            {
1313                info!(
1314                    "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1315                     a snapshot for slot {cluster_snapshot_slot}."
1316                );
1317                true
1318            } else {
1319                info!(
1320                    "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1321                     newer snapshot for slot {cluster_snapshot_slot}."
1322                );
1323                false
1324            }
1325        }
1326    }
1327}
1328
1329/// Get the node's highest snapshot hashes from CRDS
1330fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1331    cluster_info.get_snapshot_hashes_for_node(node).map(
1332        |crds_data::SnapshotHashes {
1333             full, incremental, ..
1334         }| {
1335            let highest_incremental_snapshot_hash = incremental.into_iter().max();
1336            SnapshotHash {
1337                full,
1338                incr: highest_incremental_snapshot_hash,
1339            }
1340        },
1341    )
1342}
1343
1344#[cfg(test)]
1345mod tests {
1346    use super::*;
1347
1348    impl PeerSnapshotHash {
1349        fn new(
1350            rpc_contact_info: ContactInfo,
1351            full_snapshot_hash: (Slot, Hash),
1352            incremental_snapshot_hash: Option<(Slot, Hash)>,
1353        ) -> Self {
1354            Self {
1355                rpc_contact_info,
1356                snapshot_hash: SnapshotHash {
1357                    full: full_snapshot_hash,
1358                    incr: incremental_snapshot_hash,
1359                },
1360            }
1361        }
1362    }
1363
1364    fn default_contact_info_for_tests() -> ContactInfo {
1365        ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
1366    }
1367
1368    #[test]
1369    fn test_build_known_snapshot_hashes() {
1370        solana_logger::setup();
1371        let full_snapshot_hash1 = (400_000, Hash::new_unique());
1372        let full_snapshot_hash2 = (400_000, Hash::new_unique());
1373
1374        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1375        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1376
1377        // simulate a set of known validators with various snapshot hashes
1378        let oracle = {
1379            let mut oracle = HashMap::new();
1380
1381            for (full, incr) in [
1382                // only a full snapshot
1383                (full_snapshot_hash1, None),
1384                // full and incremental snapshots
1385                (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1386                // full and incremental snapshots, with different incremental hash
1387                (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1388                // ...and now with different full hashes
1389                (full_snapshot_hash2, None),
1390                (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1391                (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1392            ] {
1393                // also simulate multiple known validators having the same snapshot hashes
1394                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1395                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1396                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1397            }
1398
1399            // no snapshots at all
1400            oracle.insert(Pubkey::new_unique(), None);
1401            oracle.insert(Pubkey::new_unique(), None);
1402            oracle.insert(Pubkey::new_unique(), None);
1403
1404            oracle
1405        };
1406
1407        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1408
1409        let known_snapshot_hashes =
1410            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1411
1412        // ensure there's only one full snapshot hash, since they all used the same slot and there
1413        // can be only one snapshot hash per slot
1414        let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1415        assert_eq!(known_full_snapshot_hashes.len(), 1);
1416        let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1417
1418        // and for the same reasons, ensure there is only one incremental snapshot hash
1419        let known_incremental_snapshot_hashes =
1420            known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1421        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1422        let known_incremental_snapshot_hash =
1423            known_incremental_snapshot_hashes.iter().next().unwrap();
1424
1425        // The resulting `known_snapshot_hashes` can be different from run-to-run due to how
1426        // `oracle.keys()` returns nodes during iteration.  Because of that, we cannot just assert
1427        // the full and incremental snapshot hashes are `full_snapshot_hash1` and
1428        // `incremental_snapshot_hash1`.  Instead, we assert that the full and incremental
1429        // snapshot hashes are exactly one or the other, since it depends on which nodes are seen
1430        // "first" when building the known snapshot hashes.
1431        assert!(
1432            known_full_snapshot_hash == &full_snapshot_hash1
1433                || known_full_snapshot_hash == &full_snapshot_hash2
1434        );
1435        assert!(
1436            known_incremental_snapshot_hash == &incremental_snapshot_hash1
1437                || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1438        );
1439    }
1440
1441    #[test]
1442    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1443        let known_snapshot_hashes: KnownSnapshotHashes = [
1444            (
1445                (200_000, Hash::new_unique()),
1446                [
1447                    (200_200, Hash::new_unique()),
1448                    (200_400, Hash::new_unique()),
1449                    (200_600, Hash::new_unique()),
1450                    (200_800, Hash::new_unique()),
1451                ]
1452                .iter()
1453                .cloned()
1454                .collect(),
1455            ),
1456            (
1457                (300_000, Hash::new_unique()),
1458                [
1459                    (300_200, Hash::new_unique()),
1460                    (300_400, Hash::new_unique()),
1461                    (300_600, Hash::new_unique()),
1462                ]
1463                .iter()
1464                .cloned()
1465                .collect(),
1466            ),
1467        ]
1468        .iter()
1469        .cloned()
1470        .collect();
1471
1472        let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1473        let known_full_snapshot_hash = known_snapshot_hash.0;
1474        let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1475
1476        let contact_info = default_contact_info_for_tests();
1477        let peer_snapshot_hashes = vec![
1478            // bad full snapshot hash, no incremental snapshot hash
1479            PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1480            // bad everything
1481            PeerSnapshotHash::new(
1482                contact_info.clone(),
1483                (111_000, Hash::default()),
1484                Some((111_111, Hash::default())),
1485            ),
1486            // good full snapshot hash, no incremental snapshot hash
1487            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1488            // bad full snapshot hash, good (not possible) incremental snapshot hash
1489            PeerSnapshotHash::new(
1490                contact_info.clone(),
1491                (111_000, Hash::default()),
1492                Some(*known_incremental_snapshot_hash),
1493            ),
1494            // good full snapshot hash, bad incremental snapshot hash
1495            PeerSnapshotHash::new(
1496                contact_info.clone(),
1497                *known_full_snapshot_hash,
1498                Some((111_111, Hash::default())),
1499            ),
1500            // good everything
1501            PeerSnapshotHash::new(
1502                contact_info.clone(),
1503                *known_full_snapshot_hash,
1504                Some(*known_incremental_snapshot_hash),
1505            ),
1506        ];
1507
1508        let expected = vec![
1509            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1510            PeerSnapshotHash::new(
1511                contact_info,
1512                *known_full_snapshot_hash,
1513                Some(*known_incremental_snapshot_hash),
1514            ),
1515        ];
1516        let mut actual = peer_snapshot_hashes;
1517        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1518            &known_snapshot_hashes,
1519            &mut actual,
1520        );
1521        assert_eq!(expected, actual);
1522    }
1523
1524    #[test]
1525    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1526        let contact_info = default_contact_info_for_tests();
1527        let peer_snapshot_hashes = vec![
1528            // old
1529            PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1530            PeerSnapshotHash::new(
1531                contact_info.clone(),
1532                (100_000, Hash::default()),
1533                Some((100_100, Hash::default())),
1534            ),
1535            PeerSnapshotHash::new(
1536                contact_info.clone(),
1537                (100_000, Hash::default()),
1538                Some((100_200, Hash::default())),
1539            ),
1540            PeerSnapshotHash::new(
1541                contact_info.clone(),
1542                (100_000, Hash::default()),
1543                Some((100_300, Hash::default())),
1544            ),
1545            // new
1546            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1547            PeerSnapshotHash::new(
1548                contact_info.clone(),
1549                (200_000, Hash::default()),
1550                Some((200_100, Hash::default())),
1551            ),
1552            PeerSnapshotHash::new(
1553                contact_info.clone(),
1554                (200_000, Hash::default()),
1555                Some((200_200, Hash::default())),
1556            ),
1557            PeerSnapshotHash::new(
1558                contact_info.clone(),
1559                (200_000, Hash::default()),
1560                Some((200_300, Hash::default())),
1561            ),
1562        ];
1563
1564        let expected = vec![
1565            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1566            PeerSnapshotHash::new(
1567                contact_info.clone(),
1568                (200_000, Hash::default()),
1569                Some((200_100, Hash::default())),
1570            ),
1571            PeerSnapshotHash::new(
1572                contact_info.clone(),
1573                (200_000, Hash::default()),
1574                Some((200_200, Hash::default())),
1575            ),
1576            PeerSnapshotHash::new(
1577                contact_info,
1578                (200_000, Hash::default()),
1579                Some((200_300, Hash::default())),
1580            ),
1581        ];
1582        let mut actual = peer_snapshot_hashes;
1583        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1584        assert_eq!(expected, actual);
1585    }
1586
1587    #[test]
1588    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1589        let contact_info = default_contact_info_for_tests();
1590        let peer_snapshot_hashes = vec![
1591            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1592            PeerSnapshotHash::new(
1593                contact_info.clone(),
1594                (200_000, Hash::default()),
1595                Some((200_100, Hash::default())),
1596            ),
1597            PeerSnapshotHash::new(
1598                contact_info.clone(),
1599                (200_000, Hash::default()),
1600                Some((200_200, Hash::default())),
1601            ),
1602            PeerSnapshotHash::new(
1603                contact_info.clone(),
1604                (200_000, Hash::default()),
1605                Some((200_300, Hash::default())),
1606            ),
1607            PeerSnapshotHash::new(
1608                contact_info.clone(),
1609                (200_000, Hash::default()),
1610                Some((200_010, Hash::default())),
1611            ),
1612            PeerSnapshotHash::new(
1613                contact_info.clone(),
1614                (200_000, Hash::default()),
1615                Some((200_020, Hash::default())),
1616            ),
1617            PeerSnapshotHash::new(
1618                contact_info.clone(),
1619                (200_000, Hash::default()),
1620                Some((200_030, Hash::default())),
1621            ),
1622        ];
1623
1624        let expected = vec![PeerSnapshotHash::new(
1625            contact_info,
1626            (200_000, Hash::default()),
1627            Some((200_300, Hash::default())),
1628        )];
1629        let mut actual = peer_snapshot_hashes;
1630        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1631        assert_eq!(expected, actual);
1632    }
1633
1634    /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
1635    /// there are *zero* peers with incremental snapshots.
1636    #[test]
1637    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1638        let contact_info = default_contact_info_for_tests();
1639        let peer_snapshot_hashes = vec![
1640            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1641            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1642            PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1643        ];
1644
1645        let expected = peer_snapshot_hashes.clone();
1646        let mut actual = peer_snapshot_hashes;
1647        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1648        assert_eq!(expected, actual);
1649    }
1650
1651    /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
1652    /// peer snapshot hashes input is empty.
1653    #[test]
1654    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1655        {
1656            let mut actual = vec![];
1657            let expected = actual.clone();
1658            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1659            assert_eq!(expected, actual);
1660        }
1661        {
1662            let mut actual = vec![];
1663            let expected = actual.clone();
1664            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1665            assert_eq!(expected, actual);
1666        }
1667    }
1668}