agave_validator/
bootstrap.rs

1use {
2    itertools::Itertools,
3    log::*,
4    rand::{seq::SliceRandom, thread_rng, Rng},
5    rayon::prelude::*,
6    solana_clock::Slot,
7    solana_commitment_config::CommitmentConfig,
8    solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
9    solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
10    solana_genesis_utils::download_then_check_genesis_hash,
11    solana_gossip::{
12        cluster_info::{ClusterInfo, Node},
13        contact_info::{ContactInfo, Protocol},
14        crds_data,
15        gossip_service::GossipService,
16    },
17    solana_hash::Hash,
18    solana_keypair::Keypair,
19    solana_metrics::datapoint_info,
20    solana_pubkey::Pubkey,
21    solana_rpc_client::rpc_client::RpcClient,
22    solana_runtime::{
23        snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
24        snapshot_utils,
25    },
26    solana_signer::Signer,
27    solana_streamer::socket::SocketAddrSpace,
28    std::{
29        collections::{hash_map::RandomState, HashMap, HashSet},
30        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
31        path::Path,
32        process::exit,
33        sync::{
34            atomic::{AtomicBool, Ordering},
35            Arc, RwLock,
36        },
37        time::{Duration, Instant},
38    },
39    thiserror::Error,
40};
41
/// When downloading snapshots, wait at most this long for snapshot hashes from
/// _all_ known validators.  Afterwards, wait for snapshot hashes from _any_
/// known validator.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// If no alternative (non-blacklisted) peers have turned up after this long,
/// the blacklist is cleared so that blacklisted peers can be tried again.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// If no peer offers an acceptable snapshot for this long, give up.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(3 * 60);
/// If no RPC peers at all have been found after this long, give up.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Maximum number of RPC peer candidates handed back by a single round of peer
/// discovery; each of those candidates is then pinged and version-checked.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// Connect timeout used when pinging an RPC node over TCP.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
58
/// Options controlling how the validator bootstraps genesis and snapshots from RPC peers.
#[derive(Debug)]
pub struct RpcBootstrapConfig {
    /// Do not fetch the genesis archive from the chosen RPC peer.
    pub no_genesis_fetch: bool,
    /// Do not fetch snapshots; a random RPC peer is chosen instead of one serving a snapshot.
    pub no_snapshot_fetch: bool,
    /// Only consider RPC peers that are in the configured known-validator set.
    pub only_known_rpc: bool,
    /// Upper bound handed to the genesis download/unpack step.
    pub max_genesis_archive_unpacked_size: u64,
    /// RPC URL used to validate the vote account after downloading; `None` skips the check.
    pub check_vote_account: Option<String>,
    /// Also consider (and download) incremental snapshots.
    pub incremental_snapshot_fetch: bool,
}
68
69fn verify_reachable_ports(
70    node: &Node,
71    cluster_entrypoint: &ContactInfo,
72    validator_config: &ValidatorConfig,
73    socket_addr_space: &SocketAddrSpace,
74) -> bool {
75    let verify_address = |addr: &Option<SocketAddr>| -> bool {
76        addr.as_ref()
77            .map(|addr| socket_addr_space.check(addr))
78            .unwrap_or_default()
79    };
80    let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
81
82    if verify_address(&node.info.serve_repair(Protocol::UDP)) {
83        udp_sockets.push(&node.sockets.serve_repair);
84    }
85    if verify_address(&node.info.tpu(Protocol::UDP)) {
86        udp_sockets.extend(node.sockets.tpu.iter());
87        udp_sockets.extend(&node.sockets.tpu_quic);
88    }
89    if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
90        udp_sockets.extend(node.sockets.tpu_forwards.iter());
91        udp_sockets.extend(&node.sockets.tpu_forwards_quic);
92    }
93    if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
94        udp_sockets.extend(node.sockets.tpu_vote.iter());
95    }
96    if verify_address(&node.info.tvu(Protocol::UDP)) {
97        udp_sockets.extend(node.sockets.tvu.iter());
98        udp_sockets.extend(node.sockets.broadcast.iter());
99        udp_sockets.extend(node.sockets.retransmit_sockets.iter());
100    }
101    if !solana_net_utils::verify_all_reachable_udp(
102        &cluster_entrypoint.gossip().unwrap(),
103        &udp_sockets,
104    ) {
105        return false;
106    }
107
108    let mut tcp_listeners = vec![];
109    if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
110        for (purpose, bind_addr, public_addr) in &[
111            ("RPC", rpc_addr, node.info.rpc()),
112            ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
113        ] {
114            if verify_address(public_addr) {
115                tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
116                    error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
117                    exit(1);
118                }));
119            }
120        }
121    }
122
123    if let Some(ip_echo) = &node.sockets.ip_echo {
124        let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
125        tcp_listeners.push(ip_echo);
126    }
127
128    solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
129}
130
131fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
132    if let Some(known_validators) = known_validators {
133        known_validators.contains(id)
134    } else {
135        false
136    }
137}
138
139fn start_gossip_node(
140    identity_keypair: Arc<Keypair>,
141    cluster_entrypoints: &[ContactInfo],
142    ledger_path: &Path,
143    gossip_addr: &SocketAddr,
144    gossip_socket: UdpSocket,
145    expected_shred_version: u16,
146    gossip_validators: Option<HashSet<Pubkey>>,
147    should_check_duplicate_instance: bool,
148    socket_addr_space: SocketAddrSpace,
149) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
150    let contact_info = ClusterInfo::gossip_contact_info(
151        identity_keypair.pubkey(),
152        *gossip_addr,
153        expected_shred_version,
154    );
155    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
156    cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
157    cluster_info.restore_contact_info(ledger_path, 0);
158    let cluster_info = Arc::new(cluster_info);
159
160    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
161    let gossip_service = GossipService::new(
162        &cluster_info,
163        None,
164        gossip_socket,
165        gossip_validators,
166        should_check_duplicate_instance,
167        None,
168        gossip_exit_flag.clone(),
169    );
170    (cluster_info, gossip_exit_flag, gossip_service)
171}
172
173fn get_rpc_peers(
174    cluster_info: &ClusterInfo,
175    validator_config: &ValidatorConfig,
176    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
177    blacklist_timeout: &Instant,
178    retry_reason: &mut Option<String>,
179    bootstrap_config: &RpcBootstrapConfig,
180) -> Vec<ContactInfo> {
181    let shred_version = validator_config
182        .expected_shred_version
183        .unwrap_or_else(|| cluster_info.my_shred_version());
184
185    info!(
186        "Searching for an RPC service with shred version {shred_version}{}...",
187        retry_reason
188            .as_ref()
189            .map(|s| format!(" (Retrying: {s})"))
190            .unwrap_or_default()
191    );
192
193    let mut rpc_peers = cluster_info
194        .all_rpc_peers()
195        .into_iter()
196        .filter(|contact_info| contact_info.shred_version() == shred_version)
197        .collect::<Vec<_>>();
198
199    if bootstrap_config.only_known_rpc {
200        rpc_peers.retain(|rpc_peer| {
201            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
202        });
203    }
204
205    let rpc_peers_total = rpc_peers.len();
206
207    // Filter out blacklisted nodes
208    let rpc_peers: Vec<_> = rpc_peers
209        .into_iter()
210        .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
211        .collect();
212    let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
213    let rpc_known_peers = rpc_peers
214        .iter()
215        .filter(|rpc_peer| {
216            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
217        })
218        .count();
219
220    info!(
221        "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
222         {rpc_peers_blacklisted} blacklisted"
223    );
224
225    if rpc_peers_blacklisted == rpc_peers_total {
226        *retry_reason = if !blacklisted_rpc_nodes.is_empty()
227            && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
228        {
229            // All nodes are blacklisted and no additional nodes recently discovered.
230            // Remove all nodes from the blacklist and try them again.
231            blacklisted_rpc_nodes.clear();
232            Some("Blacklist timeout expired".to_owned())
233        } else {
234            Some("Wait for known rpc peers".to_owned())
235        };
236        return vec![];
237    }
238    rpc_peers
239}
240
241fn check_vote_account(
242    rpc_client: &RpcClient,
243    identity_pubkey: &Pubkey,
244    vote_account_address: &Pubkey,
245    authorized_voter_pubkeys: &[Pubkey],
246) -> Result<(), String> {
247    let vote_account = rpc_client
248        .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
249        .map_err(|err| format!("failed to fetch vote account: {err}"))?
250        .value
251        .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
252
253    if vote_account.owner != solana_vote_program::id() {
254        return Err(format!(
255            "not a vote account (owned by {}): {}",
256            vote_account.owner, vote_account_address
257        ));
258    }
259
260    let identity_account = rpc_client
261        .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
262        .map_err(|err| format!("failed to fetch identity account: {err}"))?
263        .value
264        .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
265
266    let vote_state = solana_vote_program::vote_state::from(&vote_account);
267    if let Some(vote_state) = vote_state {
268        if vote_state.authorized_voters().is_empty() {
269            return Err("Vote account not yet initialized".to_string());
270        }
271
272        if vote_state.node_pubkey != *identity_pubkey {
273            return Err(format!(
274                "vote account's identity ({}) does not match the validator's identity {}).",
275                vote_state.node_pubkey, identity_pubkey
276            ));
277        }
278
279        for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
280            if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
281                return Err(format!(
282                    "authorized voter {vote_account_authorized_voter_pubkey} not available"
283                ));
284            }
285        }
286    } else {
287        return Err(format!(
288            "invalid vote account data for {vote_account_address}"
289        ));
290    }
291
292    // Maybe we can calculate minimum voting fee; rather than 1 lamport
293    if identity_account.lamports <= 1 {
294        return Err(format!(
295            "underfunded identity account ({}): only {} lamports available",
296            identity_pubkey, identity_account.lamports
297        ));
298    }
299
300    Ok(())
301}
302
303#[derive(Error, Debug)]
304pub enum GetRpcNodeError {
305    #[error("Unable to find any RPC peers")]
306    NoRpcPeersFound,
307
308    #[error("Giving up, did not get newer snapshots from the cluster")]
309    NoNewerSnapshots,
310}
311
312/// Struct to wrap the return value from get_rpc_nodes().  The `rpc_contact_info` is the peer to
313/// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
314/// snapshots to download.
315#[derive(Debug)]
316struct GetRpcNodeResult {
317    rpc_contact_info: ContactInfo,
318    snapshot_hash: Option<SnapshotHash>,
319}
320
321/// Struct to wrap the peers & snapshot hashes together.
322#[derive(Debug, PartialEq, Eq, Clone)]
323struct PeerSnapshotHash {
324    rpc_contact_info: ContactInfo,
325    snapshot_hash: SnapshotHash,
326}
327
328/// A snapshot hash.  In this context (bootstrap *with* incremental snapshots), a snapshot hash
329/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
330#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
331pub struct SnapshotHash {
332    full: (Slot, Hash),
333    incr: Option<(Slot, Hash)>,
334}
335
336pub fn fail_rpc_node(
337    err: String,
338    known_validators: &Option<HashSet<Pubkey, RandomState>>,
339    rpc_id: &Pubkey,
340    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
341) {
342    warn!("{err}");
343    if let Some(ref known_validators) = known_validators {
344        if known_validators.contains(rpc_id) {
345            return;
346        }
347    }
348
349    info!("Excluding {rpc_id} as a future RPC candidate");
350    blacklisted_rpc_nodes.insert(*rpc_id);
351}
352
353fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
354    let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
355    cluster_info.save_contact_info();
356    gossip_exit_flag.store(true, Ordering::Relaxed);
357    gossip_service.join().unwrap();
358}
359
360#[allow(clippy::too_many_arguments)]
361pub fn attempt_download_genesis_and_snapshot(
362    rpc_contact_info: &ContactInfo,
363    ledger_path: &Path,
364    validator_config: &mut ValidatorConfig,
365    bootstrap_config: &RpcBootstrapConfig,
366    use_progress_bar: bool,
367    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
368    rpc_client: &RpcClient,
369    full_snapshot_archives_dir: &Path,
370    incremental_snapshot_archives_dir: &Path,
371    maximum_local_snapshot_age: Slot,
372    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
373    minimal_snapshot_download_speed: f32,
374    maximum_snapshot_download_abort: u64,
375    download_abort_count: &mut u64,
376    snapshot_hash: Option<SnapshotHash>,
377    identity_keypair: &Arc<Keypair>,
378    vote_account: &Pubkey,
379    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
380) -> Result<(), String> {
381    download_then_check_genesis_hash(
382        &rpc_contact_info
383            .rpc()
384            .ok_or_else(|| String::from("Invalid RPC address"))?,
385        ledger_path,
386        &mut validator_config.expected_genesis_hash,
387        bootstrap_config.max_genesis_archive_unpacked_size,
388        bootstrap_config.no_genesis_fetch,
389        use_progress_bar,
390        rpc_client,
391    )?;
392
393    if let Some(gossip) = gossip.take() {
394        shutdown_gossip_service(gossip);
395    }
396
397    let rpc_client_slot = rpc_client
398        .get_slot_with_commitment(CommitmentConfig::finalized())
399        .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
400    info!("RPC node root slot: {rpc_client_slot}");
401
402    download_snapshots(
403        full_snapshot_archives_dir,
404        incremental_snapshot_archives_dir,
405        validator_config,
406        bootstrap_config,
407        use_progress_bar,
408        maximum_local_snapshot_age,
409        start_progress,
410        minimal_snapshot_download_speed,
411        maximum_snapshot_download_abort,
412        download_abort_count,
413        snapshot_hash,
414        rpc_contact_info,
415    )?;
416
417    if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
418        let rpc_client = RpcClient::new(url);
419        check_vote_account(
420            &rpc_client,
421            &identity_keypair.pubkey(),
422            vote_account,
423            &authorized_voter_keypairs
424                .read()
425                .unwrap()
426                .iter()
427                .map(|k| k.pubkey())
428                .collect::<Vec<_>>(),
429        )
430        .unwrap_or_else(|err| {
431            // Consider failures here to be more likely due to user error (eg,
432            // incorrect `agave-validator` command-line arguments) rather than the
433            // RPC node failing.
434            //
435            // Power users can always use the `--no-check-vote-account` option to
436            // bypass this check entirely
437            error!("{err}");
438            exit(1);
439        });
440    }
441    Ok(())
442}
443
444/// simple ping helper function which returns the time to connect
445fn ping(addr: &SocketAddr) -> Option<Duration> {
446    let start = Instant::now();
447    match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
448        Ok(_) => Some(start.elapsed()),
449        Err(_) => None,
450    }
451}
452
453// Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
454// used for downloading latest snapshots and/or the genesis block. Guaranteed to
455// find at least one viable node or terminate the process.
456fn get_vetted_rpc_nodes(
457    vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
458    cluster_info: &Arc<ClusterInfo>,
459    validator_config: &ValidatorConfig,
460    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
461    bootstrap_config: &RpcBootstrapConfig,
462) {
463    while vetted_rpc_nodes.is_empty() {
464        let rpc_node_details = match get_rpc_nodes(
465            cluster_info,
466            validator_config,
467            blacklisted_rpc_nodes,
468            bootstrap_config,
469        ) {
470            Ok(rpc_node_details) => rpc_node_details,
471            Err(err) => {
472                error!(
473                    "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
474                    `--no-port-check`, or adjusting `--known-validator ...` arguments as \
475                    applicable"
476                );
477                exit(1);
478            }
479        };
480
481        let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
482        vetted_rpc_nodes.extend(
483            rpc_node_details
484                .into_par_iter()
485                .filter_map(|rpc_node_details| {
486                    let GetRpcNodeResult {
487                        rpc_contact_info,
488                        snapshot_hash,
489                    } = rpc_node_details;
490
491                    info!(
492                        "Using RPC service from node {}: {:?}",
493                        rpc_contact_info.pubkey(),
494                        rpc_contact_info.rpc()
495                    );
496
497                    let rpc_addr = rpc_contact_info.rpc()?;
498                    let ping_time = ping(&rpc_addr);
499
500                    let rpc_client =
501                        RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
502
503                    Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
504                })
505                .filter(
506                    |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
507                        .get_version()
508                    {
509                        Ok(rpc_version) => {
510                            if let Some(ping_time) = ping_time {
511                                info!(
512                                    "RPC node version: {} Ping: {}ms",
513                                    rpc_version.solana_core,
514                                    ping_time.as_millis()
515                                );
516                                true
517                            } else {
518                                fail_rpc_node(
519                                    "Failed to ping RPC".to_string(),
520                                    &validator_config.known_validators,
521                                    rpc_contact_info.pubkey(),
522                                    &mut newly_blacklisted_rpc_nodes.write().unwrap(),
523                                );
524                                false
525                            }
526                        }
527                        Err(err) => {
528                            fail_rpc_node(
529                                format!("Failed to get RPC node version: {err}"),
530                                &validator_config.known_validators,
531                                rpc_contact_info.pubkey(),
532                                &mut newly_blacklisted_rpc_nodes.write().unwrap(),
533                            );
534                            false
535                        }
536                    },
537                )
538                .collect::<Vec<(
539                    ContactInfo,
540                    Option<SnapshotHash>,
541                    RpcClient,
542                    Option<Duration>,
543                )>>()
544                .into_iter()
545                .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
546                .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
547                    (rpc_contact_info, snapshot_hash, rpc_client)
548                })
549                .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
550        );
551        blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
552    }
553}
554
555#[allow(clippy::too_many_arguments)]
556pub fn rpc_bootstrap(
557    node: &Node,
558    identity_keypair: &Arc<Keypair>,
559    ledger_path: &Path,
560    full_snapshot_archives_dir: &Path,
561    incremental_snapshot_archives_dir: &Path,
562    vote_account: &Pubkey,
563    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
564    cluster_entrypoints: &[ContactInfo],
565    validator_config: &mut ValidatorConfig,
566    bootstrap_config: RpcBootstrapConfig,
567    do_port_check: bool,
568    use_progress_bar: bool,
569    maximum_local_snapshot_age: Slot,
570    should_check_duplicate_instance: bool,
571    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
572    minimal_snapshot_download_speed: f32,
573    maximum_snapshot_download_abort: u64,
574    socket_addr_space: SocketAddrSpace,
575) {
576    if do_port_check {
577        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
578        order.shuffle(&mut thread_rng());
579        if order.into_iter().all(|i| {
580            !verify_reachable_ports(
581                node,
582                &cluster_entrypoints[i],
583                validator_config,
584                &socket_addr_space,
585            )
586        }) {
587            exit(1);
588        }
589    }
590
591    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
592        return;
593    }
594
595    let total_snapshot_download_time = Instant::now();
596    let mut get_rpc_nodes_time = Duration::new(0, 0);
597    let mut snapshot_download_time = Duration::new(0, 0);
598    let mut blacklisted_rpc_nodes = HashSet::new();
599    let mut gossip = None;
600    let mut vetted_rpc_nodes = vec![];
601    let mut download_abort_count = 0;
602    loop {
603        if gossip.is_none() {
604            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
605
606            gossip = Some(start_gossip_node(
607                identity_keypair.clone(),
608                cluster_entrypoints,
609                ledger_path,
610                &node
611                    .info
612                    .gossip()
613                    .expect("Operator must spin up node with valid gossip address"),
614                node.sockets.gossip.try_clone().unwrap(),
615                validator_config
616                    .expected_shred_version
617                    .expect("expected_shred_version should not be None"),
618                validator_config.gossip_validators.clone(),
619                should_check_duplicate_instance,
620                socket_addr_space,
621            ));
622        }
623
624        let get_rpc_nodes_start = Instant::now();
625        get_vetted_rpc_nodes(
626            &mut vetted_rpc_nodes,
627            &gossip.as_ref().unwrap().0,
628            validator_config,
629            &mut blacklisted_rpc_nodes,
630            &bootstrap_config,
631        );
632        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
633        get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
634
635        let snapshot_download_start = Instant::now();
636        let download_result = attempt_download_genesis_and_snapshot(
637            &rpc_contact_info,
638            ledger_path,
639            validator_config,
640            &bootstrap_config,
641            use_progress_bar,
642            &mut gossip,
643            &rpc_client,
644            full_snapshot_archives_dir,
645            incremental_snapshot_archives_dir,
646            maximum_local_snapshot_age,
647            start_progress,
648            minimal_snapshot_download_speed,
649            maximum_snapshot_download_abort,
650            &mut download_abort_count,
651            snapshot_hash,
652            identity_keypair,
653            vote_account,
654            authorized_voter_keypairs.clone(),
655        );
656        snapshot_download_time += snapshot_download_start.elapsed();
657        match download_result {
658            Ok(()) => break,
659            Err(err) => {
660                fail_rpc_node(
661                    err,
662                    &validator_config.known_validators,
663                    rpc_contact_info.pubkey(),
664                    &mut blacklisted_rpc_nodes,
665                );
666            }
667        }
668    }
669
670    if let Some(gossip) = gossip.take() {
671        shutdown_gossip_service(gossip);
672    }
673
674    datapoint_info!(
675        "bootstrap-snapshot-download",
676        (
677            "total_time_secs",
678            total_snapshot_download_time.elapsed().as_secs(),
679            i64
680        ),
681        ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
682        (
683            "snapshot_download_time_secs",
684            snapshot_download_time.as_secs(),
685            i64
686        ),
687        ("download_abort_count", download_abort_count, i64),
688        ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
689    );
690}
691
692/// Get RPC peer node candidates to download from.
693///
694/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
695fn get_rpc_nodes(
696    cluster_info: &ClusterInfo,
697    validator_config: &ValidatorConfig,
698    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
699    bootstrap_config: &RpcBootstrapConfig,
700) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
701    let mut blacklist_timeout = Instant::now();
702    let mut get_rpc_peers_timout = Instant::now();
703    let mut newer_cluster_snapshot_timeout = None;
704    let mut retry_reason = None;
705    loop {
706        // Give gossip some time to populate and not spin on grabbing the crds lock
707        std::thread::sleep(Duration::from_secs(1));
708        info!("\n{}", cluster_info.rpc_info_trace());
709
710        let rpc_peers = get_rpc_peers(
711            cluster_info,
712            validator_config,
713            blacklisted_rpc_nodes,
714            &blacklist_timeout,
715            &mut retry_reason,
716            bootstrap_config,
717        );
718        if rpc_peers.is_empty() {
719            if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
720                return Err(GetRpcNodeError::NoRpcPeersFound);
721            }
722            continue;
723        }
724
725        // Reset timeouts if we found any viable RPC peers.
726        blacklist_timeout = Instant::now();
727        get_rpc_peers_timout = Instant::now();
728        if bootstrap_config.no_snapshot_fetch {
729            let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
730            return Ok(vec![GetRpcNodeResult {
731                rpc_contact_info: random_peer.clone(),
732                snapshot_hash: None,
733            }]);
734        }
735
736        let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
737            .as_ref()
738            .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
739            .unwrap_or(true)
740        {
741            KnownValidatorsToWaitFor::All
742        } else {
743            KnownValidatorsToWaitFor::Any
744        };
745        let peer_snapshot_hashes = get_peer_snapshot_hashes(
746            cluster_info,
747            &rpc_peers,
748            validator_config.known_validators.as_ref(),
749            known_validators_to_wait_for,
750            bootstrap_config.incremental_snapshot_fetch,
751        );
752        if peer_snapshot_hashes.is_empty() {
753            match newer_cluster_snapshot_timeout {
754                None => newer_cluster_snapshot_timeout = Some(Instant::now()),
755                Some(newer_cluster_snapshot_timeout) => {
756                    if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
757                        return Err(GetRpcNodeError::NoNewerSnapshots);
758                    }
759                }
760            }
761            retry_reason = Some("No snapshots available".to_owned());
762            continue;
763        } else {
764            let rpc_peers = peer_snapshot_hashes
765                .iter()
766                .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
767                .collect::<Vec<_>>();
768            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
769            info!(
770                "Highest available snapshot slot is {}, available from {} node{}: {:?}",
771                final_snapshot_hash
772                    .incr
773                    .map(|(slot, _hash)| slot)
774                    .unwrap_or(final_snapshot_hash.full.0),
775                rpc_peers.len(),
776                if rpc_peers.len() > 1 { "s" } else { "" },
777                rpc_peers,
778            );
779            let rpc_node_results = peer_snapshot_hashes
780                .iter()
781                .map(|peer_snapshot_hash| GetRpcNodeResult {
782                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
783                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
784                })
785                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
786                .collect();
787            return Ok(rpc_node_results);
788        }
789    }
790}
791
792/// Get the Slot and Hash of the local snapshot with the highest slot.  Can be either a full
793/// snapshot or an incremental snapshot.
794fn get_highest_local_snapshot_hash(
795    full_snapshot_archives_dir: impl AsRef<Path>,
796    incremental_snapshot_archives_dir: impl AsRef<Path>,
797    incremental_snapshot_fetch: bool,
798) -> Option<(Slot, Hash)> {
799    snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
800        .and_then(|full_snapshot_info| {
801            if incremental_snapshot_fetch {
802                snapshot_utils::get_highest_incremental_snapshot_archive_info(
803                    incremental_snapshot_archives_dir,
804                    full_snapshot_info.slot(),
805                )
806                .map(|incremental_snapshot_info| {
807                    (
808                        incremental_snapshot_info.slot(),
809                        *incremental_snapshot_info.hash(),
810                    )
811                })
812            } else {
813                None
814            }
815            .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
816        })
817        .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
818}
819
820/// Get peer snapshot hashes
821///
822/// The result is a vector of peers with snapshot hashes that:
823/// 1. match a snapshot hash from the known validators
824/// 2. have the highest incremental snapshot slot
825/// 3. have the highest full snapshot slot of (2)
826fn get_peer_snapshot_hashes(
827    cluster_info: &ClusterInfo,
828    rpc_peers: &[ContactInfo],
829    known_validators: Option<&HashSet<Pubkey>>,
830    known_validators_to_wait_for: KnownValidatorsToWaitFor,
831    incremental_snapshot_fetch: bool,
832) -> Vec<PeerSnapshotHash> {
833    let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
834    if let Some(known_validators) = known_validators {
835        let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
836            cluster_info,
837            known_validators,
838            known_validators_to_wait_for,
839        );
840        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
841            &known_snapshot_hashes,
842            &mut peer_snapshot_hashes,
843        );
844    }
845    if incremental_snapshot_fetch {
846        // Only filter by highest incremental snapshot slot if we're actually going to download an
847        // incremental snapshot.  Otherwise this could remove higher full snapshot slots from
848        // being selected.  For example, if there are two peer snapshot hashes:
849        // (A) full snapshot slot: 100, incremental snapshot slot: 160
850        // (B) full snapshot slot: 150, incremental snapshot slot: None
851        // Then (A) has the highest overall snapshot slot.  But if we're not downloading and
852        // incremental snapshot, (B) should be selected since it's full snapshot of 150 is highest.
853        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
854            &mut peer_snapshot_hashes,
855        );
856    }
857    retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
858
859    peer_snapshot_hashes
860}
861
/// Map full snapshot hashes to a set of incremental snapshot hashes.  Each full snapshot hash
/// is treated as the base for its set of incremental snapshot hashes.
///
/// Both the keys and the set members are `(slot, hash)` pairs.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
865
866/// Get the snapshot hashes from known validators.
867///
868/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
869/// snapshot hashes.  This map will be used as the "known snapshot hashes"; when peers are
870/// queried for their individual snapshot hashes, their results will be checked against this
871/// map to verify correctness.
872///
873/// NOTE: Only a single snashot hash is allowed per slot.  If somehow two known validators have
874/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
875/// This applies to both full and incremental snapshot hashes.
876fn get_snapshot_hashes_from_known_validators(
877    cluster_info: &ClusterInfo,
878    known_validators: &HashSet<Pubkey>,
879    known_validators_to_wait_for: KnownValidatorsToWaitFor,
880) -> KnownSnapshotHashes {
881    // Get the snapshot hashes for a node from CRDS
882    let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
883
884    if !do_known_validators_have_all_snapshot_hashes(
885        known_validators,
886        known_validators_to_wait_for,
887        get_snapshot_hashes_for_node,
888    ) {
889        debug!(
890            "Snapshot hashes have not been discovered from known validators. This likely means \
891             the gossip tables are not fully populated. We will sleep and retry..."
892        );
893        return KnownSnapshotHashes::default();
894    }
895
896    build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
897}
898
899/// Check if we can discover snapshot hashes for the known validators.
900///
901/// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
902/// process will download snapshots.
903///
904/// This function will return false if we do not yet have snapshot hashes from known validators;
905/// and true otherwise.  Either require snapshot hashes from *all* or *any* of the known validators
906/// based on the `KnownValidatorsToWaitFor` parameter.
907fn do_known_validators_have_all_snapshot_hashes<'a>(
908    known_validators: impl IntoIterator<Item = &'a Pubkey>,
909    known_validators_to_wait_for: KnownValidatorsToWaitFor,
910    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
911) -> bool {
912    let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
913
914    match known_validators_to_wait_for {
915        KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
916        KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
917    }
918}
919
/// When waiting for snapshot hashes from the known validators, should we wait for *all* or *any*
/// of them?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    /// Wait until every known validator has snapshot hashes.
    All,
    /// Wait until at least one known validator has snapshot hashes.
    Any,
}
927
928/// Build the known snapshot hashes from a set of nodes.
929///
930/// The `get_snapshot_hashes_for_node` parameter is a function that map a pubkey to its snapshot
931/// hashes.  This parameter exist to provide a way to test the inner algorithm without needing
932/// runtime information such as the ClusterInfo or ValidatorConfig.
933fn build_known_snapshot_hashes<'a>(
934    nodes: impl IntoIterator<Item = &'a Pubkey>,
935    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
936) -> KnownSnapshotHashes {
937    let mut known_snapshot_hashes = KnownSnapshotHashes::new();
938
939    /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
940    /// but *different* hash as the needle.
941    fn is_any_same_slot_and_different_hash<'a>(
942        needle: &(Slot, Hash),
943        haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
944    ) -> bool {
945        haystack
946            .into_iter()
947            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
948    }
949
950    'to_next_node: for node in nodes {
951        let Some(SnapshotHash {
952            full: full_snapshot_hash,
953            incr: incremental_snapshot_hash,
954        }) = get_snapshot_hashes_for_node(node)
955        else {
956            continue 'to_next_node;
957        };
958
959        // Do not add this snapshot hash if there's already a full snapshot hash with the
960        // same slot but with a _different_ hash.
961        // NOTE: Nodes should not produce snapshots at the same slot with _different_
962        // hashes.  So if it happens, keep the first and ignore the rest.
963        if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
964            warn!(
965                "Ignoring all snapshot hashes from node {node} since we've seen a different full \
966                 snapshot hash with this slot.\
967                 \nfull snapshot hash: {full_snapshot_hash:?}"
968            );
969            debug!(
970                "known full snapshot hashes: {:#?}",
971                known_snapshot_hashes.keys(),
972            );
973            continue 'to_next_node;
974        }
975
976        // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
977        // doesn't already exist.  This is to ensure we don't overwrite existing
978        // incremental snapshot hashes that may be present for this full snapshot hash.
979        let known_incremental_snapshot_hashes =
980            known_snapshot_hashes.entry(full_snapshot_hash).or_default();
981
982        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
983            // Do not add this snapshot hash if there's already an incremental snapshot
984            // hash with the same slot, but with a _different_ hash.
985            // NOTE: Nodes should not produce snapshots at the same slot with _different_
986            // hashes.  So if it happens, keep the first and ignore the rest.
987            if is_any_same_slot_and_different_hash(
988                &incremental_snapshot_hash,
989                known_incremental_snapshot_hashes.iter(),
990            ) {
991                warn!(
992                    "Ignoring incremental snapshot hash from node {node} since we've seen a \
993                     different incremental snapshot hash with this slot.\
994                     \nfull snapshot hash: {full_snapshot_hash:?}\
995                     \nincremental snapshot hash: {incremental_snapshot_hash:?}"
996                );
997                debug!(
998                    "known incremental snapshot hashes based on this slot: {:#?}",
999                    known_incremental_snapshot_hashes.iter(),
1000                );
1001                continue 'to_next_node;
1002            }
1003
1004            known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
1005        };
1006    }
1007
1008    trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1009    known_snapshot_hashes
1010}
1011
1012/// Get snapshot hashes from all eligible peers.
1013///
1014/// This fn will get only one snapshot hash per peer (the one with the highest slot).
1015/// This may be just a full snapshot hash, or a combo full snapshot hash and
1016/// incremental snapshot hash.
1017fn get_eligible_peer_snapshot_hashes(
1018    cluster_info: &ClusterInfo,
1019    rpc_peers: &[ContactInfo],
1020) -> Vec<PeerSnapshotHash> {
1021    let peer_snapshot_hashes = rpc_peers
1022        .iter()
1023        .flat_map(|rpc_peer| {
1024            get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1025                PeerSnapshotHash {
1026                    rpc_contact_info: rpc_peer.clone(),
1027                    snapshot_hash,
1028                }
1029            })
1030        })
1031        .collect();
1032
1033    trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1034    peer_snapshot_hashes
1035}
1036
1037/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
1038fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1039    known_snapshot_hashes: &KnownSnapshotHashes,
1040    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1041) {
1042    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1043        known_snapshot_hashes
1044            .get(&peer_snapshot_hash.snapshot_hash.full)
1045            .map(|known_incremental_hashes| {
1046                if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1047                    // If the peer's full snapshot hashes match, but doesn't have any
1048                    // incremental snapshots, that's fine; keep 'em!
1049                    true
1050                } else {
1051                    known_incremental_hashes
1052                        .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1053                }
1054            })
1055            .unwrap_or(false)
1056    });
1057
1058    trace!(
1059        "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1060    );
1061}
1062
1063/// Retain the peer snapshot hashes with the highest full snapshot slot
1064fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1065    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1066) {
1067    let highest_full_snapshot_hash = peer_snapshot_hashes
1068        .iter()
1069        .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1070        .max_by_key(|(slot, _hash)| *slot);
1071    let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1072        // `max_by_key` will only be `None` IFF the input `peer_snapshot_hashes` is empty.
1073        // In that case there's nothing to do (additionally, without a valid 'max' value, there
1074        // will be nothing to compare against within the `retain()` predicate).
1075        return;
1076    };
1077
1078    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1079        peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1080    });
1081
1082    trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1083}
1084
1085/// Retain the peer snapshot hashes with the highest incremental snapshot slot
1086fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1087    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1088) {
1089    let highest_incremental_snapshot_hash = peer_snapshot_hashes
1090        .iter()
1091        .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1092        .max_by_key(|(slot, _hash)| *slot);
1093
1094    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1095        peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1096    });
1097
1098    trace!(
1099        "retain peer snapshot hashes with highest incremental snapshot slot: \
1100         {peer_snapshot_hashes:?}"
1101    );
1102}
1103
1104/// Check to see if we can use our local snapshots, otherwise download newer ones.
1105#[allow(clippy::too_many_arguments)]
1106fn download_snapshots(
1107    full_snapshot_archives_dir: &Path,
1108    incremental_snapshot_archives_dir: &Path,
1109    validator_config: &ValidatorConfig,
1110    bootstrap_config: &RpcBootstrapConfig,
1111    use_progress_bar: bool,
1112    maximum_local_snapshot_age: Slot,
1113    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1114    minimal_snapshot_download_speed: f32,
1115    maximum_snapshot_download_abort: u64,
1116    download_abort_count: &mut u64,
1117    snapshot_hash: Option<SnapshotHash>,
1118    rpc_contact_info: &ContactInfo,
1119) -> Result<(), String> {
1120    if snapshot_hash.is_none() {
1121        return Ok(());
1122    }
1123    let SnapshotHash {
1124        full: full_snapshot_hash,
1125        incr: incremental_snapshot_hash,
1126    } = snapshot_hash.unwrap();
1127
1128    // If the local snapshots are new enough, then use 'em; no need to download new snapshots
1129    if should_use_local_snapshot(
1130        full_snapshot_archives_dir,
1131        incremental_snapshot_archives_dir,
1132        maximum_local_snapshot_age,
1133        full_snapshot_hash,
1134        incremental_snapshot_hash,
1135        bootstrap_config.incremental_snapshot_fetch,
1136    ) {
1137        return Ok(());
1138    }
1139
1140    // Check and see if we've already got the full snapshot; if not, download it
1141    if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
1142        .into_iter()
1143        .any(|snapshot_archive| {
1144            snapshot_archive.slot() == full_snapshot_hash.0
1145                && snapshot_archive.hash().0 == full_snapshot_hash.1
1146        })
1147    {
1148        info!(
1149            "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1150            full_snapshot_hash.0, full_snapshot_hash.1
1151        );
1152    } else {
1153        download_snapshot(
1154            full_snapshot_archives_dir,
1155            incremental_snapshot_archives_dir,
1156            validator_config,
1157            bootstrap_config,
1158            use_progress_bar,
1159            start_progress,
1160            minimal_snapshot_download_speed,
1161            maximum_snapshot_download_abort,
1162            download_abort_count,
1163            rpc_contact_info,
1164            full_snapshot_hash,
1165            SnapshotKind::FullSnapshot,
1166        )?;
1167    }
1168
1169    if bootstrap_config.incremental_snapshot_fetch {
1170        // Check and see if we've already got the incremental snapshot; if not, download it
1171        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1172            if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1173                .into_iter()
1174                .any(|snapshot_archive| {
1175                    snapshot_archive.slot() == incremental_snapshot_hash.0
1176                        && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1177                        && snapshot_archive.base_slot() == full_snapshot_hash.0
1178                })
1179            {
1180                info!(
1181                    "Incremental snapshot archive already exists locally. Skipping download. \
1182                     slot: {}, hash: {}",
1183                    incremental_snapshot_hash.0, incremental_snapshot_hash.1
1184                );
1185            } else {
1186                download_snapshot(
1187                    full_snapshot_archives_dir,
1188                    incremental_snapshot_archives_dir,
1189                    validator_config,
1190                    bootstrap_config,
1191                    use_progress_bar,
1192                    start_progress,
1193                    minimal_snapshot_download_speed,
1194                    maximum_snapshot_download_abort,
1195                    download_abort_count,
1196                    rpc_contact_info,
1197                    incremental_snapshot_hash,
1198                    SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1199                )?;
1200            }
1201        }
1202    }
1203
1204    Ok(())
1205}
1206
1207/// Download a snapshot
1208#[allow(clippy::too_many_arguments)]
1209fn download_snapshot(
1210    full_snapshot_archives_dir: &Path,
1211    incremental_snapshot_archives_dir: &Path,
1212    validator_config: &ValidatorConfig,
1213    bootstrap_config: &RpcBootstrapConfig,
1214    use_progress_bar: bool,
1215    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1216    minimal_snapshot_download_speed: f32,
1217    maximum_snapshot_download_abort: u64,
1218    download_abort_count: &mut u64,
1219    rpc_contact_info: &ContactInfo,
1220    desired_snapshot_hash: (Slot, Hash),
1221    snapshot_kind: SnapshotKind,
1222) -> Result<(), String> {
1223    let maximum_full_snapshot_archives_to_retain = validator_config
1224        .snapshot_config
1225        .maximum_full_snapshot_archives_to_retain;
1226    let maximum_incremental_snapshot_archives_to_retain = validator_config
1227        .snapshot_config
1228        .maximum_incremental_snapshot_archives_to_retain;
1229
1230    *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1231        slot: desired_snapshot_hash.0,
1232        rpc_addr: rpc_contact_info
1233            .rpc()
1234            .ok_or_else(|| String::from("Invalid RPC address"))?,
1235    };
1236    let desired_snapshot_hash = (
1237        desired_snapshot_hash.0,
1238        solana_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1239    );
1240    download_snapshot_archive(
1241        &rpc_contact_info
1242            .rpc()
1243            .ok_or_else(|| String::from("Invalid RPC address"))?,
1244        full_snapshot_archives_dir,
1245        incremental_snapshot_archives_dir,
1246        desired_snapshot_hash,
1247        snapshot_kind,
1248        maximum_full_snapshot_archives_to_retain,
1249        maximum_incremental_snapshot_archives_to_retain,
1250        use_progress_bar,
1251        &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1252            debug!("Download progress: {download_progress:?}");
1253            if download_progress.last_throughput < minimal_snapshot_download_speed
1254                && download_progress.notification_count <= 1
1255                && download_progress.percentage_done <= 2_f32
1256                && download_progress.estimated_remaining_time > 60_f32
1257                && *download_abort_count < maximum_snapshot_download_abort
1258            {
1259                if let Some(ref known_validators) = validator_config.known_validators {
1260                    if known_validators.contains(rpc_contact_info.pubkey())
1261                        && known_validators.len() == 1
1262                        && bootstrap_config.only_known_rpc
1263                    {
1264                        warn!(
1265                            "The snapshot download is too slow, throughput: {} < min speed {} \
1266                             bytes/sec, but will NOT abort and try a different node as it is the \
1267                             only known validator and the --only-known-rpc flag is set. Abort \
1268                             count: {}, Progress detail: {:?}",
1269                            download_progress.last_throughput,
1270                            minimal_snapshot_download_speed,
1271                            download_abort_count,
1272                            download_progress,
1273                        );
1274                        return true; // Do not abort download from the one-and-only known validator
1275                    }
1276                }
1277                warn!(
1278                    "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1279                     will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1280                    download_progress.last_throughput,
1281                    minimal_snapshot_download_speed,
1282                    download_abort_count,
1283                    download_progress,
1284                );
1285                *download_abort_count += 1;
1286                false
1287            } else {
1288                true
1289            }
1290        })),
1291    )
1292}
1293
1294/// Check to see if bootstrap should load from its local snapshots or not.  If not, then snapshots
1295/// will be downloaded.
1296fn should_use_local_snapshot(
1297    full_snapshot_archives_dir: &Path,
1298    incremental_snapshot_archives_dir: &Path,
1299    maximum_local_snapshot_age: Slot,
1300    full_snapshot_hash: (Slot, Hash),
1301    incremental_snapshot_hash: Option<(Slot, Hash)>,
1302    incremental_snapshot_fetch: bool,
1303) -> bool {
1304    let cluster_snapshot_slot = incremental_snapshot_hash
1305        .map(|(slot, _)| slot)
1306        .unwrap_or(full_snapshot_hash.0);
1307
1308    match get_highest_local_snapshot_hash(
1309        full_snapshot_archives_dir,
1310        incremental_snapshot_archives_dir,
1311        incremental_snapshot_fetch,
1312    ) {
1313        None => {
1314            info!(
1315                "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1316                 local snapshot."
1317            );
1318            false
1319        }
1320        Some((local_snapshot_slot, _)) => {
1321            if local_snapshot_slot
1322                >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1323            {
1324                info!(
1325                    "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1326                     a snapshot for slot {cluster_snapshot_slot}."
1327                );
1328                true
1329            } else {
1330                info!(
1331                    "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1332                     newer snapshot for slot {cluster_snapshot_slot}."
1333                );
1334                false
1335            }
1336        }
1337    }
1338}
1339
1340/// Get the node's highest snapshot hashes from CRDS
1341fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1342    cluster_info.get_snapshot_hashes_for_node(node).map(
1343        |crds_data::SnapshotHashes {
1344             full, incremental, ..
1345         }| {
1346            let highest_incremental_snapshot_hash = incremental.into_iter().max();
1347            SnapshotHash {
1348                full,
1349                incr: highest_incremental_snapshot_hash,
1350            }
1351        },
1352    )
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357    use super::*;
1358
1359    impl PeerSnapshotHash {
1360        fn new(
1361            rpc_contact_info: ContactInfo,
1362            full_snapshot_hash: (Slot, Hash),
1363            incremental_snapshot_hash: Option<(Slot, Hash)>,
1364        ) -> Self {
1365            Self {
1366                rpc_contact_info,
1367                snapshot_hash: SnapshotHash {
1368                    full: full_snapshot_hash,
1369                    incr: incremental_snapshot_hash,
1370                },
1371            }
1372        }
1373    }
1374
1375    fn default_contact_info_for_tests() -> ContactInfo {
1376        ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
1377    }
1378
1379    #[test]
1380    fn test_build_known_snapshot_hashes() {
1381        solana_logger::setup();
1382        let full_snapshot_hash1 = (400_000, Hash::new_unique());
1383        let full_snapshot_hash2 = (400_000, Hash::new_unique());
1384
1385        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1386        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1387
1388        // simulate a set of known validators with various snapshot hashes
1389        let oracle = {
1390            let mut oracle = HashMap::new();
1391
1392            for (full, incr) in [
1393                // only a full snapshot
1394                (full_snapshot_hash1, None),
1395                // full and incremental snapshots
1396                (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1397                // full and incremental snapshots, with different incremental hash
1398                (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1399                // ...and now with different full hashes
1400                (full_snapshot_hash2, None),
1401                (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1402                (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1403            ] {
1404                // also simulate multiple known validators having the same snapshot hashes
1405                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1406                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1407                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1408            }
1409
1410            // no snapshots at all
1411            oracle.insert(Pubkey::new_unique(), None);
1412            oracle.insert(Pubkey::new_unique(), None);
1413            oracle.insert(Pubkey::new_unique(), None);
1414
1415            oracle
1416        };
1417
1418        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1419
1420        let known_snapshot_hashes =
1421            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1422
1423        // ensure there's only one full snapshot hash, since they all used the same slot and there
1424        // can be only one snapshot hash per slot
1425        let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1426        assert_eq!(known_full_snapshot_hashes.len(), 1);
1427        let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1428
1429        // and for the same reasons, ensure there is only one incremental snapshot hash
1430        let known_incremental_snapshot_hashes =
1431            known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1432        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1433        let known_incremental_snapshot_hash =
1434            known_incremental_snapshot_hashes.iter().next().unwrap();
1435
1436        // The resulting `known_snapshot_hashes` can be different from run-to-run due to how
1437        // `oracle.keys()` returns nodes during iteration.  Because of that, we cannot just assert
1438        // the full and incremental snapshot hashes are `full_snapshot_hash1` and
1439        // `incremental_snapshot_hash1`.  Instead, we assert that the full and incremental
1440        // snapshot hashes are exactly one or the other, since it depends on which nodes are seen
1441        // "first" when building the known snapshot hashes.
1442        assert!(
1443            known_full_snapshot_hash == &full_snapshot_hash1
1444                || known_full_snapshot_hash == &full_snapshot_hash2
1445        );
1446        assert!(
1447            known_incremental_snapshot_hash == &incremental_snapshot_hash1
1448                || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1449        );
1450    }
1451
1452    #[test]
1453    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1454        let known_snapshot_hashes: KnownSnapshotHashes = [
1455            (
1456                (200_000, Hash::new_unique()),
1457                [
1458                    (200_200, Hash::new_unique()),
1459                    (200_400, Hash::new_unique()),
1460                    (200_600, Hash::new_unique()),
1461                    (200_800, Hash::new_unique()),
1462                ]
1463                .iter()
1464                .cloned()
1465                .collect(),
1466            ),
1467            (
1468                (300_000, Hash::new_unique()),
1469                [
1470                    (300_200, Hash::new_unique()),
1471                    (300_400, Hash::new_unique()),
1472                    (300_600, Hash::new_unique()),
1473                ]
1474                .iter()
1475                .cloned()
1476                .collect(),
1477            ),
1478        ]
1479        .iter()
1480        .cloned()
1481        .collect();
1482
1483        let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1484        let known_full_snapshot_hash = known_snapshot_hash.0;
1485        let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1486
1487        let contact_info = default_contact_info_for_tests();
1488        let peer_snapshot_hashes = vec![
1489            // bad full snapshot hash, no incremental snapshot hash
1490            PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1491            // bad everything
1492            PeerSnapshotHash::new(
1493                contact_info.clone(),
1494                (111_000, Hash::default()),
1495                Some((111_111, Hash::default())),
1496            ),
1497            // good full snapshot hash, no incremental snapshot hash
1498            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1499            // bad full snapshot hash, good (not possible) incremental snapshot hash
1500            PeerSnapshotHash::new(
1501                contact_info.clone(),
1502                (111_000, Hash::default()),
1503                Some(*known_incremental_snapshot_hash),
1504            ),
1505            // good full snapshot hash, bad incremental snapshot hash
1506            PeerSnapshotHash::new(
1507                contact_info.clone(),
1508                *known_full_snapshot_hash,
1509                Some((111_111, Hash::default())),
1510            ),
1511            // good everything
1512            PeerSnapshotHash::new(
1513                contact_info.clone(),
1514                *known_full_snapshot_hash,
1515                Some(*known_incremental_snapshot_hash),
1516            ),
1517        ];
1518
1519        let expected = vec![
1520            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1521            PeerSnapshotHash::new(
1522                contact_info,
1523                *known_full_snapshot_hash,
1524                Some(*known_incremental_snapshot_hash),
1525            ),
1526        ];
1527        let mut actual = peer_snapshot_hashes;
1528        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1529            &known_snapshot_hashes,
1530            &mut actual,
1531        );
1532        assert_eq!(expected, actual);
1533    }
1534
1535    #[test]
1536    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1537        let contact_info = default_contact_info_for_tests();
1538        let peer_snapshot_hashes = vec![
1539            // old
1540            PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1541            PeerSnapshotHash::new(
1542                contact_info.clone(),
1543                (100_000, Hash::default()),
1544                Some((100_100, Hash::default())),
1545            ),
1546            PeerSnapshotHash::new(
1547                contact_info.clone(),
1548                (100_000, Hash::default()),
1549                Some((100_200, Hash::default())),
1550            ),
1551            PeerSnapshotHash::new(
1552                contact_info.clone(),
1553                (100_000, Hash::default()),
1554                Some((100_300, Hash::default())),
1555            ),
1556            // new
1557            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1558            PeerSnapshotHash::new(
1559                contact_info.clone(),
1560                (200_000, Hash::default()),
1561                Some((200_100, Hash::default())),
1562            ),
1563            PeerSnapshotHash::new(
1564                contact_info.clone(),
1565                (200_000, Hash::default()),
1566                Some((200_200, Hash::default())),
1567            ),
1568            PeerSnapshotHash::new(
1569                contact_info.clone(),
1570                (200_000, Hash::default()),
1571                Some((200_300, Hash::default())),
1572            ),
1573        ];
1574
1575        let expected = vec![
1576            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1577            PeerSnapshotHash::new(
1578                contact_info.clone(),
1579                (200_000, Hash::default()),
1580                Some((200_100, Hash::default())),
1581            ),
1582            PeerSnapshotHash::new(
1583                contact_info.clone(),
1584                (200_000, Hash::default()),
1585                Some((200_200, Hash::default())),
1586            ),
1587            PeerSnapshotHash::new(
1588                contact_info,
1589                (200_000, Hash::default()),
1590                Some((200_300, Hash::default())),
1591            ),
1592        ];
1593        let mut actual = peer_snapshot_hashes;
1594        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1595        assert_eq!(expected, actual);
1596    }
1597
1598    #[test]
1599    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1600        let contact_info = default_contact_info_for_tests();
1601        let peer_snapshot_hashes = vec![
1602            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1603            PeerSnapshotHash::new(
1604                contact_info.clone(),
1605                (200_000, Hash::default()),
1606                Some((200_100, Hash::default())),
1607            ),
1608            PeerSnapshotHash::new(
1609                contact_info.clone(),
1610                (200_000, Hash::default()),
1611                Some((200_200, Hash::default())),
1612            ),
1613            PeerSnapshotHash::new(
1614                contact_info.clone(),
1615                (200_000, Hash::default()),
1616                Some((200_300, Hash::default())),
1617            ),
1618            PeerSnapshotHash::new(
1619                contact_info.clone(),
1620                (200_000, Hash::default()),
1621                Some((200_010, Hash::default())),
1622            ),
1623            PeerSnapshotHash::new(
1624                contact_info.clone(),
1625                (200_000, Hash::default()),
1626                Some((200_020, Hash::default())),
1627            ),
1628            PeerSnapshotHash::new(
1629                contact_info.clone(),
1630                (200_000, Hash::default()),
1631                Some((200_030, Hash::default())),
1632            ),
1633        ];
1634
1635        let expected = vec![PeerSnapshotHash::new(
1636            contact_info,
1637            (200_000, Hash::default()),
1638            Some((200_300, Hash::default())),
1639        )];
1640        let mut actual = peer_snapshot_hashes;
1641        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1642        assert_eq!(expected, actual);
1643    }
1644
1645    /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
1646    /// there are *zero* peers with incremental snapshots.
1647    #[test]
1648    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1649        let contact_info = default_contact_info_for_tests();
1650        let peer_snapshot_hashes = vec![
1651            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1652            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1653            PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1654        ];
1655
1656        let expected = peer_snapshot_hashes.clone();
1657        let mut actual = peer_snapshot_hashes;
1658        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1659        assert_eq!(expected, actual);
1660    }
1661
1662    /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
1663    /// peer snapshot hashes input is empty.
1664    #[test]
1665    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1666        {
1667            let mut actual = vec![];
1668            let expected = actual.clone();
1669            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1670            assert_eq!(expected, actual);
1671        }
1672        {
1673            let mut actual = vec![];
1674            let expected = actual.clone();
1675            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1676            assert_eq!(expected, actual);
1677        }
1678    }
1679}