Skip to main content

agave_validator/
bootstrap.rs

1use {
2    agave_snapshots::{
3        paths as snapshot_paths, snapshot_archive_info::SnapshotArchiveInfoGetter as _,
4        SnapshotKind,
5    },
6    itertools::Itertools,
7    log::*,
8    rand::{seq::SliceRandom, thread_rng, Rng},
9    rayon::prelude::*,
10    solana_account::ReadableAccount,
11    solana_clock::Slot,
12    solana_commitment_config::CommitmentConfig,
13    solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
14    solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
15    solana_genesis_utils::download_then_check_genesis_hash,
16    solana_gossip::{
17        cluster_info::ClusterInfo,
18        contact_info::{ContactInfo, Protocol},
19        crds_data,
20        gossip_service::GossipService,
21        node::Node,
22    },
23    solana_hash::Hash,
24    solana_keypair::Keypair,
25    solana_metrics::datapoint_info,
26    solana_pubkey::Pubkey,
27    solana_rpc_client::rpc_client::RpcClient,
28    solana_signer::Signer,
29    solana_streamer::socket::SocketAddrSpace,
30    solana_vote_program::vote_state::VoteStateV4,
31    std::{
32        collections::{hash_map::RandomState, HashMap, HashSet},
33        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
34        path::Path,
35        process::exit,
36        sync::{
37            atomic::{AtomicBool, Ordering},
38            Arc, RwLock,
39        },
40        time::{Duration, Instant},
41    },
42    thiserror::Error,
43};
44
/// Upper bound on how long snapshot discovery waits for snapshot hashes from
/// _every_ known validator.  Once this has elapsed, snapshot hashes from _any_
/// known validator are accepted instead.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::new(60, 0);
/// When every RPC candidate is blacklisted and no viable peer has been seen for
/// this long, the blacklist is cleared so previously failed peers get retried.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::new(60, 0);
/// Give up if no peer offers a suitable snapshot for this long after the first
/// fruitless search round.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::new(180, 0);
/// Give up if no RPC peer at all has been found for this long.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::new(300, 0);

