Skip to main content

agave_validator/
bootstrap.rs

1use {
2    agave_snapshots::{
3        SnapshotArchiveKind, paths as snapshot_paths,
4        snapshot_archive_info::SnapshotArchiveInfoGetter as _,
5    },
6    itertools::Itertools,
7    log::*,
8    rand::{Rng, rng, seq::SliceRandom},
9    rayon::prelude::*,
10    solana_account::ReadableAccount,
11    solana_clock::Slot,
12    solana_commitment_config::CommitmentConfig,
13    solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
14    solana_download_utils::{DownloadProgressRecord, download_snapshot_archive},
15    solana_genesis_utils::download_then_check_genesis_hash,
16    solana_gossip::{
17        cluster_info::ClusterInfo,
18        contact_info::{ContactInfo, Protocol},
19        crds_data,
20        gossip_service::GossipService,
21        node::Node,
22    },
23    solana_hash::Hash,
24    solana_keypair::Keypair,
25    solana_metrics::datapoint_info,
26    solana_net_utils::SocketAddrSpace,
27    solana_pubkey::Pubkey,
28    solana_rpc_client::rpc_client::RpcClient,
29    solana_signer::Signer,
30    solana_vote_program::vote_state::VoteStateV4,
31    std::{
32        collections::{HashMap, HashSet, hash_map::RandomState},
33        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
34        path::Path,
35        process::exit,
36        sync::{
37            Arc, RwLock,
38            atomic::{AtomicBool, Ordering},
39        },
40        time::{Duration, Instant},
41    },
42    thiserror::Error,
43};
44
/// When downloading snapshots, wait at most this long for snapshot hashes from
/// _all_ known validators.  Afterwards, wait for snapshot hashes from _any_
/// known validator.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// If every discovered RPC peer is blacklisted and no usable peer has been found
/// for this long, clear the blacklist so those peers can be tried again.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// If RPC peers are available but none of them offers an acceptable snapshot for
/// this long, give up.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
/// If no usable (non-blacklisted) RPC peers have been found for this long, give up.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);