/// Maximum number of RPC peer candidates handed out for vetting per search round.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// Connect timeout used when probing an RPC peer's TCP port to measure latency.
pub const PING_TIMEOUT: Duration = Duration::new(2, 0);
61
/// Settings that steer how `rpc_bootstrap` fetches genesis and snapshots from
/// RPC peers discovered through gossip.
#[derive(Clone, Debug, PartialEq)]
pub struct RpcBootstrapConfig {
    /// Forwarded to the genesis download/check step; when combined with
    /// `no_snapshot_fetch`, RPC bootstrap is skipped entirely.
    pub no_genesis_fetch: bool,
    /// Do not fetch snapshots; any reachable RPC peer is then acceptable and no
    /// snapshot hash is selected.
    pub no_snapshot_fetch: bool,
    /// Restrict RPC peer candidates to members of the known-validator set.
    pub only_known_rpc: bool,
    /// Size limit forwarded to the genesis archive download/unpack step.
    pub max_genesis_archive_unpacked_size: u64,
    /// RPC URL used to validate the vote account after downloading; `None`
    /// disables the check.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots are considered when choosing what to fetch.
    pub incremental_snapshot_fetch: bool,
}
71
72fn verify_reachable_ports(
73    node: &Node,
74    cluster_entrypoint: &ContactInfo,
75    validator_config: &ValidatorConfig,
76    socket_addr_space: &SocketAddrSpace,
77) -> bool {
78    let verify_address = |addr: &Option<SocketAddr>| -> bool {
79        addr.as_ref()
80            .map(|addr| socket_addr_space.check(addr))
81            .unwrap_or_default()
82    };
83
84    let mut udp_sockets = vec![&node.sockets.repair];
85    udp_sockets.extend(node.sockets.gossip.iter());
86
87    if verify_address(&node.info.serve_repair(Protocol::UDP)) {
88        udp_sockets.push(&node.sockets.serve_repair);
89    }
90    if verify_address(&node.info.tpu(Protocol::UDP)) {
91        udp_sockets.extend(node.sockets.tpu.iter());
92        udp_sockets.extend(&node.sockets.tpu_quic);
93    }
94    if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
95        udp_sockets.extend(node.sockets.tpu_forwards.iter());
96        udp_sockets.extend(&node.sockets.tpu_forwards_quic);
97    }
98    if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
99        udp_sockets.extend(node.sockets.tpu_vote.iter());
100    }
101    if verify_address(&node.info.tvu(Protocol::UDP)) {
102        udp_sockets.extend(node.sockets.tvu.iter());
103        udp_sockets.extend(node.sockets.broadcast.iter());
104        udp_sockets.extend(node.sockets.retransmit_sockets.iter());
105    }
106    if !solana_net_utils::verify_all_reachable_udp(
107        &cluster_entrypoint.gossip().unwrap(),
108        &udp_sockets,
109    ) {
110        return false;
111    }
112
113    let mut tcp_listeners = vec![];
114    if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
115        for (purpose, bind_addr, public_addr) in &[
116            ("RPC", rpc_addr, node.info.rpc()),
117            ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
118        ] {
119            if verify_address(public_addr) {
120                tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
121                    error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
122                    exit(1);
123                }));
124            }
125        }
126    }
127
128    if let Some(ip_echo) = &node.sockets.ip_echo {
129        let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
130        tcp_listeners.push(ip_echo);
131    }
132
133    solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
134}
135
136fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
137    if let Some(known_validators) = known_validators {
138        known_validators.contains(id)
139    } else {
140        false
141    }
142}
143
144#[allow(clippy::too_many_arguments)]
145fn start_gossip_node(
146    identity_keypair: Arc<Keypair>,
147    cluster_entrypoints: &[ContactInfo],
148    known_validators: Option<HashSet<Pubkey>>,
149    ledger_path: &Path,
150    gossip_addr: &SocketAddr,
151    gossip_sockets: Arc<[UdpSocket]>,
152    expected_shred_version: u16,
153    gossip_validators: Option<HashSet<Pubkey>>,
154    should_check_duplicate_instance: bool,
155    socket_addr_space: SocketAddrSpace,
156) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
157    let contact_info = ClusterInfo::gossip_contact_info(
158        identity_keypair.pubkey(),
159        *gossip_addr,
160        expected_shred_version,
161    );
162    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
163    if let Some(known_validators) = known_validators {
164        cluster_info
165            .set_trim_keep_pubkeys(known_validators)
166            .expect("set_trim_keep_pubkeys should succeed as ClusterInfo was just created");
167    }
168    cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
169    cluster_info.restore_contact_info(ledger_path, 0);
170    let cluster_info = Arc::new(cluster_info);
171
172    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
173    let gossip_service = GossipService::new(
174        &cluster_info,
175        None,
176        gossip_sockets,
177        gossip_validators,
178        should_check_duplicate_instance,
179        None,
180        gossip_exit_flag.clone(),
181    );
182    (cluster_info, gossip_exit_flag, gossip_service)
183}
184
185fn get_rpc_peers(
186    cluster_info: &ClusterInfo,
187    validator_config: &ValidatorConfig,
188    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
189    blacklist_timeout: &Instant,
190    retry_reason: &mut Option<String>,
191    bootstrap_config: &RpcBootstrapConfig,
192) -> Vec<ContactInfo> {
193    let shred_version = validator_config
194        .expected_shred_version
195        .unwrap_or_else(|| cluster_info.my_shred_version());
196
197    info!(
198        "Searching for an RPC service with shred version {shred_version}{}...",
199        retry_reason
200            .as_ref()
201            .map(|s| format!(" (Retrying: {s})"))
202            .unwrap_or_default()
203    );
204
205    let mut rpc_peers = cluster_info.rpc_peers();
206    if bootstrap_config.only_known_rpc {
207        rpc_peers.retain(|rpc_peer| {
208            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
209        });
210    }
211
212    let rpc_peers_total = rpc_peers.len();
213
214    // Filter out blacklisted nodes
215    let rpc_peers: Vec<_> = rpc_peers
216        .into_iter()
217        .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
218        .collect();
219    let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
220    let rpc_known_peers = rpc_peers
221        .iter()
222        .filter(|rpc_peer| {
223            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
224        })
225        .count();
226
227    info!(
228        "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
229         {rpc_peers_blacklisted} blacklisted"
230    );
231
232    if rpc_peers_blacklisted == rpc_peers_total {
233        *retry_reason = if !blacklisted_rpc_nodes.is_empty()
234            && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
235        {
236            // All nodes are blacklisted and no additional nodes recently discovered.
237            // Remove all nodes from the blacklist and try them again.
238            blacklisted_rpc_nodes.clear();
239            Some("Blacklist timeout expired".to_owned())
240        } else {
241            Some("Wait for known rpc peers".to_owned())
242        };
243        return vec![];
244    }
245    rpc_peers
246}
247
248fn check_vote_account(
249    rpc_client: &RpcClient,
250    identity_pubkey: &Pubkey,
251    vote_account_address: &Pubkey,
252    authorized_voter_pubkeys: &[Pubkey],
253) -> Result<(), String> {
254    let vote_account = rpc_client
255        .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
256        .map_err(|err| format!("failed to fetch vote account: {err}"))?
257        .value
258        .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
259
260    if vote_account.owner != solana_vote_program::id() {
261        return Err(format!(
262            "not a vote account (owned by {}): {}",
263            vote_account.owner, vote_account_address
264        ));
265    }
266
267    let identity_account = rpc_client
268        .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
269        .map_err(|err| format!("failed to fetch identity account: {err}"))?
270        .value
271        .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
272
273    let vote_state = VoteStateV4::deserialize(vote_account.data(), vote_account_address).ok();
274    if let Some(vote_state) = vote_state {
275        if vote_state.authorized_voters.is_empty() {
276            return Err("Vote account not yet initialized".to_string());
277        }
278
279        if vote_state.node_pubkey != *identity_pubkey {
280            return Err(format!(
281                "vote account's identity ({}) does not match the validator's identity {}).",
282                vote_state.node_pubkey, identity_pubkey
283            ));
284        }
285
286        for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters.iter() {
287            if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
288                return Err(format!(
289                    "authorized voter {vote_account_authorized_voter_pubkey} not available"
290                ));
291            }
292        }
293    } else {
294        return Err(format!(
295            "invalid vote account data for {vote_account_address}"
296        ));
297    }
298
299    // Maybe we can calculate minimum voting fee; rather than 1 lamport
300    if identity_account.lamports <= 1 {
301        return Err(format!(
302            "underfunded identity account ({}): only {} lamports available",
303            identity_pubkey, identity_account.lamports
304        ));
305    }
306
307    Ok(())
308}
309
310#[derive(Error, Debug)]
311pub enum GetRpcNodeError {
312    #[error("Unable to find any RPC peers")]
313    NoRpcPeersFound,
314
315    #[error("Giving up, did not get newer snapshots from the cluster")]
316    NoNewerSnapshots,
317}
318
319/// Struct to wrap the return value from get_rpc_nodes().  The `rpc_contact_info` is the peer to
320/// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
321/// snapshots to download.
322#[derive(Debug)]
323struct GetRpcNodeResult {
324    rpc_contact_info: ContactInfo,
325    snapshot_hash: Option<SnapshotHash>,
326}
327
328/// Struct to wrap the peers & snapshot hashes together.
329#[derive(Debug, PartialEq, Eq, Clone)]
330struct PeerSnapshotHash {
331    rpc_contact_info: ContactInfo,
332    snapshot_hash: SnapshotHash,
333}
334
335/// A snapshot hash.  In this context (bootstrap *with* incremental snapshots), a snapshot hash
336/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
337#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
338pub struct SnapshotHash {
339    full: (Slot, Hash),
340    incr: Option<(Slot, Hash)>,
341}
342
343pub fn fail_rpc_node(
344    err: String,
345    known_validators: &Option<HashSet<Pubkey, RandomState>>,
346    rpc_id: &Pubkey,
347    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
348) {
349    warn!("{err}");
350    if let Some(ref known_validators) = known_validators {
351        if known_validators.contains(rpc_id) {
352            return;
353        }
354    }
355
356    info!("Excluding {rpc_id} as a future RPC candidate");
357    blacklisted_rpc_nodes.insert(*rpc_id);
358}
359
360fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
361    let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
362    cluster_info.save_contact_info();
363    gossip_exit_flag.store(true, Ordering::Relaxed);
364    gossip_service.join().unwrap();
365}
366
367#[allow(clippy::too_many_arguments)]
368pub fn attempt_download_genesis_and_snapshot(
369    rpc_contact_info: &ContactInfo,
370    ledger_path: &Path,
371    validator_config: &mut ValidatorConfig,
372    bootstrap_config: &RpcBootstrapConfig,
373    use_progress_bar: bool,
374    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
375    rpc_client: &RpcClient,
376    maximum_local_snapshot_age: Slot,
377    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
378    minimal_snapshot_download_speed: f32,
379    maximum_snapshot_download_abort: u64,
380    download_abort_count: &mut u64,
381    snapshot_hash: Option<SnapshotHash>,
382    identity_keypair: &Arc<Keypair>,
383    vote_account: &Pubkey,
384    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
385) -> Result<(), String> {
386    download_then_check_genesis_hash(
387        &rpc_contact_info
388            .rpc()
389            .ok_or_else(|| String::from("Invalid RPC address"))?,
390        ledger_path,
391        &mut validator_config.expected_genesis_hash,
392        bootstrap_config.max_genesis_archive_unpacked_size,
393        bootstrap_config.no_genesis_fetch,
394        use_progress_bar,
395        rpc_client,
396    )?;
397
398    if let Some(gossip) = gossip.take() {
399        shutdown_gossip_service(gossip);
400    }
401
402    let rpc_client_slot = rpc_client
403        .get_slot_with_commitment(CommitmentConfig::finalized())
404        .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
405    info!("RPC node root slot: {rpc_client_slot}");
406
407    download_snapshots(
408        validator_config,
409        bootstrap_config,
410        use_progress_bar,
411        maximum_local_snapshot_age,
412        start_progress,
413        minimal_snapshot_download_speed,
414        maximum_snapshot_download_abort,
415        download_abort_count,
416        snapshot_hash,
417        rpc_contact_info,
418    )?;
419
420    if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
421        let rpc_client = RpcClient::new(url);
422        check_vote_account(
423            &rpc_client,
424            &identity_keypair.pubkey(),
425            vote_account,
426            &authorized_voter_keypairs
427                .read()
428                .unwrap()
429                .iter()
430                .map(|k| k.pubkey())
431                .collect::<Vec<_>>(),
432        )
433        .unwrap_or_else(|err| {
434            // Consider failures here to be more likely due to user error (eg,
435            // incorrect `agave-validator` command-line arguments) rather than the
436            // RPC node failing.
437            //
438            // Power users can always use the `--no-check-vote-account` option to
439            // bypass this check entirely
440            error!("{err}");
441            exit(1);
442        });
443    }
444    Ok(())
445}
446
447/// simple ping helper function which returns the time to connect
448fn ping(addr: &SocketAddr) -> Option<Duration> {
449    let start = Instant::now();
450    match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
451        Ok(_) => Some(start.elapsed()),
452        Err(_) => None,
453    }
454}
455
456// Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
457// used for downloading latest snapshots and/or the genesis block. Guaranteed to
458// find at least one viable node or terminate the process.
459fn get_vetted_rpc_nodes(
460    vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
461    cluster_info: &Arc<ClusterInfo>,
462    validator_config: &ValidatorConfig,
463    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
464    bootstrap_config: &RpcBootstrapConfig,
465) {
466    while vetted_rpc_nodes.is_empty() {
467        let rpc_node_details = match get_rpc_nodes(
468            cluster_info,
469            validator_config,
470            blacklisted_rpc_nodes,
471            bootstrap_config,
472        ) {
473            Ok(rpc_node_details) => rpc_node_details,
474            Err(err) => {
475                error!(
476                    "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
477                     `--no-port-check`, or adjusting `--known-validator ...` arguments as \
478                     applicable"
479                );
480                exit(1);
481            }
482        };
483
484        let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
485        vetted_rpc_nodes.extend(
486            rpc_node_details
487                .into_par_iter()
488                .filter_map(|rpc_node_details| {
489                    let GetRpcNodeResult {
490                        rpc_contact_info,
491                        snapshot_hash,
492                    } = rpc_node_details;
493
494                    info!(
495                        "Using RPC service from node {}: {:?}",
496                        rpc_contact_info.pubkey(),
497                        rpc_contact_info.rpc()
498                    );
499
500                    let rpc_addr = rpc_contact_info.rpc()?;
501                    let ping_time = ping(&rpc_addr);
502
503                    let rpc_client =
504                        RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
505
506                    Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
507                })
508                .filter(
509                    |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
510                        .get_version()
511                    {
512                        Ok(rpc_version) => {
513                            if let Some(ping_time) = ping_time {
514                                info!(
515                                    "RPC node version: {} Ping: {}ms",
516                                    rpc_version.solana_core,
517                                    ping_time.as_millis()
518                                );
519                                true
520                            } else {
521                                fail_rpc_node(
522                                    "Failed to ping RPC".to_string(),
523                                    &validator_config.known_validators,
524                                    rpc_contact_info.pubkey(),
525                                    &mut newly_blacklisted_rpc_nodes.write().unwrap(),
526                                );
527                                false
528                            }
529                        }
530                        Err(err) => {
531                            fail_rpc_node(
532                                format!("Failed to get RPC node version: {err}"),
533                                &validator_config.known_validators,
534                                rpc_contact_info.pubkey(),
535                                &mut newly_blacklisted_rpc_nodes.write().unwrap(),
536                            );
537                            false
538                        }
539                    },
540                )
541                .collect::<Vec<(
542                    ContactInfo,
543                    Option<SnapshotHash>,
544                    RpcClient,
545                    Option<Duration>,
546                )>>()
547                .into_iter()
548                .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
549                .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
550                    (rpc_contact_info, snapshot_hash, rpc_client)
551                })
552                .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
553        );
554        blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
555    }
556}
557
558#[allow(clippy::too_many_arguments)]
559pub fn rpc_bootstrap(
560    node: &Node,
561    identity_keypair: &Arc<Keypair>,
562    ledger_path: &Path,
563    vote_account: &Pubkey,
564    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
565    cluster_entrypoints: &[ContactInfo],
566    validator_config: &mut ValidatorConfig,
567    bootstrap_config: RpcBootstrapConfig,
568    do_port_check: bool,
569    use_progress_bar: bool,
570    maximum_local_snapshot_age: Slot,
571    should_check_duplicate_instance: bool,
572    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
573    minimal_snapshot_download_speed: f32,
574    maximum_snapshot_download_abort: u64,
575    socket_addr_space: SocketAddrSpace,
576) {
577    if do_port_check {
578        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
579        order.shuffle(&mut thread_rng());
580        if order.into_iter().all(|i| {
581            !verify_reachable_ports(
582                node,
583                &cluster_entrypoints[i],
584                validator_config,
585                &socket_addr_space,
586            )
587        }) {
588            exit(1);
589        }
590    }
591
592    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
593        return;
594    }
595
596    let total_snapshot_download_time = Instant::now();
597    let mut get_rpc_nodes_time = Duration::new(0, 0);
598    let mut snapshot_download_time = Duration::new(0, 0);
599    let mut blacklisted_rpc_nodes = HashSet::new();
600    let mut gossip = None;
601    let mut vetted_rpc_nodes = vec![];
602    let mut download_abort_count = 0;
603    loop {
604        if gossip.is_none() {
605            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
606
607            gossip = Some(start_gossip_node(
608                identity_keypair.clone(),
609                cluster_entrypoints,
610                validator_config.known_validators.clone(),
611                ledger_path,
612                &node
613                    .info
614                    .gossip()
615                    .expect("Operator must spin up node with valid gossip address"),
616                node.sockets.gossip.clone(),
617                validator_config
618                    .expected_shred_version
619                    .expect("expected_shred_version should not be None"),
620                validator_config.gossip_validators.clone(),
621                should_check_duplicate_instance,
622                socket_addr_space,
623            ));
624        }
625
626        let get_rpc_nodes_start = Instant::now();
627        get_vetted_rpc_nodes(
628            &mut vetted_rpc_nodes,
629            &gossip.as_ref().unwrap().0,
630            validator_config,
631            &mut blacklisted_rpc_nodes,
632            &bootstrap_config,
633        );
634        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
635        get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
636
637        let snapshot_download_start = Instant::now();
638        let download_result = attempt_download_genesis_and_snapshot(
639            &rpc_contact_info,
640            ledger_path,
641            validator_config,
642            &bootstrap_config,
643            use_progress_bar,
644            &mut gossip,
645            &rpc_client,
646            maximum_local_snapshot_age,
647            start_progress,
648            minimal_snapshot_download_speed,
649            maximum_snapshot_download_abort,
650            &mut download_abort_count,
651            snapshot_hash,
652            identity_keypair,
653            vote_account,
654            authorized_voter_keypairs.clone(),
655        );
656        snapshot_download_time += snapshot_download_start.elapsed();
657        match download_result {
658            Ok(()) => break,
659            Err(err) => {
660                fail_rpc_node(
661                    err,
662                    &validator_config.known_validators,
663                    rpc_contact_info.pubkey(),
664                    &mut blacklisted_rpc_nodes,
665                );
666            }
667        }
668    }
669
670    if let Some(gossip) = gossip.take() {
671        shutdown_gossip_service(gossip);
672    }
673
674    datapoint_info!(
675        "bootstrap-snapshot-download",
676        (
677            "total_time_secs",
678            total_snapshot_download_time.elapsed().as_secs(),
679            i64
680        ),
681        ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
682        (
683            "snapshot_download_time_secs",
684            snapshot_download_time.as_secs(),
685            i64
686        ),
687        ("download_abort_count", download_abort_count, i64),
688        ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
689    );
690}
691
692/// Get RPC peer node candidates to download from.
693///
694/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
695fn get_rpc_nodes(
696    cluster_info: &ClusterInfo,
697    validator_config: &ValidatorConfig,
698    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
699    bootstrap_config: &RpcBootstrapConfig,
700) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
701    let mut blacklist_timeout = Instant::now();
702    let mut get_rpc_peers_timout = Instant::now();
703    let mut newer_cluster_snapshot_timeout = None;
704    let mut retry_reason = None;
705    loop {
706        // Give gossip some time to populate and not spin on grabbing the crds lock
707        std::thread::sleep(Duration::from_secs(1));
708        info!("\n{}", cluster_info.rpc_info_trace());
709
710        let rpc_peers = get_rpc_peers(
711            cluster_info,
712            validator_config,
713            blacklisted_rpc_nodes,
714            &blacklist_timeout,
715            &mut retry_reason,
716            bootstrap_config,
717        );
718        if rpc_peers.is_empty() {
719            if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
720                return Err(GetRpcNodeError::NoRpcPeersFound);
721            }
722            continue;
723        }
724
725        // Reset timeouts if we found any viable RPC peers.
726        blacklist_timeout = Instant::now();
727        get_rpc_peers_timout = Instant::now();
728        if bootstrap_config.no_snapshot_fetch {
729            let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
730            return Ok(vec![GetRpcNodeResult {
731                rpc_contact_info: random_peer.clone(),
732                snapshot_hash: None,
733            }]);
734        }
735
736        let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
737            .as_ref()
738            .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
739            .unwrap_or(true)
740        {
741            KnownValidatorsToWaitFor::All
742        } else {
743            KnownValidatorsToWaitFor::Any
744        };
745        let peer_snapshot_hashes = get_peer_snapshot_hashes(
746            cluster_info,
747            &rpc_peers,
748            validator_config.known_validators.as_ref(),
749            known_validators_to_wait_for,
750            bootstrap_config.incremental_snapshot_fetch,
751        );
752        if peer_snapshot_hashes.is_empty() {
753            match newer_cluster_snapshot_timeout {
754                None => newer_cluster_snapshot_timeout = Some(Instant::now()),
755                Some(newer_cluster_snapshot_timeout) => {
756                    if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
757                        return Err(GetRpcNodeError::NoNewerSnapshots);
758                    }
759                }
760            }
761            retry_reason = Some("No snapshots available".to_owned());
762            continue;
763        } else {
764            let rpc_peers = peer_snapshot_hashes
765                .iter()
766                .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
767                .collect::<Vec<_>>();
768            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
769            info!(
770                "Highest available snapshot slot is {}, available from {} node{}: {:?}",
771                final_snapshot_hash
772                    .incr
773                    .map(|(slot, _hash)| slot)
774                    .unwrap_or(final_snapshot_hash.full.0),
775                rpc_peers.len(),
776                if rpc_peers.len() > 1 { "s" } else { "" },
777                rpc_peers,
778            );
779            let rpc_node_results = peer_snapshot_hashes
780                .iter()
781                .map(|peer_snapshot_hash| GetRpcNodeResult {
782                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
783                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
784                })
785                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
786                .collect();
787            return Ok(rpc_node_results);
788        }
789    }
790}
791
792/// Get the Slot and Hash of the local snapshot with the highest slot.  Can be either a full
793/// snapshot or an incremental snapshot.
794fn get_highest_local_snapshot_hash(
795    full_snapshot_archives_dir: impl AsRef<Path>,
796    incremental_snapshot_archives_dir: impl AsRef<Path>,
797    incremental_snapshot_fetch: bool,
798) -> Option<(Slot, Hash)> {
799    snapshot_paths::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
800        .and_then(|full_snapshot_info| {
801            if incremental_snapshot_fetch {
802                snapshot_paths::get_highest_incremental_snapshot_archive_info(
803                    incremental_snapshot_archives_dir,
804                    full_snapshot_info.slot(),
805                )
806                .map(|incremental_snapshot_info| {
807                    (
808                        incremental_snapshot_info.slot(),
809                        *incremental_snapshot_info.hash(),
810                    )
811                })
812            } else {
813                None
814            }
815            .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
816        })
817        .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
818}
819
820/// Get peer snapshot hashes
821///
822/// The result is a vector of peers with snapshot hashes that:
823/// 1. match a snapshot hash from the known validators
824/// 2. have the highest incremental snapshot slot
825/// 3. have the highest full snapshot slot of (2)
826fn get_peer_snapshot_hashes(
827    cluster_info: &ClusterInfo,
828    rpc_peers: &[ContactInfo],
829    known_validators: Option<&HashSet<Pubkey>>,
830    known_validators_to_wait_for: KnownValidatorsToWaitFor,
831    incremental_snapshot_fetch: bool,
832) -> Vec<PeerSnapshotHash> {
833    let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
834    if let Some(known_validators) = known_validators {
835        let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
836            cluster_info,
837            known_validators,
838            known_validators_to_wait_for,
839        );
840        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
841            &known_snapshot_hashes,
842            &mut peer_snapshot_hashes,
843        );
844    }
845    if incremental_snapshot_fetch {
846        // Only filter by highest incremental snapshot slot if we're actually going to download an
847        // incremental snapshot.  Otherwise this could remove higher full snapshot slots from
848        // being selected.  For example, if there are two peer snapshot hashes:
849        // (A) full snapshot slot: 100, incremental snapshot slot: 160
850        // (B) full snapshot slot: 150, incremental snapshot slot: None
851        // Then (A) has the highest overall snapshot slot.  But if we're not downloading and
852        // incremental snapshot, (B) should be selected since it's full snapshot of 150 is highest.
853        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
854            &mut peer_snapshot_hashes,
855        );
856    }
857    retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
858
859    peer_snapshot_hashes
860}
861
/// Map full snapshot hashes to a set of incremental snapshot hashes.  Each full snapshot hash
/// is treated as the base for its set of incremental snapshot hashes.
///
/// Both the keys and the set elements are `(slot, hash)` pairs.  A key may map to an empty set
/// when only the full snapshot hash is known.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
865
866/// Get the snapshot hashes from known validators.
867///
868/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
869/// snapshot hashes.  This map will be used as the "known snapshot hashes"; when peers are
870/// queried for their individual snapshot hashes, their results will be checked against this
871/// map to verify correctness.
872///
873/// NOTE: Only a single snashot hash is allowed per slot.  If somehow two known validators have
874/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
875/// This applies to both full and incremental snapshot hashes.
876fn get_snapshot_hashes_from_known_validators(
877    cluster_info: &ClusterInfo,
878    known_validators: &HashSet<Pubkey>,
879    known_validators_to_wait_for: KnownValidatorsToWaitFor,
880) -> KnownSnapshotHashes {
881    // Get the snapshot hashes for a node from CRDS
882    let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
883
884    if !do_known_validators_have_all_snapshot_hashes(
885        known_validators,
886        known_validators_to_wait_for,
887        get_snapshot_hashes_for_node,
888    ) {
889        debug!(
890            "Snapshot hashes have not been discovered from known validators. This likely means \
891             the gossip tables are not fully populated. We will sleep and retry..."
892        );
893        return KnownSnapshotHashes::default();
894    }
895
896    build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
897}
898
899/// Check if we can discover snapshot hashes for the known validators.
900///
901/// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
902/// process will download snapshots.
903///
904/// This function will return false if we do not yet have snapshot hashes from known validators;
905/// and true otherwise.  Either require snapshot hashes from *all* or *any* of the known validators
906/// based on the `KnownValidatorsToWaitFor` parameter.
907fn do_known_validators_have_all_snapshot_hashes<'a>(
908    known_validators: impl IntoIterator<Item = &'a Pubkey>,
909    known_validators_to_wait_for: KnownValidatorsToWaitFor,
910    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
911) -> bool {
912    let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
913
914    match known_validators_to_wait_for {
915        KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
916        KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
917    }
918}
919
/// How many of the known validators must have published snapshot hashes before they are
/// trusted: *all* of them or *any* of them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum KnownValidatorsToWaitFor {
    /// Every known validator must have snapshot hashes.
    All,
    /// At least one known validator must have snapshot hashes.
    Any,
}
927
928/// Build the known snapshot hashes from a set of nodes.
929///
930/// The `get_snapshot_hashes_for_node` parameter is a function that map a pubkey to its snapshot
931/// hashes.  This parameter exist to provide a way to test the inner algorithm without needing
932/// runtime information such as the ClusterInfo or ValidatorConfig.
933fn build_known_snapshot_hashes<'a>(
934    nodes: impl IntoIterator<Item = &'a Pubkey>,
935    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
936) -> KnownSnapshotHashes {
937    let mut known_snapshot_hashes = KnownSnapshotHashes::new();
938
939    /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
940    /// but *different* hash as the needle.
941    fn is_any_same_slot_and_different_hash<'a>(
942        needle: &(Slot, Hash),
943        haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
944    ) -> bool {
945        haystack
946            .into_iter()
947            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
948    }
949
950    'to_next_node: for node in nodes {
951        let Some(SnapshotHash {
952            full: full_snapshot_hash,
953            incr: incremental_snapshot_hash,
954        }) = get_snapshot_hashes_for_node(node)
955        else {
956            continue 'to_next_node;
957        };
958
959        // Do not add this snapshot hash if there's already a full snapshot hash with the
960        // same slot but with a _different_ hash.
961        // NOTE: Nodes should not produce snapshots at the same slot with _different_
962        // hashes.  So if it happens, keep the first and ignore the rest.
963        if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
964            warn!(
965                "Ignoring all snapshot hashes from node {node} since we've seen a different full \
966                 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
967            );
968            debug!(
969                "known full snapshot hashes: {:#?}",
970                known_snapshot_hashes.keys(),
971            );
972            continue 'to_next_node;
973        }
974
975        // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
976        // doesn't already exist.  This is to ensure we don't overwrite existing
977        // incremental snapshot hashes that may be present for this full snapshot hash.
978        let known_incremental_snapshot_hashes =
979            known_snapshot_hashes.entry(full_snapshot_hash).or_default();
980
981        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
982            // Do not add this snapshot hash if there's already an incremental snapshot
983            // hash with the same slot, but with a _different_ hash.
984            // NOTE: Nodes should not produce snapshots at the same slot with _different_
985            // hashes.  So if it happens, keep the first and ignore the rest.
986            if is_any_same_slot_and_different_hash(
987                &incremental_snapshot_hash,
988                known_incremental_snapshot_hashes.iter(),
989            ) {
990                warn!(
991                    "Ignoring incremental snapshot hash from node {node} since we've seen a \
992                     different incremental snapshot hash with this slot. full snapshot hash: \
993                     {full_snapshot_hash:?}, incremental snapshot hash: \
994                     {incremental_snapshot_hash:?}"
995                );
996                debug!(
997                    "known incremental snapshot hashes based on this slot: {:#?}",
998                    known_incremental_snapshot_hashes.iter(),
999                );
1000                continue 'to_next_node;
1001            }
1002
1003            known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
1004        };
1005    }
1006
1007    trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1008    known_snapshot_hashes
1009}
1010
1011/// Get snapshot hashes from all eligible peers.
1012///
1013/// This fn will get only one snapshot hash per peer (the one with the highest slot).
1014/// This may be just a full snapshot hash, or a combo full snapshot hash and
1015/// incremental snapshot hash.
1016fn get_eligible_peer_snapshot_hashes(
1017    cluster_info: &ClusterInfo,
1018    rpc_peers: &[ContactInfo],
1019) -> Vec<PeerSnapshotHash> {
1020    let peer_snapshot_hashes = rpc_peers
1021        .iter()
1022        .flat_map(|rpc_peer| {
1023            get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1024                PeerSnapshotHash {
1025                    rpc_contact_info: rpc_peer.clone(),
1026                    snapshot_hash,
1027                }
1028            })
1029        })
1030        .collect();
1031
1032    trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1033    peer_snapshot_hashes
1034}
1035
1036/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
1037fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1038    known_snapshot_hashes: &KnownSnapshotHashes,
1039    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1040) {
1041    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1042        known_snapshot_hashes
1043            .get(&peer_snapshot_hash.snapshot_hash.full)
1044            .map(|known_incremental_hashes| {
1045                if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1046                    // If the peer's full snapshot hashes match, but doesn't have any
1047                    // incremental snapshots, that's fine; keep 'em!
1048                    true
1049                } else {
1050                    known_incremental_hashes
1051                        .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1052                }
1053            })
1054            .unwrap_or(false)
1055    });
1056
1057    trace!(
1058        "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1059    );
1060}
1061
1062/// Retain the peer snapshot hashes with the highest full snapshot slot
1063fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1064    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1065) {
1066    let highest_full_snapshot_hash = peer_snapshot_hashes
1067        .iter()
1068        .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1069        .max_by_key(|(slot, _hash)| *slot);
1070    let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1071        // `max_by_key` will only be `None` IFF the input `peer_snapshot_hashes` is empty.
1072        // In that case there's nothing to do (additionally, without a valid 'max' value, there
1073        // will be nothing to compare against within the `retain()` predicate).
1074        return;
1075    };
1076
1077    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1078        peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1079    });
1080
1081    trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1082}
1083
1084/// Retain the peer snapshot hashes with the highest incremental snapshot slot
1085fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1086    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1087) {
1088    let highest_incremental_snapshot_hash = peer_snapshot_hashes
1089        .iter()
1090        .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1091        .max_by_key(|(slot, _hash)| *slot);
1092
1093    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1094        peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1095    });
1096
1097    trace!(
1098        "retain peer snapshot hashes with highest incremental snapshot slot: \
1099         {peer_snapshot_hashes:?}"
1100    );
1101}
1102
1103/// Check to see if we can use our local snapshots, otherwise download newer ones.
1104#[allow(clippy::too_many_arguments)]
1105fn download_snapshots(
1106    validator_config: &ValidatorConfig,
1107    bootstrap_config: &RpcBootstrapConfig,
1108    use_progress_bar: bool,
1109    maximum_local_snapshot_age: Slot,
1110    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1111    minimal_snapshot_download_speed: f32,
1112    maximum_snapshot_download_abort: u64,
1113    download_abort_count: &mut u64,
1114    snapshot_hash: Option<SnapshotHash>,
1115    rpc_contact_info: &ContactInfo,
1116) -> Result<(), String> {
1117    if snapshot_hash.is_none() {
1118        return Ok(());
1119    }
1120    let SnapshotHash {
1121        full: full_snapshot_hash,
1122        incr: incremental_snapshot_hash,
1123    } = snapshot_hash.unwrap();
1124    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1125    let incremental_snapshot_archives_dir = &validator_config
1126        .snapshot_config
1127        .incremental_snapshot_archives_dir;
1128
1129    // If the local snapshots are new enough, then use 'em; no need to download new snapshots
1130    if should_use_local_snapshot(
1131        full_snapshot_archives_dir,
1132        incremental_snapshot_archives_dir,
1133        maximum_local_snapshot_age,
1134        full_snapshot_hash,
1135        incremental_snapshot_hash,
1136        bootstrap_config.incremental_snapshot_fetch,
1137    ) {
1138        return Ok(());
1139    }
1140
1141    // Check and see if we've already got the full snapshot; if not, download it
1142    if snapshot_paths::get_full_snapshot_archives(full_snapshot_archives_dir)
1143        .into_iter()
1144        .any(|snapshot_archive| {
1145            snapshot_archive.slot() == full_snapshot_hash.0
1146                && snapshot_archive.hash().0 == full_snapshot_hash.1
1147        })
1148    {
1149        info!(
1150            "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1151            full_snapshot_hash.0, full_snapshot_hash.1
1152        );
1153    } else {
1154        download_snapshot(
1155            validator_config,
1156            bootstrap_config,
1157            use_progress_bar,
1158            start_progress,
1159            minimal_snapshot_download_speed,
1160            maximum_snapshot_download_abort,
1161            download_abort_count,
1162            rpc_contact_info,
1163            full_snapshot_hash,
1164            SnapshotKind::FullSnapshot,
1165        )?;
1166    }
1167
1168    if bootstrap_config.incremental_snapshot_fetch {
1169        // Check and see if we've already got the incremental snapshot; if not, download it
1170        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1171            if snapshot_paths::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1172                .into_iter()
1173                .any(|snapshot_archive| {
1174                    snapshot_archive.slot() == incremental_snapshot_hash.0
1175                        && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1176                        && snapshot_archive.base_slot() == full_snapshot_hash.0
1177                })
1178            {
1179                info!(
1180                    "Incremental snapshot archive already exists locally. Skipping download. \
1181                     slot: {}, hash: {}",
1182                    incremental_snapshot_hash.0, incremental_snapshot_hash.1
1183                );
1184            } else {
1185                download_snapshot(
1186                    validator_config,
1187                    bootstrap_config,
1188                    use_progress_bar,
1189                    start_progress,
1190                    minimal_snapshot_download_speed,
1191                    maximum_snapshot_download_abort,
1192                    download_abort_count,
1193                    rpc_contact_info,
1194                    incremental_snapshot_hash,
1195                    SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1196                )?;
1197            }
1198        }
1199    }
1200
1201    Ok(())
1202}
1203
1204/// Download a snapshot
1205#[allow(clippy::too_many_arguments)]
1206fn download_snapshot(
1207    validator_config: &ValidatorConfig,
1208    bootstrap_config: &RpcBootstrapConfig,
1209    use_progress_bar: bool,
1210    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1211    minimal_snapshot_download_speed: f32,
1212    maximum_snapshot_download_abort: u64,
1213    download_abort_count: &mut u64,
1214    rpc_contact_info: &ContactInfo,
1215    desired_snapshot_hash: (Slot, Hash),
1216    snapshot_kind: SnapshotKind,
1217) -> Result<(), String> {
1218    let maximum_full_snapshot_archives_to_retain = validator_config
1219        .snapshot_config
1220        .maximum_full_snapshot_archives_to_retain;
1221    let maximum_incremental_snapshot_archives_to_retain = validator_config
1222        .snapshot_config
1223        .maximum_incremental_snapshot_archives_to_retain;
1224    let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1225    let incremental_snapshot_archives_dir = &validator_config
1226        .snapshot_config
1227        .incremental_snapshot_archives_dir;
1228
1229    *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1230        slot: desired_snapshot_hash.0,
1231        rpc_addr: rpc_contact_info
1232            .rpc()
1233            .ok_or_else(|| String::from("Invalid RPC address"))?,
1234    };
1235    let desired_snapshot_hash = (
1236        desired_snapshot_hash.0,
1237        agave_snapshots::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1238    );
1239    download_snapshot_archive(
1240        &rpc_contact_info
1241            .rpc()
1242            .ok_or_else(|| String::from("Invalid RPC address"))?,
1243        full_snapshot_archives_dir,
1244        incremental_snapshot_archives_dir,
1245        desired_snapshot_hash,
1246        snapshot_kind,
1247        maximum_full_snapshot_archives_to_retain,
1248        maximum_incremental_snapshot_archives_to_retain,
1249        use_progress_bar,
1250        &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1251            debug!("Download progress: {download_progress:?}");
1252            if download_progress.last_throughput < minimal_snapshot_download_speed
1253                && download_progress.notification_count <= 1
1254                && download_progress.percentage_done <= 2_f32
1255                && download_progress.estimated_remaining_time > 60_f32
1256                && *download_abort_count < maximum_snapshot_download_abort
1257            {
1258                if let Some(ref known_validators) = validator_config.known_validators {
1259                    if known_validators.contains(rpc_contact_info.pubkey())
1260                        && known_validators.len() == 1
1261                        && bootstrap_config.only_known_rpc
1262                    {
1263                        warn!(
1264                            "The snapshot download is too slow, throughput: {} < min speed {} \
1265                             bytes/sec, but will NOT abort and try a different node as it is the \
1266                             only known validator and the --only-known-rpc flag is set. Abort \
1267                             count: {}, Progress detail: {:?}",
1268                            download_progress.last_throughput,
1269                            minimal_snapshot_download_speed,
1270                            download_abort_count,
1271                            download_progress,
1272                        );
1273                        return true; // Do not abort download from the one-and-only known validator
1274                    }
1275                }
1276                warn!(
1277                    "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1278                     will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1279                    download_progress.last_throughput,
1280                    minimal_snapshot_download_speed,
1281                    download_abort_count,
1282                    download_progress,
1283                );
1284                *download_abort_count += 1;
1285                false
1286            } else {
1287                true
1288            }
1289        })),
1290    )
1291}
1292
1293/// Check to see if bootstrap should load from its local snapshots or not.  If not, then snapshots
1294/// will be downloaded.
1295fn should_use_local_snapshot(
1296    full_snapshot_archives_dir: &Path,
1297    incremental_snapshot_archives_dir: &Path,
1298    maximum_local_snapshot_age: Slot,
1299    full_snapshot_hash: (Slot, Hash),
1300    incremental_snapshot_hash: Option<(Slot, Hash)>,
1301    incremental_snapshot_fetch: bool,
1302) -> bool {
1303    let cluster_snapshot_slot = incremental_snapshot_hash
1304        .map(|(slot, _)| slot)
1305        .unwrap_or(full_snapshot_hash.0);
1306
1307    match get_highest_local_snapshot_hash(
1308        full_snapshot_archives_dir,
1309        incremental_snapshot_archives_dir,
1310        incremental_snapshot_fetch,
1311    ) {
1312        None => {
1313            info!(
1314                "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1315                 local snapshot."
1316            );
1317            false
1318        }
1319        Some((local_snapshot_slot, _)) => {
1320            if local_snapshot_slot
1321                >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1322            {
1323                info!(
1324                    "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1325                     a snapshot for slot {cluster_snapshot_slot}."
1326                );
1327                true
1328            } else {
1329                info!(
1330                    "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1331                     newer snapshot for slot {cluster_snapshot_slot}."
1332                );
1333                false
1334            }
1335        }
1336    }
1337}
1338
1339/// Get the node's highest snapshot hashes from CRDS
1340fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1341    cluster_info.get_snapshot_hashes_for_node(node).map(
1342        |crds_data::SnapshotHashes {
1343             full, incremental, ..
1344         }| {
1345            let highest_incremental_snapshot_hash = incremental.into_iter().max();
1346            SnapshotHash {
1347                full,
1348                incr: highest_incremental_snapshot_hash,
1349            }
1350        },
1351    )
1352}
1353
1354#[cfg(test)]
1355mod tests {
1356    use super::*;
1357
1358    impl PeerSnapshotHash {
1359        fn new(
1360            rpc_contact_info: ContactInfo,
1361            full_snapshot_hash: (Slot, Hash),
1362            incremental_snapshot_hash: Option<(Slot, Hash)>,
1363        ) -> Self {
1364            Self {
1365                rpc_contact_info,
1366                snapshot_hash: SnapshotHash {
1367                    full: full_snapshot_hash,
1368                    incr: incremental_snapshot_hash,
1369                },
1370            }
1371        }
1372    }
1373
1374    fn default_contact_info_for_tests() -> ContactInfo {
1375        ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
1376    }
1377
1378    #[test]
1379    fn test_build_known_snapshot_hashes() {
1380        agave_logger::setup();
1381        let full_snapshot_hash1 = (400_000, Hash::new_unique());
1382        let full_snapshot_hash2 = (400_000, Hash::new_unique());
1383
1384        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1385        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1386
1387        // simulate a set of known validators with various snapshot hashes
1388        let oracle = {
1389            let mut oracle = HashMap::new();
1390
1391            for (full, incr) in [
1392                // only a full snapshot
1393                (full_snapshot_hash1, None),
1394                // full and incremental snapshots
1395                (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1396                // full and incremental snapshots, with different incremental hash
1397                (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1398                // ...and now with different full hashes
1399                (full_snapshot_hash2, None),
1400                (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1401                (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1402            ] {
1403                // also simulate multiple known validators having the same snapshot hashes
1404                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1405                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1406                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1407            }
1408
1409            // no snapshots at all
1410            oracle.insert(Pubkey::new_unique(), None);
1411            oracle.insert(Pubkey::new_unique(), None);
1412            oracle.insert(Pubkey::new_unique(), None);
1413
1414            oracle
1415        };
1416
1417        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1418
1419        let known_snapshot_hashes =
1420            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1421
1422        // ensure there's only one full snapshot hash, since they all used the same slot and there
1423        // can be only one snapshot hash per slot
1424        let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1425        assert_eq!(known_full_snapshot_hashes.len(), 1);
1426        let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1427
1428        // and for the same reasons, ensure there is only one incremental snapshot hash
1429        let known_incremental_snapshot_hashes =
1430            known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1431        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1432        let known_incremental_snapshot_hash =
1433            known_incremental_snapshot_hashes.iter().next().unwrap();
1434
1435        // The resulting `known_snapshot_hashes` can be different from run-to-run due to how
1436        // `oracle.keys()` returns nodes during iteration.  Because of that, we cannot just assert
1437        // the full and incremental snapshot hashes are `full_snapshot_hash1` and
1438        // `incremental_snapshot_hash1`.  Instead, we assert that the full and incremental
1439        // snapshot hashes are exactly one or the other, since it depends on which nodes are seen
1440        // "first" when building the known snapshot hashes.
1441        assert!(
1442            known_full_snapshot_hash == &full_snapshot_hash1
1443                || known_full_snapshot_hash == &full_snapshot_hash2
1444        );
1445        assert!(
1446            known_incremental_snapshot_hash == &incremental_snapshot_hash1
1447                || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1448        );
1449    }
1450
1451    #[test]
1452    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1453        let known_snapshot_hashes: KnownSnapshotHashes = [
1454            (
1455                (200_000, Hash::new_unique()),
1456                [
1457                    (200_200, Hash::new_unique()),
1458                    (200_400, Hash::new_unique()),
1459                    (200_600, Hash::new_unique()),
1460                    (200_800, Hash::new_unique()),
1461                ]
1462                .iter()
1463                .cloned()
1464                .collect(),
1465            ),
1466            (
1467                (300_000, Hash::new_unique()),
1468                [
1469                    (300_200, Hash::new_unique()),
1470                    (300_400, Hash::new_unique()),
1471                    (300_600, Hash::new_unique()),
1472                ]
1473                .iter()
1474                .cloned()
1475                .collect(),
1476            ),
1477        ]
1478        .iter()
1479        .cloned()
1480        .collect();
1481
1482        let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1483        let known_full_snapshot_hash = known_snapshot_hash.0;
1484        let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1485
1486        let contact_info = default_contact_info_for_tests();
1487        let peer_snapshot_hashes = vec![
1488            // bad full snapshot hash, no incremental snapshot hash
1489            PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1490            // bad everything
1491            PeerSnapshotHash::new(
1492                contact_info.clone(),
1493                (111_000, Hash::default()),
1494                Some((111_111, Hash::default())),
1495            ),
1496            // good full snapshot hash, no incremental snapshot hash
1497            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1498            // bad full snapshot hash, good (not possible) incremental snapshot hash
1499            PeerSnapshotHash::new(
1500                contact_info.clone(),
1501                (111_000, Hash::default()),
1502                Some(*known_incremental_snapshot_hash),
1503            ),
1504            // good full snapshot hash, bad incremental snapshot hash
1505            PeerSnapshotHash::new(
1506                contact_info.clone(),
1507                *known_full_snapshot_hash,
1508                Some((111_111, Hash::default())),
1509            ),
1510            // good everything
1511            PeerSnapshotHash::new(
1512                contact_info.clone(),
1513                *known_full_snapshot_hash,
1514                Some(*known_incremental_snapshot_hash),
1515            ),
1516        ];
1517
1518        let expected = vec![
1519            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1520            PeerSnapshotHash::new(
1521                contact_info,
1522                *known_full_snapshot_hash,
1523                Some(*known_incremental_snapshot_hash),
1524            ),
1525        ];
1526        let mut actual = peer_snapshot_hashes;
1527        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1528            &known_snapshot_hashes,
1529            &mut actual,
1530        );
1531        assert_eq!(expected, actual);
1532    }
1533
1534    #[test]
1535    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1536        let contact_info = default_contact_info_for_tests();
1537        let peer_snapshot_hashes = vec![
1538            // old
1539            PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1540            PeerSnapshotHash::new(
1541                contact_info.clone(),
1542                (100_000, Hash::default()),
1543                Some((100_100, Hash::default())),
1544            ),
1545            PeerSnapshotHash::new(
1546                contact_info.clone(),
1547                (100_000, Hash::default()),
1548                Some((100_200, Hash::default())),
1549            ),
1550            PeerSnapshotHash::new(
1551                contact_info.clone(),
1552                (100_000, Hash::default()),
1553                Some((100_300, Hash::default())),
1554            ),
1555            // new
1556            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1557            PeerSnapshotHash::new(
1558                contact_info.clone(),
1559                (200_000, Hash::default()),
1560                Some((200_100, Hash::default())),
1561            ),
1562            PeerSnapshotHash::new(
1563                contact_info.clone(),
1564                (200_000, Hash::default()),
1565                Some((200_200, Hash::default())),
1566            ),
1567            PeerSnapshotHash::new(
1568                contact_info.clone(),
1569                (200_000, Hash::default()),
1570                Some((200_300, Hash::default())),
1571            ),
1572        ];
1573
1574        let expected = vec![
1575            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1576            PeerSnapshotHash::new(
1577                contact_info.clone(),
1578                (200_000, Hash::default()),
1579                Some((200_100, Hash::default())),
1580            ),
1581            PeerSnapshotHash::new(
1582                contact_info.clone(),
1583                (200_000, Hash::default()),
1584                Some((200_200, Hash::default())),
1585            ),
1586            PeerSnapshotHash::new(
1587                contact_info,
1588                (200_000, Hash::default()),
1589                Some((200_300, Hash::default())),
1590            ),
1591        ];
1592        let mut actual = peer_snapshot_hashes;
1593        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1594        assert_eq!(expected, actual);
1595    }
1596
1597    #[test]
1598    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1599        let contact_info = default_contact_info_for_tests();
1600        let peer_snapshot_hashes = vec![
1601            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1602            PeerSnapshotHash::new(
1603                contact_info.clone(),
1604                (200_000, Hash::default()),
1605                Some((200_100, Hash::default())),
1606            ),
1607            PeerSnapshotHash::new(
1608                contact_info.clone(),
1609                (200_000, Hash::default()),
1610                Some((200_200, Hash::default())),
1611            ),
1612            PeerSnapshotHash::new(
1613                contact_info.clone(),
1614                (200_000, Hash::default()),
1615                Some((200_300, Hash::default())),
1616            ),
1617            PeerSnapshotHash::new(
1618                contact_info.clone(),
1619                (200_000, Hash::default()),
1620                Some((200_010, Hash::default())),
1621            ),
1622            PeerSnapshotHash::new(
1623                contact_info.clone(),
1624                (200_000, Hash::default()),
1625                Some((200_020, Hash::default())),
1626            ),
1627            PeerSnapshotHash::new(
1628                contact_info.clone(),
1629                (200_000, Hash::default()),
1630                Some((200_030, Hash::default())),
1631            ),
1632        ];
1633
1634        let expected = vec![PeerSnapshotHash::new(
1635            contact_info,
1636            (200_000, Hash::default()),
1637            Some((200_300, Hash::default())),
1638        )];
1639        let mut actual = peer_snapshot_hashes;
1640        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1641        assert_eq!(expected, actual);
1642    }
1643
1644    /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
1645    /// there are *zero* peers with incremental snapshots.
1646    #[test]
1647    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1648        let contact_info = default_contact_info_for_tests();
1649        let peer_snapshot_hashes = vec![
1650            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1651            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1652            PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1653        ];
1654
1655        let expected = peer_snapshot_hashes.clone();
1656        let mut actual = peer_snapshot_hashes;
1657        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1658        assert_eq!(expected, actual);
1659    }
1660
1661    /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
1662    /// peer snapshot hashes input is empty.
1663    #[test]
1664    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1665        {
1666            let mut actual = vec![];
1667            let expected = actual.clone();
1668            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1669            assert_eq!(expected, actual);
1670        }
1671        {
1672            let mut actual = vec![];
1673            let expected = actual.clone();
1674            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1675            assert_eq!(expected, actual);
1676        }
1677    }
1678}