/// Upper bound on the number of RPC peer candidates returned by a single
/// `get_rpc_nodes` call, i.e. the number of peers vetted per round.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// TCP connect timeout used by `ping` when measuring an RPC node's latency.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
61
/// Options controlling how `rpc_bootstrap` fetches genesis and snapshots from RPC peers.
#[derive(Debug, PartialEq, Clone)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash`. When set together with
    /// `no_snapshot_fetch`, `rpc_bootstrap` returns right after the port check.
    pub no_genesis_fetch: bool,
    /// When set, `get_rpc_nodes` picks a random RPC peer and no snapshot hash is selected.
    pub no_snapshot_fetch: bool,
    /// When set, only RPC peers that are in `ValidatorConfig::known_validators` are considered.
    pub only_known_rpc: bool,
    /// Forwarded to `download_then_check_genesis_hash` as the unpacked genesis archive size limit.
    pub max_genesis_archive_unpacked_size: u64,
    /// RPC URL used to validate the vote account after downloading; `None` skips the check.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots should be considered when selecting a snapshot to download.
    pub incremental_snapshot_fetch: bool,
}
71
72fn verify_reachable_ports(
73    node: &Node,
74    cluster_entrypoint: &ContactInfo,
75    validator_config: &ValidatorConfig,
76    socket_addr_space: &SocketAddrSpace,
77) -> bool {
78    let verify_address = |addr: &Option<SocketAddr>| -> bool {
79        addr.as_ref()
80            .map(|addr| socket_addr_space.check(addr))
81            .unwrap_or_default()
82    };
83
84    let mut udp_sockets = vec![&node.sockets.repair];
85    udp_sockets.extend(node.sockets.gossip.iter());
86
87    if verify_address(&node.info.serve_repair(Protocol::UDP)) {
88        udp_sockets.push(&node.sockets.serve_repair);
89    }
90    if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
91        udp_sockets.extend(node.sockets.tpu_vote.iter());
92    }
93    if verify_address(&node.info.tvu(Protocol::UDP)) {
94        udp_sockets.extend(node.sockets.tvu.iter());
95        udp_sockets.extend(node.sockets.broadcast.iter());
96        udp_sockets.extend(node.sockets.retransmit_sockets.iter());
97    }
98    if !solana_net_utils::verify_all_reachable_udp(
99        &cluster_entrypoint.gossip().unwrap(),
100        &udp_sockets,
101    ) {
102        return false;
103    }
104
105    let mut tcp_listeners = vec![];
106    if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
107        for (purpose, bind_addr, public_addr) in &[
108            ("RPC", rpc_addr, node.info.rpc()),
109            ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
110        ] {
111            if verify_address(public_addr) {
112                tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
113                    error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
114                    exit(1);
115                }));
116            }
117        }
118    }
119
120    if let Some(ip_echo) = &node.sockets.ip_echo {
121        let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
122        tcp_listeners.push(ip_echo);
123    }
124
125    solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
126}
127
128fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
129    if let Some(known_validators) = known_validators {
130        known_validators.contains(id)
131    } else {
132        false
133    }
134}
135
136#[allow(clippy::too_many_arguments)]
137fn start_gossip_node(
138    identity_keypair: Arc<Keypair>,
139    cluster_entrypoints: &[ContactInfo],
140    known_validators: Option<HashSet<Pubkey>>,
141    ledger_path: &Path,
142    gossip_addr: &SocketAddr,
143    gossip_sockets: Arc<[UdpSocket]>,
144    expected_shred_version: u16,
145    gossip_validators: Option<HashSet<Pubkey>>,
146    should_check_duplicate_instance: bool,
147    socket_addr_space: SocketAddrSpace,
148) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
149    let contact_info = ClusterInfo::gossip_contact_info(
150        identity_keypair.pubkey(),
151        *gossip_addr,
152        expected_shred_version,
153    );
154    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
155    if let Some(known_validators) = known_validators {
156        cluster_info
157            .set_trim_keep_pubkeys(known_validators)
158            .expect("set_trim_keep_pubkeys should succeed as ClusterInfo was just created");
159    }
160    cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
161    cluster_info.restore_contact_info(ledger_path, 0);
162    let cluster_info = Arc::new(cluster_info);
163
164    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
165    let gossip_service = GossipService::new(
166        &cluster_info,
167        None,
168        gossip_sockets,
169        gossip_validators,
170        should_check_duplicate_instance,
171        None,
172        gossip_exit_flag.clone(),
173    );
174    (cluster_info, gossip_exit_flag, gossip_service)
175}
176
177fn get_rpc_peers(
178    cluster_info: &ClusterInfo,
179    validator_config: &ValidatorConfig,
180    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
181    blacklist_timeout: &Instant,
182    retry_reason: &mut Option<String>,
183    bootstrap_config: &RpcBootstrapConfig,
184) -> Vec<ContactInfo> {
185    let shred_version = validator_config
186        .expected_shred_version
187        .unwrap_or_else(|| cluster_info.my_shred_version());
188
189    info!(
190        "Searching for an RPC service with shred version {shred_version}{}...",
191        retry_reason
192            .as_ref()
193            .map(|s| format!(" (Retrying: {s})"))
194            .unwrap_or_default()
195    );
196
197    let mut rpc_peers = cluster_info.rpc_peers();
198    if bootstrap_config.only_known_rpc {
199        rpc_peers.retain(|rpc_peer| {
200            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
201        });
202    }
203
204    let rpc_peers_total = rpc_peers.len();
205
206    // Filter out blacklisted nodes
207    let rpc_peers: Vec<_> = rpc_peers
208        .into_iter()
209        .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
210        .collect();
211    let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
212    let rpc_known_peers = rpc_peers
213        .iter()
214        .filter(|rpc_peer| {
215            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
216        })
217        .count();
218
219    info!(
220        "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
221         {rpc_peers_blacklisted} blacklisted"
222    );
223
224    if rpc_peers_blacklisted == rpc_peers_total {
225        *retry_reason = if !blacklisted_rpc_nodes.is_empty()
226            && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
227        {
228            // All nodes are blacklisted and no additional nodes recently discovered.
229            // Remove all nodes from the blacklist and try them again.
230            blacklisted_rpc_nodes.clear();
231            Some("Blacklist timeout expired".to_owned())
232        } else {
233            Some("Wait for known rpc peers".to_owned())
234        };
235        return vec![];
236    }
237    rpc_peers
238}
239
240fn check_vote_account(
241    rpc_client: &RpcClient,
242    identity_pubkey: &Pubkey,
243    vote_account_address: &Pubkey,
244    authorized_voter_pubkeys: &[Pubkey],
245) -> Result<(), String> {
246    let vote_account = rpc_client
247        .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
248        .map_err(|err| format!("failed to fetch vote account: {err}"))?
249        .value
250        .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
251
252    if vote_account.owner != solana_vote_program::id() {
253        return Err(format!(
254            "not a vote account (owned by {}): {}",
255            vote_account.owner, vote_account_address
256        ));
257    }
258
259    let identity_account = rpc_client
260        .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
261        .map_err(|err| format!("failed to fetch identity account: {err}"))?
262        .value
263        .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
264
265    let vote_state = VoteStateV4::deserialize(vote_account.data(), vote_account_address).ok();
266    if let Some(vote_state) = vote_state {
267        if vote_state.authorized_voters.is_empty() {
268            return Err("Vote account not yet initialized".to_string());
269        }
270
271        if vote_state.node_pubkey != *identity_pubkey {
272            return Err(format!(
273                "vote account's identity ({}) does not match the validator's identity {}).",
274                vote_state.node_pubkey, identity_pubkey
275            ));
276        }
277
278        for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters.iter() {
279            if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
280                return Err(format!(
281                    "authorized voter {vote_account_authorized_voter_pubkey} not available"
282                ));
283            }
284        }
285    } else {
286        return Err(format!(
287            "invalid vote account data for {vote_account_address}"
288        ));
289    }
290
291    // Maybe we can calculate minimum voting fee; rather than 1 lamport
292    if identity_account.lamports <= 1 {
293        return Err(format!(
294            "underfunded identity account ({}): only {} lamports available",
295            identity_pubkey, identity_account.lamports
296        ));
297    }
298
299    Ok(())
300}
301
302#[derive(Error, Debug)]
303pub enum GetRpcNodeError {
304    #[error("Unable to find any RPC peers")]
305    NoRpcPeersFound,
306
307    #[error("Giving up, did not get newer snapshots from the cluster")]
308    NoNewerSnapshots,
309}
310
311/// Struct to wrap the return value from get_rpc_nodes().  The `rpc_contact_info` is the peer to
312/// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
313/// snapshots to download.
314#[derive(Debug)]
315struct GetRpcNodeResult {
316    rpc_contact_info: ContactInfo,
317    snapshot_hash: Option<SnapshotHash>,
318}
319
320/// Struct to wrap the peers & snapshot hashes together.
321#[derive(Debug, PartialEq, Eq, Clone)]
322struct PeerSnapshotHash {
323    rpc_contact_info: ContactInfo,
324    snapshot_hash: SnapshotHash,
325}
326
327/// A snapshot hash.  In this context (bootstrap *with* incremental snapshots), a snapshot hash
328/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
329#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
330pub struct SnapshotHash {
331    full: (Slot, Hash),
332    incr: Option<(Slot, Hash)>,
333}
334
335pub fn fail_rpc_node(
336    err: String,
337    known_validators: &Option<HashSet<Pubkey, RandomState>>,
338    rpc_id: &Pubkey,
339    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
340) {
341    warn!("{err}");
342    if let Some(known_validators) = known_validators {
343        if known_validators.contains(rpc_id) {
344            return;
345        }
346    }
347
348    info!("Excluding {rpc_id} as a future RPC candidate");
349    blacklisted_rpc_nodes.insert(*rpc_id);
350}
351
352fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
353    let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
354    cluster_info.save_contact_info();
355    gossip_exit_flag.store(true, Ordering::Relaxed);
356    gossip_service.join().unwrap();
357}
358
359#[allow(clippy::too_many_arguments)]
360pub fn attempt_download_genesis_and_snapshot(
361    rpc_contact_info: &ContactInfo,
362    ledger_path: &Path,
363    validator_config: &mut ValidatorConfig,
364    bootstrap_config: &RpcBootstrapConfig,
365    use_progress_bar: bool,
366    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
367    rpc_client: &RpcClient,
368    maximum_local_snapshot_age: Slot,
369    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
370    minimal_snapshot_download_speed: f32,
371    maximum_snapshot_download_abort: u64,
372    download_abort_count: &mut u64,
373    snapshot_hash: Option<SnapshotHash>,
374    identity_keypair: &Arc<Keypair>,
375    vote_account: &Pubkey,
376    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
377) -> Result<(), String> {
378    download_then_check_genesis_hash(
379        &rpc_contact_info
380            .rpc()
381            .ok_or_else(|| String::from("Invalid RPC address"))?,
382        ledger_path,
383        &mut validator_config.expected_genesis_hash,
384        bootstrap_config.max_genesis_archive_unpacked_size,
385        bootstrap_config.no_genesis_fetch,
386        use_progress_bar,
387        rpc_client,
388    )?;
389
390    if let Some(gossip) = gossip.take() {
391        shutdown_gossip_service(gossip);
392    }
393
394    let rpc_client_slot = rpc_client
395        .get_slot_with_commitment(CommitmentConfig::finalized())
396        .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
397    info!("RPC node root slot: {rpc_client_slot}");
398
399    download_snapshots(
400        validator_config,
401        bootstrap_config,
402        use_progress_bar,
403        maximum_local_snapshot_age,
404        start_progress,
405        minimal_snapshot_download_speed,
406        maximum_snapshot_download_abort,
407        download_abort_count,
408        snapshot_hash,
409        rpc_contact_info,
410    )?;
411
412    if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
413        let rpc_client = RpcClient::new(url);
414        check_vote_account(
415            &rpc_client,
416            &identity_keypair.pubkey(),
417            vote_account,
418            &authorized_voter_keypairs
419                .read()
420                .unwrap()
421                .iter()
422                .map(|k| k.pubkey())
423                .collect::<Vec<_>>(),
424        )
425        .unwrap_or_else(|err| {
426            // Consider failures here to be more likely due to user error (eg,
427            // incorrect `agave-validator` command-line arguments) rather than the
428            // RPC node failing.
429            //
430            // Power users can always use the `--no-check-vote-account` option to
431            // bypass this check entirely
432            error!("{err}");
433            exit(1);
434        });
435    }
436    Ok(())
437}
438
439/// simple ping helper function which returns the time to connect
440fn ping(addr: &SocketAddr) -> Option<Duration> {
441    let start = Instant::now();
442    match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
443        Ok(_) => Some(start.elapsed()),
444        Err(_) => None,
445    }
446}
447
448// Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
449// used for downloading latest snapshots and/or the genesis block. Guaranteed to
450// find at least one viable node or terminate the process.
451fn get_vetted_rpc_nodes(
452    vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
453    cluster_info: &Arc<ClusterInfo>,
454    validator_config: &ValidatorConfig,
455    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
456    bootstrap_config: &RpcBootstrapConfig,
457) {
458    while vetted_rpc_nodes.is_empty() {
459        let rpc_node_details = match get_rpc_nodes(
460            cluster_info,
461            validator_config,
462            blacklisted_rpc_nodes,
463            bootstrap_config,
464        ) {
465            Ok(rpc_node_details) => rpc_node_details,
466            Err(err) => {
467                error!(
468                    "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
469                     `--no-port-check`, or adjusting `--known-validator ...` arguments as \
470                     applicable"
471                );
472                exit(1);
473            }
474        };
475
476        let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
477        vetted_rpc_nodes.extend(
478            rpc_node_details
479                .into_par_iter()
480                .filter_map(|rpc_node_details| {
481                    let GetRpcNodeResult {
482                        rpc_contact_info,
483                        snapshot_hash,
484                    } = rpc_node_details;
485
486                    info!(
487                        "Using RPC service from node {}: {:?}",
488                        rpc_contact_info.pubkey(),
489                        rpc_contact_info.rpc()
490                    );
491
492                    let rpc_addr = rpc_contact_info.rpc()?;
493                    let ping_time = ping(&rpc_addr);
494
495                    let rpc_client =
496                        RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
497
498                    Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
499                })
500                .filter(
501                    |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
502                        .get_version()
503                    {
504                        Ok(rpc_version) => {
505                            if let Some(ping_time) = ping_time {
506                                info!(
507                                    "RPC node version: {} Ping: {}ms",
508                                    rpc_version.solana_core,
509                                    ping_time.as_millis()
510                                );
511                                true
512                            } else {
513                                fail_rpc_node(
514                                    "Failed to ping RPC".to_string(),
515                                    &validator_config.known_validators,
516                                    rpc_contact_info.pubkey(),
517                                    &mut newly_blacklisted_rpc_nodes.write().unwrap(),
518                                );
519                                false
520                            }
521                        }
522                        Err(err) => {
523                            fail_rpc_node(
524                                format!("Failed to get RPC node version: {err}"),
525                                &validator_config.known_validators,
526                                rpc_contact_info.pubkey(),
527                                &mut newly_blacklisted_rpc_nodes.write().unwrap(),
528                            );
529                            false
530                        }
531                    },
532                )
533                .collect::<Vec<(
534                    ContactInfo,
535                    Option<SnapshotHash>,
536                    RpcClient,
537                    Option<Duration>,
538                )>>()
539                .into_iter()
540                .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
541                .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
542                    (rpc_contact_info, snapshot_hash, rpc_client)
543                })
544                .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
545        );
546        blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
547    }
548}
549
550#[allow(clippy::too_many_arguments)]
551pub fn rpc_bootstrap(
552    node: &Node,
553    identity_keypair: &Arc<Keypair>,
554    ledger_path: &Path,
555    vote_account: &Pubkey,
556    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
557    cluster_entrypoints: &[ContactInfo],
558    validator_config: &mut ValidatorConfig,
559    bootstrap_config: RpcBootstrapConfig,
560    do_port_check: bool,
561    use_progress_bar: bool,
562    maximum_local_snapshot_age: Slot,
563    should_check_duplicate_instance: bool,
564    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
565    minimal_snapshot_download_speed: f32,
566    maximum_snapshot_download_abort: u64,
567    socket_addr_space: SocketAddrSpace,
568) {
569    if do_port_check {
570        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
571        order.shuffle(&mut rng());
572        if order.into_iter().all(|i| {
573            !verify_reachable_ports(
574                node,
575                &cluster_entrypoints[i],
576                validator_config,
577                &socket_addr_space,
578            )
579        }) {
580            exit(1);
581        }
582    }
583
584    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
585        return;
586    }
587
588    let total_snapshot_download_time = Instant::now();
589    let mut get_rpc_nodes_time = Duration::new(0, 0);
590    let mut snapshot_download_time = Duration::new(0, 0);
591    let mut blacklisted_rpc_nodes = HashSet::new();
592    let mut gossip = None;
593    let mut vetted_rpc_nodes = vec![];
594    let mut download_abort_count = 0;
595    loop {
596        if gossip.is_none() {
597            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
598
599            gossip = Some(start_gossip_node(
600                identity_keypair.clone(),
601                cluster_entrypoints,
602                validator_config.known_validators.clone(),
603                ledger_path,
604                &node
605                    .info
606                    .gossip()
607                    .expect("Operator must spin up node with valid gossip address"),
608                node.sockets.gossip.clone(),
609                validator_config
610                    .expected_shred_version
611                    .expect("expected_shred_version should not be None"),
612                validator_config.gossip_validators.clone(),
613                should_check_duplicate_instance,
614                socket_addr_space,
615            ));
616        }
617
618        let get_rpc_nodes_start = Instant::now();
619        get_vetted_rpc_nodes(
620            &mut vetted_rpc_nodes,
621            &gossip.as_ref().unwrap().0,
622            validator_config,
623            &mut blacklisted_rpc_nodes,
624            &bootstrap_config,
625        );
626        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
627        get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
628
629        let snapshot_download_start = Instant::now();
630        let download_result = attempt_download_genesis_and_snapshot(
631            &rpc_contact_info,
632            ledger_path,
633            validator_config,
634            &bootstrap_config,
635            use_progress_bar,
636            &mut gossip,
637            &rpc_client,
638            maximum_local_snapshot_age,
639            start_progress,
640            minimal_snapshot_download_speed,
641            maximum_snapshot_download_abort,
642            &mut download_abort_count,
643            snapshot_hash,
644            identity_keypair,
645            vote_account,
646            authorized_voter_keypairs.clone(),
647        );
648        snapshot_download_time += snapshot_download_start.elapsed();
649        match download_result {
650            Ok(()) => break,
651            Err(err) => {
652                fail_rpc_node(
653                    err,
654                    &validator_config.known_validators,
655                    rpc_contact_info.pubkey(),
656                    &mut blacklisted_rpc_nodes,
657                );
658            }
659        }
660    }
661
662    if let Some(gossip) = gossip.take() {
663        shutdown_gossip_service(gossip);
664    }
665
666    datapoint_info!(
667        "bootstrap-snapshot-download",
668        (
669            "total_time_secs",
670            total_snapshot_download_time.elapsed().as_secs(),
671            i64
672        ),
673        ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
674        (
675            "snapshot_download_time_secs",
676            snapshot_download_time.as_secs(),
677            i64
678        ),
679        ("download_abort_count", download_abort_count, i64),
680        ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
681    );
682}
683
684/// Get RPC peer node candidates to download from.
685///
686/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
687fn get_rpc_nodes(
688    cluster_info: &ClusterInfo,
689    validator_config: &ValidatorConfig,
690    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
691    bootstrap_config: &RpcBootstrapConfig,
692) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
693    let mut blacklist_timeout = Instant::now();
694    let mut get_rpc_peers_timout = Instant::now();
695    let mut newer_cluster_snapshot_timeout = None;
696    let mut retry_reason = None;
697    loop {
698        // Give gossip some time to populate and not spin on grabbing the crds lock
699        std::thread::sleep(Duration::from_secs(1));
700        info!("\n{}", cluster_info.rpc_info_trace());
701
702        let rpc_peers = get_rpc_peers(
703            cluster_info,
704            validator_config,
705            blacklisted_rpc_nodes,
706            &blacklist_timeout,
707            &mut retry_reason,
708            bootstrap_config,
709        );
710        if rpc_peers.is_empty() {
711            if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
712                return Err(GetRpcNodeError::NoRpcPeersFound);
713            }
714            continue;
715        }
716
717        // Reset timeouts if we found any viable RPC peers.
718        blacklist_timeout = Instant::now();
719        get_rpc_peers_timout = Instant::now();
720        if bootstrap_config.no_snapshot_fetch {
721            let random_peer = &rpc_peers[rng().random_range(0..rpc_peers.len())];
722            return Ok(vec![GetRpcNodeResult {
723                rpc_contact_info: random_peer.clone(),
724                snapshot_hash: None,
725            }]);
726        }
727
728        let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
729            .as_ref()
730            .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
731            .unwrap_or(true)
732        {
733            KnownValidatorsToWaitFor::All
734        } else {
735            KnownValidatorsToWaitFor::Any
736        };
737        let peer_snapshot_hashes = get_peer_snapshot_hashes(
738            cluster_info,
739            &rpc_peers,
740            validator_config.known_validators.as_ref(),
741            known_validators_to_wait_for,
742            bootstrap_config.incremental_snapshot_fetch,
743        );
744        if peer_snapshot_hashes.is_empty() {
745            match newer_cluster_snapshot_timeout {
746                None => newer_cluster_snapshot_timeout = Some(Instant::now()),
747                Some(newer_cluster_snapshot_timeout) => {
748                    if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
749                        return Err(GetRpcNodeError::NoNewerSnapshots);
750                    }
751                }
752            }
753            retry_reason = Some("No snapshots available".to_owned());
754            continue;
755        } else {
756            let rpc_peers = peer_snapshot_hashes
757                .iter()
758                .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
759                .collect::<Vec<_>>();
760            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
761            info!(
762                "Highest available snapshot slot is {}, available from {} node{}: {:?}",
763                final_snapshot_hash
764                    .incr
765                    .map(|(slot, _hash)| slot)
766                    .unwrap_or(final_snapshot_hash.full.0),
767                rpc_peers.len(),
768                if rpc_peers.len() > 1 { "s" } else { "" },
769                rpc_peers,
770            );
771            let rpc_node_results = peer_snapshot_hashes
772                .iter()
773                .map(|peer_snapshot_hash| GetRpcNodeResult {
774                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
775                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
776                })
777                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
778                .collect();
779            return Ok(rpc_node_results);
780        }
781    }
782}
783
784/// Get the Slot and Hash of the local snapshot with the highest slot.  Can be either a full
785/// snapshot or an incremental snapshot.
786fn get_highest_local_snapshot_hash(
787    full_snapshot_archives_dir: impl AsRef<Path>,
788    incremental_snapshot_archives_dir: impl AsRef<Path>,
789    incremental_snapshot_fetch: bool,
790) -> Option<(Slot, Hash)> {
791    snapshot_paths::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
792        .and_then(|full_snapshot_info| {
793            if incremental_snapshot_fetch {
794                snapshot_paths::get_highest_incremental_snapshot_archive_info(
795                    incremental_snapshot_archives_dir,
796                    full_snapshot_info.slot(),
797                )
798                .map(|incremental_snapshot_info| {
799                    (
800                        incremental_snapshot_info.slot(),
801                        *incremental_snapshot_info.hash(),
802                    )
803                })
804            } else {
805                None
806            }
807            .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
808        })
809        .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
810}
811
812/// Get peer snapshot hashes
813///
814/// The result is a vector of peers with snapshot hashes that:
815/// 1. match a snapshot hash from the known validators
816/// 2. have the highest incremental snapshot slot
817/// 3. have the highest full snapshot slot of (2)
818fn get_peer_snapshot_hashes(
819    cluster_info: &ClusterInfo,
820    rpc_peers: &[ContactInfo],
821    known_validators: Option<&HashSet<Pubkey>>,
822    known_validators_to_wait_for: KnownValidatorsToWaitFor,
823    incremental_snapshot_fetch: bool,
824) -> Vec<PeerSnapshotHash> {
825    let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
826    if let Some(known_validators) = known_validators {
827        let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
828            cluster_info,
829            known_validators,
830            known_validators_to_wait_for,
831        );
832        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
833            &known_snapshot_hashes,
834            &mut peer_snapshot_hashes,
835        );
836    }
837    if incremental_snapshot_fetch {
838        // Only filter by highest incremental snapshot slot if we're actually going to download an
839        // incremental snapshot.  Otherwise this could remove higher full snapshot slots from
840        // being selected.  For example, if there are two peer snapshot hashes:
841        // (A) full snapshot slot: 100, incremental snapshot slot: 160
842        // (B) full snapshot slot: 150, incremental snapshot slot: None
843        // Then (A) has the highest overall snapshot slot.  But if we're not downloading and
844        // incremental snapshot, (B) should be selected since it's full snapshot of 150 is highest.
845        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
846            &mut peer_snapshot_hashes,
847        );
848    }
849    retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
850
851    peer_snapshot_hashes
852}
853
/// Map full snapshot hashes to a set of incremental snapshot hashes.  Each full snapshot hash
/// is treated as the base for its set of incremental snapshot hashes.
///
/// Both the keys and the set members are `(slot, hash)` pairs.  A full snapshot hash may map to
/// an empty set when no incremental snapshot hash is known for it.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
857
858/// Get the snapshot hashes from known validators.
859///
860/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
861/// snapshot hashes.  This map will be used as the "known snapshot hashes"; when peers are
862/// queried for their individual snapshot hashes, their results will be checked against this
863/// map to verify correctness.
864///
865/// NOTE: Only a single snapshot hash is allowed per slot.  If somehow two known validators have
866/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
867/// This applies to both full and incremental snapshot hashes.
868fn get_snapshot_hashes_from_known_validators(
869    cluster_info: &ClusterInfo,
870    known_validators: &HashSet<Pubkey>,
871    known_validators_to_wait_for: KnownValidatorsToWaitFor,
872) -> KnownSnapshotHashes {
873    // Get the snapshot hashes for a node from CRDS
874    let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
875
876    if !do_known_validators_have_all_snapshot_hashes(
877        known_validators,
878        known_validators_to_wait_for,
879        get_snapshot_hashes_for_node,
880    ) {
881        debug!(
882            "Snapshot hashes have not been discovered from known validators. This likely means \
883             the gossip tables are not fully populated. We will sleep and retry..."
884        );
885        return KnownSnapshotHashes::default();
886    }
887
888    build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
889}
890
891/// Check if we can discover snapshot hashes for the known validators.
892///
893/// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
894/// process will download snapshots.
895///
896/// This function will return false if we do not yet have snapshot hashes from known validators;
897/// and true otherwise.  Either require snapshot hashes from *all* or *any* of the known validators
898/// based on the `KnownValidatorsToWaitFor` parameter.
899fn do_known_validators_have_all_snapshot_hashes<'a>(
900    known_validators: impl IntoIterator<Item = &'a Pubkey>,
901    known_validators_to_wait_for: KnownValidatorsToWaitFor,
902    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
903) -> bool {
904    let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
905
906    match known_validators_to_wait_for {
907        KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
908        KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
909    }
910}
911
/// When waiting for snapshot hashes from the known validators, should we wait for *all* or *any*
/// of them?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    /// Wait until snapshot hashes can be found for every known validator.
    All,
    /// Wait until snapshot hashes can be found for at least one known validator.
    Any,
}
919
920/// Build the known snapshot hashes from a set of nodes.
921///
922/// The `get_snapshot_hashes_for_node` parameter is a function that map a pubkey to its snapshot
923/// hashes.  This parameter exist to provide a way to test the inner algorithm without needing
924/// runtime information such as the ClusterInfo or ValidatorConfig.
925fn build_known_snapshot_hashes<'a>(
926    nodes: impl IntoIterator<Item = &'a Pubkey>,
927    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
928) -> KnownSnapshotHashes {
929    let mut known_snapshot_hashes = KnownSnapshotHashes::new();
930
931    /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
932    /// but *different* hash as the needle.
933    fn is_any_same_slot_and_different_hash<'a>(
934        needle: &(Slot, Hash),
935        haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
936    ) -> bool {
937        haystack
938            .into_iter()
939            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
940    }
941
942    'to_next_node: for node in nodes {
943        let Some(SnapshotHash {
944            full: full_snapshot_hash,
945            incr: incremental_snapshot_hash,
946        }) = get_snapshot_hashes_for_node(node)
947        else {
948            continue 'to_next_node;
949        };
950
951        // Do not add this snapshot hash if there's already a full snapshot hash with the
952        // same slot but with a _different_ hash.
953        // NOTE: Nodes should not produce snapshots at the same slot with _different_
954        // hashes.  So if it happens, keep the first and ignore the rest.
955        if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
956            warn!(
957                "Ignoring all snapshot hashes from node {node} since we've seen a different full \
958                 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
959            );
960            debug!(
961                "known full snapshot hashes: {:#?}",
962                known_snapshot_hashes.keys(),
963            );
964            continue 'to_next_node;
965        }
966
967        // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
968        // doesn't already exist.  This is to ensure we don't overwrite existing
969        // incremental snapshot hashes that may be present for this full snapshot hash.
970        let known_incremental_snapshot_hashes =
971            known_snapshot_hashes.entry(full_snapshot_hash).or_default();
972
973        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
974            // Do not add this snapshot hash if there's already an incremental snapshot
975            // hash with the same slot, but with a _different_ hash.
976            // NOTE: Nodes should not produce snapshots at the same slot with _different_
977            // hashes.  So if it happens, keep the first and ignore the rest.
978            if is_any_same_slot_and_different_hash(
979                &incremental_snapshot_hash,
980                known_incremental_snapshot_hashes.iter(),
981            ) {
982                warn!(
983                    "Ignoring incremental snapshot hash from node {node} since we've seen a \
984                     different incremental snapshot hash with this slot. full snapshot hash: \
985                     {full_snapshot_hash:?}, incremental snapshot hash: \
986                     {incremental_snapshot_hash:?}"
987                );
988                debug!(
989                    "known incremental snapshot hashes based on this slot: {:#?}",
990                    known_incremental_snapshot_hashes.iter(),
991                );
992                continue 'to_next_node;
993            }
994
995            known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
996        };
997    }
998
999    trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1000    known_snapshot_hashes
1001}
1002
1003/// Get snapshot hashes from all eligible peers.
1004///
1005/// This fn will get only one snapshot hash per peer (the one with the highest slot).
1006/// This may be just a full snapshot hash, or a combo full snapshot hash and
1007/// incremental snapshot hash.
1008fn get_eligible_peer_snapshot_hashes(
1009    cluster_info: &ClusterInfo,
1010    rpc_peers: &[ContactInfo],
1011) -> Vec<PeerSnapshotHash> {
1012    let peer_snapshot_hashes = rpc_peers
1013        .iter()
1014        .flat_map(|rpc_peer| {
1015            get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1016                PeerSnapshotHash {
1017                    rpc_contact_info: rpc_peer.clone(),
1018                    snapshot_hash,
1019                }
1020            })
1021        })
1022        .collect();
1023
1024    trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1025    peer_snapshot_hashes
1026}
1027
1028/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
1029fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1030    known_snapshot_hashes: &KnownSnapshotHashes,
1031    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1032) {
1033    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1034        known_snapshot_hashes
1035            .get(&peer_snapshot_hash.snapshot_hash.full)
1036            .map(|known_incremental_hashes| {
1037                if let Some(incr) = peer_snapshot_hash.snapshot_hash.incr.as_ref() {
1038                    known_incremental_hashes.contains(incr)
1039                } else {
1040                    // If the peer's full snapshot hashes match, but doesn't have any
1041                    // incremental snapshots, that's fine; keep 'em!
1042                    true
1043                }
1044            })
1045            .unwrap_or(false)
1046    });
1047
1048    trace!(
1049        "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1050    );
1051}
1052
1053/// Retain the peer snapshot hashes with the highest full snapshot slot
1054fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1055    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1056) {
1057    let highest_full_snapshot_hash = peer_snapshot_hashes
1058        .iter()
1059        .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1060        .max_by_key(|(slot, _hash)| *slot);
1061    let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1062        // `max_by_key` will only be `None` IFF the input `peer_snapshot_hashes` is empty.
1063        // In that case there's nothing to do (additionally, without a valid 'max' value, there
1064        // will be nothing to compare against within the `retain()` predicate).
1065        return;
1066    };
1067
1068    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1069        peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1070    });
1071
1072    trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1073}
1074
1075/// Retain the peer snapshot hashes with the highest incremental snapshot slot
1076fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1077    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1078) {
1079    let highest_incremental_snapshot_hash = peer_snapshot_hashes
1080        .iter()
1081        .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1082        .max_by_key(|(slot, _hash)| *slot);
1083
1084    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1085        peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1086    });
1087
1088    trace!(
1089        "retain peer snapshot hashes with highest incremental snapshot slot: \
1090         {peer_snapshot_hashes:?}"
1091    );
1092}
1093
1094/// Check to see if we can use our local snapshots, otherwise download newer ones.
1095#[allow(clippy::too_many_arguments)]
1096fn download_snapshots(
1097    validator_config: &ValidatorConfig,
1098    bootstrap_config: &RpcBootstrapConfig,
1099    use_progress_bar: bool,
1100    maximum_local_snapshot_age: Slot,
1101    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1102    minimal_snapshot_download_speed: f32,
1103    maximum_snapshot_download_abort: u64,
1104    download_abort_count: &mut u64,
1105    snapshot_hash: Option<SnapshotHash>,
1106    rpc_contact_info: &ContactInfo,
1107) -> Result<(), String> {
1108    if snapshot_hash.is_none() {
1109        return Ok(());
1110    }
1111    let SnapshotHash {
1112        full: full_snapshot_hash,
1113        incr: incremental_snapshot_hash,
1114    } = snapshot_hash.unwrap();
1115    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1116    let incremental_snapshot_archives_dir = &validator_config
1117        .snapshot_config
1118        .incremental_snapshot_archives_dir;
1119
1120    // If the local snapshots are new enough, then use 'em; no need to download new snapshots
1121    if should_use_local_snapshot(
1122        full_snapshot_archives_dir,
1123        incremental_snapshot_archives_dir,
1124        maximum_local_snapshot_age,
1125        full_snapshot_hash,
1126        incremental_snapshot_hash,
1127        bootstrap_config.incremental_snapshot_fetch,
1128    ) {
1129        return Ok(());
1130    }
1131
1132    // Check and see if we've already got the full snapshot; if not, download it
1133    if snapshot_paths::get_full_snapshot_archives(full_snapshot_archives_dir)
1134        .into_iter()
1135        .any(|snapshot_archive| {
1136            snapshot_archive.slot() == full_snapshot_hash.0
1137                && snapshot_archive.hash().0 == full_snapshot_hash.1
1138        })
1139    {
1140        info!(
1141            "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1142            full_snapshot_hash.0, full_snapshot_hash.1
1143        );
1144    } else {
1145        download_snapshot(
1146            validator_config,
1147            bootstrap_config,
1148            use_progress_bar,
1149            start_progress,
1150            minimal_snapshot_download_speed,
1151            maximum_snapshot_download_abort,
1152            download_abort_count,
1153            rpc_contact_info,
1154            full_snapshot_hash,
1155            SnapshotArchiveKind::Full,
1156        )?;
1157    }
1158
1159    if bootstrap_config.incremental_snapshot_fetch {
1160        // Check and see if we've already got the incremental snapshot; if not, download it
1161        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1162            if snapshot_paths::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1163                .into_iter()
1164                .any(|snapshot_archive| {
1165                    snapshot_archive.slot() == incremental_snapshot_hash.0
1166                        && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1167                        && snapshot_archive.base_slot() == full_snapshot_hash.0
1168                })
1169            {
1170                info!(
1171                    "Incremental snapshot archive already exists locally. Skipping download. \
1172                     slot: {}, hash: {}",
1173                    incremental_snapshot_hash.0, incremental_snapshot_hash.1
1174                );
1175            } else {
1176                download_snapshot(
1177                    validator_config,
1178                    bootstrap_config,
1179                    use_progress_bar,
1180                    start_progress,
1181                    minimal_snapshot_download_speed,
1182                    maximum_snapshot_download_abort,
1183                    download_abort_count,
1184                    rpc_contact_info,
1185                    incremental_snapshot_hash,
1186                    SnapshotArchiveKind::Incremental(full_snapshot_hash.0),
1187                )?;
1188            }
1189        }
1190    }
1191
1192    Ok(())
1193}
1194
1195/// Download a snapshot
1196#[allow(clippy::too_many_arguments)]
1197fn download_snapshot(
1198    validator_config: &ValidatorConfig,
1199    bootstrap_config: &RpcBootstrapConfig,
1200    use_progress_bar: bool,
1201    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1202    minimal_snapshot_download_speed: f32,
1203    maximum_snapshot_download_abort: u64,
1204    download_abort_count: &mut u64,
1205    rpc_contact_info: &ContactInfo,
1206    desired_snapshot_hash: (Slot, Hash),
1207    snapshot_kind: SnapshotArchiveKind,
1208) -> Result<(), String> {
1209    let maximum_full_snapshot_archives_to_retain = validator_config
1210        .snapshot_config
1211        .maximum_full_snapshot_archives_to_retain;
1212    let maximum_incremental_snapshot_archives_to_retain = validator_config
1213        .snapshot_config
1214        .maximum_incremental_snapshot_archives_to_retain;
1215    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1216    let incremental_snapshot_archives_dir = &validator_config
1217        .snapshot_config
1218        .incremental_snapshot_archives_dir;
1219
1220    *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1221        slot: desired_snapshot_hash.0,
1222        rpc_addr: rpc_contact_info
1223            .rpc()
1224            .ok_or_else(|| String::from("Invalid RPC address"))?,
1225    };
1226    let desired_snapshot_hash = (
1227        desired_snapshot_hash.0,
1228        agave_snapshots::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1229    );
1230    download_snapshot_archive(
1231        &rpc_contact_info
1232            .rpc()
1233            .ok_or_else(|| String::from("Invalid RPC address"))?,
1234        full_snapshot_archives_dir,
1235        incremental_snapshot_archives_dir,
1236        desired_snapshot_hash,
1237        snapshot_kind,
1238        maximum_full_snapshot_archives_to_retain,
1239        maximum_incremental_snapshot_archives_to_retain,
1240        use_progress_bar,
1241        &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1242            debug!("Download progress: {download_progress:?}");
1243            if download_progress.last_throughput < minimal_snapshot_download_speed
1244                && download_progress.notification_count <= 1
1245                && download_progress.percentage_done <= 2_f32
1246                && download_progress.estimated_remaining_time > 60_f32
1247                && *download_abort_count < maximum_snapshot_download_abort
1248            {
1249                if let Some(ref known_validators) = validator_config.known_validators {
1250                    if known_validators.contains(rpc_contact_info.pubkey())
1251                        && known_validators.len() == 1
1252                        && bootstrap_config.only_known_rpc
1253                    {
1254                        warn!(
1255                            "The snapshot download is too slow, throughput: {} < min speed {} \
1256                             bytes/sec, but will NOT abort and try a different node as it is the \
1257                             only known validator and the --only-known-rpc flag is set. Abort \
1258                             count: {}, Progress detail: {:?}",
1259                            download_progress.last_throughput,
1260                            minimal_snapshot_download_speed,
1261                            download_abort_count,
1262                            download_progress,
1263                        );
1264                        return true; // Do not abort download from the one-and-only known validator
1265                    }
1266                }
1267                warn!(
1268                    "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1269                     will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1270                    download_progress.last_throughput,
1271                    minimal_snapshot_download_speed,
1272                    download_abort_count,
1273                    download_progress,
1274                );
1275                *download_abort_count += 1;
1276                false
1277            } else {
1278                true
1279            }
1280        })),
1281    )
1282}
1283
1284/// Check to see if bootstrap should load from its local snapshots or not.  If not, then snapshots
1285/// will be downloaded.
1286fn should_use_local_snapshot(
1287    full_snapshot_archives_dir: &Path,
1288    incremental_snapshot_archives_dir: &Path,
1289    maximum_local_snapshot_age: Slot,
1290    full_snapshot_hash: (Slot, Hash),
1291    incremental_snapshot_hash: Option<(Slot, Hash)>,
1292    incremental_snapshot_fetch: bool,
1293) -> bool {
1294    let cluster_snapshot_slot = incremental_snapshot_hash
1295        .map(|(slot, _)| slot)
1296        .unwrap_or(full_snapshot_hash.0);
1297
1298    match get_highest_local_snapshot_hash(
1299        full_snapshot_archives_dir,
1300        incremental_snapshot_archives_dir,
1301        incremental_snapshot_fetch,
1302    ) {
1303        None => {
1304            info!(
1305                "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1306                 local snapshot."
1307            );
1308            false
1309        }
1310        Some((local_snapshot_slot, _)) => {
1311            if local_snapshot_slot
1312                >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1313            {
1314                info!(
1315                    "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1316                     a snapshot for slot {cluster_snapshot_slot}."
1317                );
1318                true
1319            } else {
1320                info!(
1321                    "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1322                     newer snapshot for slot {cluster_snapshot_slot}."
1323                );
1324                false
1325            }
1326        }
1327    }
1328}
1329
1330/// Get the node's highest snapshot hashes from CRDS
1331fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1332    cluster_info.get_snapshot_hashes_for_node(node).map(
1333        |crds_data::SnapshotHashes {
1334             full, incremental, ..
1335         }| {
1336            let highest_incremental_snapshot_hash = incremental.into_iter().max();
1337            SnapshotHash {
1338                full,
1339                incr: highest_incremental_snapshot_hash,
1340            }
1341        },
1342    )
1343}
1344
1345#[cfg(test)]
1346mod tests {
1347    use super::*;
1348
1349    impl PeerSnapshotHash {
1350        fn new(
1351            rpc_contact_info: ContactInfo,
1352            full_snapshot_hash: (Slot, Hash),
1353            incremental_snapshot_hash: Option<(Slot, Hash)>,
1354        ) -> Self {
1355            Self {
1356                rpc_contact_info,
1357                snapshot_hash: SnapshotHash {
1358                    full: full_snapshot_hash,
1359                    incr: incremental_snapshot_hash,
1360                },
1361            }
1362        }
1363    }
1364
1365    fn default_contact_info_for_tests() -> ContactInfo {
1366        ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
1367    }
1368
1369    #[test]
1370    fn test_build_known_snapshot_hashes() {
1371        agave_logger::setup();
1372        let full_snapshot_hash1 = (400_000, Hash::new_unique());
1373        let full_snapshot_hash2 = (400_000, Hash::new_unique());
1374
1375        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1376        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1377
1378        // simulate a set of known validators with various snapshot hashes
1379        let oracle = {
1380            let mut oracle = HashMap::new();
1381
1382            for (full, incr) in [
1383                // only a full snapshot
1384                (full_snapshot_hash1, None),
1385                // full and incremental snapshots
1386                (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1387                // full and incremental snapshots, with different incremental hash
1388                (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1389                // ...and now with different full hashes
1390                (full_snapshot_hash2, None),
1391                (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1392                (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1393            ] {
1394                // also simulate multiple known validators having the same snapshot hashes
1395                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1396                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1397                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1398            }
1399
1400            // no snapshots at all
1401            oracle.insert(Pubkey::new_unique(), None);
1402            oracle.insert(Pubkey::new_unique(), None);
1403            oracle.insert(Pubkey::new_unique(), None);
1404
1405            oracle
1406        };
1407
1408        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1409
1410        let known_snapshot_hashes =
1411            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1412
1413        // ensure there's only one full snapshot hash, since they all used the same slot and there
1414        // can be only one snapshot hash per slot
1415        let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1416        assert_eq!(known_full_snapshot_hashes.len(), 1);
1417        let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1418
1419        // and for the same reasons, ensure there is only one incremental snapshot hash
1420        let known_incremental_snapshot_hashes =
1421            known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1422        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1423        let known_incremental_snapshot_hash =
1424            known_incremental_snapshot_hashes.iter().next().unwrap();
1425
1426        // The resulting `known_snapshot_hashes` can be different from run-to-run due to how
1427        // `oracle.keys()` returns nodes during iteration.  Because of that, we cannot just assert
1428        // the full and incremental snapshot hashes are `full_snapshot_hash1` and
1429        // `incremental_snapshot_hash1`.  Instead, we assert that the full and incremental
1430        // snapshot hashes are exactly one or the other, since it depends on which nodes are seen
1431        // "first" when building the known snapshot hashes.
1432        assert!(
1433            known_full_snapshot_hash == &full_snapshot_hash1
1434                || known_full_snapshot_hash == &full_snapshot_hash2
1435        );
1436        assert!(
1437            known_incremental_snapshot_hash == &incremental_snapshot_hash1
1438                || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1439        );
1440    }
1441
1442    #[test]
1443    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1444        let known_snapshot_hashes: KnownSnapshotHashes = [
1445            (
1446                (200_000, Hash::new_unique()),
1447                [
1448                    (200_200, Hash::new_unique()),
1449                    (200_400, Hash::new_unique()),
1450                    (200_600, Hash::new_unique()),
1451                    (200_800, Hash::new_unique()),
1452                ]
1453                .iter()
1454                .cloned()
1455                .collect(),
1456            ),
1457            (
1458                (300_000, Hash::new_unique()),
1459                [
1460                    (300_200, Hash::new_unique()),
1461                    (300_400, Hash::new_unique()),
1462                    (300_600, Hash::new_unique()),
1463                ]
1464                .iter()
1465                .cloned()
1466                .collect(),
1467            ),
1468        ]
1469        .iter()
1470        .cloned()
1471        .collect();
1472
1473        let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1474        let known_full_snapshot_hash = known_snapshot_hash.0;
1475        let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1476
1477        let contact_info = default_contact_info_for_tests();
1478        let peer_snapshot_hashes = vec![
1479            // bad full snapshot hash, no incremental snapshot hash
1480            PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1481            // bad everything
1482            PeerSnapshotHash::new(
1483                contact_info.clone(),
1484                (111_000, Hash::default()),
1485                Some((111_111, Hash::default())),
1486            ),
1487            // good full snapshot hash, no incremental snapshot hash
1488            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1489            // bad full snapshot hash, good (not possible) incremental snapshot hash
1490            PeerSnapshotHash::new(
1491                contact_info.clone(),
1492                (111_000, Hash::default()),
1493                Some(*known_incremental_snapshot_hash),
1494            ),
1495            // good full snapshot hash, bad incremental snapshot hash
1496            PeerSnapshotHash::new(
1497                contact_info.clone(),
1498                *known_full_snapshot_hash,
1499                Some((111_111, Hash::default())),
1500            ),
1501            // good everything
1502            PeerSnapshotHash::new(
1503                contact_info.clone(),
1504                *known_full_snapshot_hash,
1505                Some(*known_incremental_snapshot_hash),
1506            ),
1507        ];
1508
1509        let expected = vec![
1510            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1511            PeerSnapshotHash::new(
1512                contact_info,
1513                *known_full_snapshot_hash,
1514                Some(*known_incremental_snapshot_hash),
1515            ),
1516        ];
1517        let mut actual = peer_snapshot_hashes;
1518        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1519            &known_snapshot_hashes,
1520            &mut actual,
1521        );
1522        assert_eq!(expected, actual);
1523    }
1524
1525    #[test]
1526    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1527        let contact_info = default_contact_info_for_tests();
1528        let peer_snapshot_hashes = vec![
1529            // old
1530            PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1531            PeerSnapshotHash::new(
1532                contact_info.clone(),
1533                (100_000, Hash::default()),
1534                Some((100_100, Hash::default())),
1535            ),
1536            PeerSnapshotHash::new(
1537                contact_info.clone(),
1538                (100_000, Hash::default()),
1539                Some((100_200, Hash::default())),
1540            ),
1541            PeerSnapshotHash::new(
1542                contact_info.clone(),
1543                (100_000, Hash::default()),
1544                Some((100_300, Hash::default())),
1545            ),
1546            // new
1547            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1548            PeerSnapshotHash::new(
1549                contact_info.clone(),
1550                (200_000, Hash::default()),
1551                Some((200_100, Hash::default())),
1552            ),
1553            PeerSnapshotHash::new(
1554                contact_info.clone(),
1555                (200_000, Hash::default()),
1556                Some((200_200, Hash::default())),
1557            ),
1558            PeerSnapshotHash::new(
1559                contact_info.clone(),
1560                (200_000, Hash::default()),
1561                Some((200_300, Hash::default())),
1562            ),
1563        ];
1564
1565        let expected = vec![
1566            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1567            PeerSnapshotHash::new(
1568                contact_info.clone(),
1569                (200_000, Hash::default()),
1570                Some((200_100, Hash::default())),
1571            ),
1572            PeerSnapshotHash::new(
1573                contact_info.clone(),
1574                (200_000, Hash::default()),
1575                Some((200_200, Hash::default())),
1576            ),
1577            PeerSnapshotHash::new(
1578                contact_info,
1579                (200_000, Hash::default()),
1580                Some((200_300, Hash::default())),
1581            ),
1582        ];
1583        let mut actual = peer_snapshot_hashes;
1584        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1585        assert_eq!(expected, actual);
1586    }
1587
1588    #[test]
1589    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1590        let contact_info = default_contact_info_for_tests();
1591        let peer_snapshot_hashes = vec![
1592            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1593            PeerSnapshotHash::new(
1594                contact_info.clone(),
1595                (200_000, Hash::default()),
1596                Some((200_100, Hash::default())),
1597            ),
1598            PeerSnapshotHash::new(
1599                contact_info.clone(),
1600                (200_000, Hash::default()),
1601                Some((200_200, Hash::default())),
1602            ),
1603            PeerSnapshotHash::new(
1604                contact_info.clone(),
1605                (200_000, Hash::default()),
1606                Some((200_300, Hash::default())),
1607            ),
1608            PeerSnapshotHash::new(
1609                contact_info.clone(),
1610                (200_000, Hash::default()),
1611                Some((200_010, Hash::default())),
1612            ),
1613            PeerSnapshotHash::new(
1614                contact_info.clone(),
1615                (200_000, Hash::default()),
1616                Some((200_020, Hash::default())),
1617            ),
1618            PeerSnapshotHash::new(
1619                contact_info.clone(),
1620                (200_000, Hash::default()),
1621                Some((200_030, Hash::default())),
1622            ),
1623        ];
1624
1625        let expected = vec![PeerSnapshotHash::new(
1626            contact_info,
1627            (200_000, Hash::default()),
1628            Some((200_300, Hash::default())),
1629        )];
1630        let mut actual = peer_snapshot_hashes;
1631        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1632        assert_eq!(expected, actual);
1633    }
1634
1635    /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
1636    /// there are *zero* peers with incremental snapshots.
1637    #[test]
1638    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1639        let contact_info = default_contact_info_for_tests();
1640        let peer_snapshot_hashes = vec![
1641            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1642            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1643            PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1644        ];
1645
1646        let expected = peer_snapshot_hashes.clone();
1647        let mut actual = peer_snapshot_hashes;
1648        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1649        assert_eq!(expected, actual);
1650    }
1651
1652    /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
1653    /// peer snapshot hashes input is empty.
1654    #[test]
1655    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1656        {
1657            let mut actual = vec![];
1658            let expected = actual.clone();
1659            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1660            assert_eq!(expected, actual);
1661        }
1662        {
1663            let mut actual = vec![];
1664            let expected = actual.clone();
1665            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1666            assert_eq!(expected, actual);
1667        }
1668    }
1669}