miraland_validator/bootstrap.rs

1use {
2    itertools::Itertools,
3    log::*,
4    miraland_core::validator::{ValidatorConfig, ValidatorStartProgress},
5    miraland_download_utils::{download_snapshot_archive, DownloadProgressRecord},
6    miraland_genesis_utils::download_then_check_genesis_hash,
7    miraland_gossip::{
8        cluster_info::{ClusterInfo, Node},
9        contact_info::Protocol,
10        crds_value,
11        gossip_service::GossipService,
12        legacy_contact_info::LegacyContactInfo as ContactInfo,
13    },
14    miraland_metrics::datapoint_info,
15    miraland_rpc_client::rpc_client::RpcClient,
16    miraland_streamer::socket::SocketAddrSpace,
17    rand::{seq::SliceRandom, thread_rng, Rng},
18    rayon::prelude::*,
19    miraland_runtime::{
20        snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
21        snapshot_utils,
22    },
23    miraland_sdk::{
24        clock::Slot,
25        commitment_config::CommitmentConfig,
26        hash::Hash,
27        pubkey::Pubkey,
28        signature::{Keypair, Signer},
29    },
30    std::{
31        collections::{hash_map::RandomState, HashMap, HashSet},
32        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
33        path::Path,
34        process::exit,
35        sync::{
36            atomic::{AtomicBool, Ordering},
37            Arc, RwLock,
38        },
39        time::{Duration, Instant},
40    },
41    thiserror::Error,
42};
43
/// When choosing a snapshot to download, insist on snapshot hashes from _all_ known
/// validators for at most this long (measured from the first attempt that found no usable
/// snapshot hashes).  After that, snapshot hashes from _any_ known validator are accepted.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// When every candidate RPC peer is blacklisted and no viable peer has been seen for longer
/// than this, the blacklist is cleared so those peers can be tried again.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// Give up (with `GetRpcNodeError::NoNewerSnapshots`) once no usable snapshot hashes have been
/// found for longer than this (three minutes).
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(3 * 60);
/// Give up (with `GetRpcNodeError::NoRpcPeersFound`) once no viable RPC peer has been seen for
/// longer than this (five minutes).
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Upper bound on how many RPC peer candidates are handed back per node-selection round.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// TCP connect timeout used when pinging a candidate RPC node.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
60
/// Options controlling how `rpc_bootstrap` fetches the genesis and snapshots from RPC peers.
#[derive(Debug)]
pub struct RpcBootstrapConfig {
    /// Passed to `download_then_check_genesis_hash` as its no-fetch flag.  When set together
    /// with `no_snapshot_fetch`, RPC bootstrap returns without contacting any peer.
    pub no_genesis_fetch: bool,
    /// When set, RPC node selection returns a single random peer without a snapshot hash.
    /// When set together with `no_genesis_fetch`, RPC bootstrap returns without contacting
    /// any peer.
    pub no_snapshot_fetch: bool,
    /// Only consider RPC peers listed in `ValidatorConfig::known_validators`.
    pub only_known_rpc: bool,
    /// Maximum unpacked genesis archive size (presumably bytes — confirm), passed to
    /// `download_then_check_genesis_hash`.
    pub max_genesis_archive_unpacked_size: u64,
    /// RPC URL used to validate the vote account after the download; `None` skips the check.
    pub check_vote_account: Option<String>,
    /// Enables incremental snapshots: passed to `get_peer_snapshot_hashes`, and gates the
    /// incremental-archive lookup in `get_highest_local_snapshot_hash`.
    pub incremental_snapshot_fetch: bool,
}
70
71fn verify_reachable_ports(
72    node: &Node,
73    cluster_entrypoint: &ContactInfo,
74    validator_config: &ValidatorConfig,
75    socket_addr_space: &SocketAddrSpace,
76) -> bool {
77    let verify_address = |addr: &Option<SocketAddr>| -> bool {
78        addr.as_ref()
79            .map(|addr| socket_addr_space.check(addr))
80            .unwrap_or_default()
81    };
82    let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
83
84    if verify_address(&node.info.serve_repair(Protocol::UDP).ok()) {
85        udp_sockets.push(&node.sockets.serve_repair);
86    }
87    if verify_address(&node.info.tpu(Protocol::UDP).ok()) {
88        udp_sockets.extend(node.sockets.tpu.iter());
89        udp_sockets.push(&node.sockets.tpu_quic);
90    }
91    if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) {
92        udp_sockets.extend(node.sockets.tpu_forwards.iter());
93        udp_sockets.push(&node.sockets.tpu_forwards_quic);
94    }
95    if verify_address(&node.info.tpu_vote().ok()) {
96        udp_sockets.extend(node.sockets.tpu_vote.iter());
97    }
98    if verify_address(&node.info.tvu(Protocol::UDP).ok()) {
99        udp_sockets.extend(node.sockets.tvu.iter());
100        udp_sockets.extend(node.sockets.broadcast.iter());
101        udp_sockets.extend(node.sockets.retransmit_sockets.iter());
102    }
103
104    let mut tcp_listeners = vec![];
105    if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
106        for (purpose, bind_addr, public_addr) in &[
107            ("RPC", rpc_addr, node.info.rpc()),
108            ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
109        ] {
110            if verify_address(&public_addr.as_ref().ok().copied()) {
111                tcp_listeners.push((
112                    bind_addr.port(),
113                    TcpListener::bind(bind_addr).unwrap_or_else(|err| {
114                        error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
115                        exit(1);
116                    }),
117                ));
118            }
119        }
120    }
121
122    if let Some(ip_echo) = &node.sockets.ip_echo {
123        let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
124        tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo));
125    }
126
127    miraland_net_utils::verify_reachable_ports(
128        &cluster_entrypoint.gossip().unwrap(),
129        tcp_listeners,
130        &udp_sockets,
131    )
132}
133
134fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
135    if let Some(known_validators) = known_validators {
136        known_validators.contains(id)
137    } else {
138        false
139    }
140}
141
142fn start_gossip_node(
143    identity_keypair: Arc<Keypair>,
144    cluster_entrypoints: &[ContactInfo],
145    ledger_path: &Path,
146    gossip_addr: &SocketAddr,
147    gossip_socket: UdpSocket,
148    expected_shred_version: Option<u16>,
149    gossip_validators: Option<HashSet<Pubkey>>,
150    should_check_duplicate_instance: bool,
151    socket_addr_space: SocketAddrSpace,
152) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
153    let contact_info = ClusterInfo::gossip_contact_info(
154        identity_keypair.pubkey(),
155        *gossip_addr,
156        expected_shred_version.unwrap_or(0),
157    );
158    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
159    cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
160    cluster_info.restore_contact_info(ledger_path, 0);
161    let cluster_info = Arc::new(cluster_info);
162
163    let gossip_exit_flag = Arc::new(AtomicBool::new(false));
164    let gossip_service = GossipService::new(
165        &cluster_info,
166        None,
167        gossip_socket,
168        gossip_validators,
169        should_check_duplicate_instance,
170        None,
171        gossip_exit_flag.clone(),
172    );
173    (cluster_info, gossip_exit_flag, gossip_service)
174}
175
176fn get_rpc_peers(
177    cluster_info: &ClusterInfo,
178    cluster_entrypoints: &[ContactInfo],
179    validator_config: &ValidatorConfig,
180    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
181    blacklist_timeout: &Instant,
182    retry_reason: &mut Option<String>,
183    bootstrap_config: &RpcBootstrapConfig,
184) -> Vec<ContactInfo> {
185    let shred_version = validator_config
186        .expected_shred_version
187        .unwrap_or_else(|| cluster_info.my_shred_version());
188    if shred_version == 0 {
189        let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| {
190            cluster_entrypoint
191                .gossip()
192                .ok()
193                .and_then(|addr| cluster_info.lookup_contact_info_by_gossip_addr(&addr))
194                .map_or(false, |entrypoint| entrypoint.shred_version() == 0)
195        });
196
197        if all_zero_shred_versions {
198            eprintln!("Entrypoint shred version is zero.  Restart with --expected-shred-version");
199            exit(1);
200        }
201        info!("Waiting to adopt entrypoint shred version...");
202        return vec![];
203    }
204
205    info!(
206        "Searching for an RPC service with shred version {shred_version}{}...",
207        retry_reason
208            .as_ref()
209            .map(|s| format!(" (Retrying: {s})"))
210            .unwrap_or_default()
211    );
212
213    let mut rpc_peers = cluster_info
214        .all_rpc_peers()
215        .into_iter()
216        .filter(|contact_info| contact_info.shred_version() == shred_version)
217        .collect::<Vec<_>>();
218
219    if bootstrap_config.only_known_rpc {
220        rpc_peers.retain(|rpc_peer| {
221            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
222        });
223    }
224
225    let rpc_peers_total = rpc_peers.len();
226
227    // Filter out blacklisted nodes
228    let rpc_peers: Vec<_> = rpc_peers
229        .into_iter()
230        .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
231        .collect();
232    let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
233    let rpc_known_peers = rpc_peers
234        .iter()
235        .filter(|rpc_peer| {
236            is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
237        })
238        .count();
239
240    info!(
241        "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
242         {rpc_peers_blacklisted} blacklisted"
243    );
244
245    if rpc_peers_blacklisted == rpc_peers_total {
246        *retry_reason = if !blacklisted_rpc_nodes.is_empty()
247            && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
248        {
249            // All nodes are blacklisted and no additional nodes recently discovered.
250            // Remove all nodes from the blacklist and try them again.
251            blacklisted_rpc_nodes.clear();
252            Some("Blacklist timeout expired".to_owned())
253        } else {
254            Some("Wait for known rpc peers".to_owned())
255        };
256        return vec![];
257    }
258    rpc_peers
259}
260
261fn check_vote_account(
262    rpc_client: &RpcClient,
263    identity_pubkey: &Pubkey,
264    vote_account_address: &Pubkey,
265    authorized_voter_pubkeys: &[Pubkey],
266) -> Result<(), String> {
267    let vote_account = rpc_client
268        .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
269        .map_err(|err| format!("failed to fetch vote account: {err}"))?
270        .value
271        .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
272
273    if vote_account.owner != miraland_vote_program::id() {
274        return Err(format!(
275            "not a vote account (owned by {}): {}",
276            vote_account.owner, vote_account_address
277        ));
278    }
279
280    let identity_account = rpc_client
281        .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
282        .map_err(|err| format!("failed to fetch identity account: {err}"))?
283        .value
284        .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
285
286    let vote_state = miraland_vote_program::vote_state::from(&vote_account);
287    if let Some(vote_state) = vote_state {
288        if vote_state.authorized_voters().is_empty() {
289            return Err("Vote account not yet initialized".to_string());
290        }
291
292        if vote_state.node_pubkey != *identity_pubkey {
293            return Err(format!(
294                "vote account's identity ({}) does not match the validator's identity {}).",
295                vote_state.node_pubkey, identity_pubkey
296            ));
297        }
298
299        for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
300            if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
301                return Err(format!(
302                    "authorized voter {vote_account_authorized_voter_pubkey} not available"
303                ));
304            }
305        }
306    } else {
307        return Err(format!(
308            "invalid vote account data for {vote_account_address}"
309        ));
310    }
311
312    // Maybe we can calculate minimum voting fee; rather than 1 lamport
313    if identity_account.lamports <= 1 {
314        return Err(format!(
315            "underfunded identity account ({}): only {} lamports available",
316            identity_pubkey, identity_account.lamports
317        ));
318    }
319
320    Ok(())
321}
322
/// Reasons RPC node selection (`get_rpc_nodes`) can give up.
#[derive(Error, Debug)]
pub enum GetRpcNodeError {
    /// No viable RPC peer was found for longer than `GET_RPC_PEERS_TIMEOUT`.
    #[error("Unable to find any RPC peers")]
    NoRpcPeersFound,

    /// Viable RPC peers existed, but no usable snapshot hashes were found for longer than
    /// `NEWER_SNAPSHOT_THRESHOLD`.
    #[error("Giving up, did not get newer snapshots from the cluster")]
    NoNewerSnapshots,
}
331
/// Struct to wrap the return value from get_rpc_nodes().  The `rpc_contact_info` is the peer to
/// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
/// snapshots to download.
#[derive(Debug)]
struct GetRpcNodeResult {
    /// Contact info of the RPC peer to download from.
    rpc_contact_info: ContactInfo,
    /// Snapshot to download from that peer; `None` when snapshot fetching is disabled.
    snapshot_hash: Option<SnapshotHash>,
}
340
/// Struct to wrap the peers & snapshot hashes together.
#[derive(Debug, PartialEq, Eq, Clone)]
struct PeerSnapshotHash {
    /// Contact info of the RPC peer advertising `snapshot_hash`.
    rpc_contact_info: ContactInfo,
    /// The snapshot hash associated with this peer.
    snapshot_hash: SnapshotHash,
}
347
/// A snapshot hash.  In this context (bootstrap *with* incremental snapshots), a snapshot hash
/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
///
/// The derived ordering compares `full` first and then `incr` (field declaration order), so
/// reordering the fields would change how snapshot hashes sort.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct SnapshotHash {
    /// Slot and hash of the full snapshot.
    full: (Slot, Hash),
    /// Slot and hash of the incremental snapshot, if any.
    incr: Option<(Slot, Hash)>,
}
355
356pub fn fail_rpc_node(
357    err: String,
358    known_validators: &Option<HashSet<Pubkey, RandomState>>,
359    rpc_id: &Pubkey,
360    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
361) {
362    warn!("{err}");
363    if let Some(ref known_validators) = known_validators {
364        if known_validators.contains(rpc_id) {
365            return;
366        }
367    }
368
369    info!("Excluding {rpc_id} as a future RPC candidate");
370    blacklisted_rpc_nodes.insert(*rpc_id);
371}
372
373fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
374    let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
375    cluster_info.save_contact_info();
376    gossip_exit_flag.store(true, Ordering::Relaxed);
377    gossip_service.join().unwrap();
378}
379
380#[allow(clippy::too_many_arguments)]
381pub fn attempt_download_genesis_and_snapshot(
382    rpc_contact_info: &ContactInfo,
383    ledger_path: &Path,
384    validator_config: &mut ValidatorConfig,
385    bootstrap_config: &RpcBootstrapConfig,
386    use_progress_bar: bool,
387    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
388    rpc_client: &RpcClient,
389    full_snapshot_archives_dir: &Path,
390    incremental_snapshot_archives_dir: &Path,
391    maximum_local_snapshot_age: Slot,
392    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
393    minimal_snapshot_download_speed: f32,
394    maximum_snapshot_download_abort: u64,
395    download_abort_count: &mut u64,
396    snapshot_hash: Option<SnapshotHash>,
397    identity_keypair: &Arc<Keypair>,
398    vote_account: &Pubkey,
399    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
400) -> Result<(), String> {
401    download_then_check_genesis_hash(
402        &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
403        ledger_path,
404        &mut validator_config.expected_genesis_hash,
405        bootstrap_config.max_genesis_archive_unpacked_size,
406        bootstrap_config.no_genesis_fetch,
407        use_progress_bar,
408        rpc_client,
409    )?;
410
411    if let Some(gossip) = gossip.take() {
412        shutdown_gossip_service(gossip);
413    }
414
415    let rpc_client_slot = rpc_client
416        .get_slot_with_commitment(CommitmentConfig::finalized())
417        .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
418    info!("RPC node root slot: {rpc_client_slot}");
419
420    download_snapshots(
421        full_snapshot_archives_dir,
422        incremental_snapshot_archives_dir,
423        validator_config,
424        bootstrap_config,
425        use_progress_bar,
426        maximum_local_snapshot_age,
427        start_progress,
428        minimal_snapshot_download_speed,
429        maximum_snapshot_download_abort,
430        download_abort_count,
431        snapshot_hash,
432        rpc_contact_info,
433    )?;
434
435    if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
436        let rpc_client = RpcClient::new(url);
437        check_vote_account(
438            &rpc_client,
439            &identity_keypair.pubkey(),
440            vote_account,
441            &authorized_voter_keypairs
442                .read()
443                .unwrap()
444                .iter()
445                .map(|k| k.pubkey())
446                .collect::<Vec<_>>(),
447        )
448        .unwrap_or_else(|err| {
449            // Consider failures here to be more likely due to user error (eg,
450            // incorrect `miraland-validator` command-line arguments) rather than the
451            // RPC node failing.
452            //
453            // Power users can always use the `--no-check-vote-account` option to
454            // bypass this check entirely
455            error!("{err}");
456            exit(1);
457        });
458    }
459    Ok(())
460}
461
462/// simple ping helper function which returns the time to connect
463fn ping(addr: &SocketAddr) -> Option<Duration> {
464    let start = Instant::now();
465    match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
466        Ok(_) => Some(start.elapsed()),
467        Err(_) => None,
468    }
469}
470
471// Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
472// used for downloading latest snapshots and/or the genesis block. Guaranteed to
473// find at least one viable node or terminate the process.
474fn get_vetted_rpc_nodes(
475    vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
476    cluster_info: &Arc<ClusterInfo>,
477    cluster_entrypoints: &[ContactInfo],
478    validator_config: &ValidatorConfig,
479    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
480    bootstrap_config: &RpcBootstrapConfig,
481) {
482    while vetted_rpc_nodes.is_empty() {
483        let rpc_node_details = match get_rpc_nodes(
484            cluster_info,
485            cluster_entrypoints,
486            validator_config,
487            blacklisted_rpc_nodes,
488            bootstrap_config,
489        ) {
490            Ok(rpc_node_details) => rpc_node_details,
491            Err(err) => {
492                error!(
493                    "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
494                    `--no-port-check`, or adjusting `--known-validator ...` arguments as \
495                    applicable"
496                );
497                exit(1);
498            }
499        };
500
501        let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
502        vetted_rpc_nodes.extend(
503            rpc_node_details
504                .into_par_iter()
505                .filter_map(|rpc_node_details| {
506                    let GetRpcNodeResult {
507                        rpc_contact_info,
508                        snapshot_hash,
509                    } = rpc_node_details;
510
511                    info!(
512                        "Using RPC service from node {}: {:?}",
513                        rpc_contact_info.pubkey(),
514                        rpc_contact_info.rpc()
515                    );
516
517                    let rpc_addr = rpc_contact_info.rpc().ok()?;
518                    let ping_time = ping(&rpc_addr);
519
520                    let rpc_client =
521                        RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
522
523                    Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
524                })
525                .filter(
526                    |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
527                        .get_version()
528                    {
529                        Ok(rpc_version) => {
530                            if let Some(ping_time) = ping_time {
531                                info!(
532                                    "RPC node version: {} Ping: {}ms",
533                                    rpc_version.miraland_core,
534                                    ping_time.as_millis()
535                                );
536                                true
537                            } else {
538                                fail_rpc_node(
539                                    "Failed to ping RPC".to_string(),
540                                    &validator_config.known_validators,
541                                    rpc_contact_info.pubkey(),
542                                    &mut newly_blacklisted_rpc_nodes.write().unwrap(),
543                                );
544                                false
545                            }
546                        }
547                        Err(err) => {
548                            fail_rpc_node(
549                                format!("Failed to get RPC node version: {err}"),
550                                &validator_config.known_validators,
551                                rpc_contact_info.pubkey(),
552                                &mut newly_blacklisted_rpc_nodes.write().unwrap(),
553                            );
554                            false
555                        }
556                    },
557                )
558                .collect::<Vec<(
559                    ContactInfo,
560                    Option<SnapshotHash>,
561                    RpcClient,
562                    Option<Duration>,
563                )>>()
564                .into_iter()
565                .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
566                .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
567                    (rpc_contact_info, snapshot_hash, rpc_client)
568                })
569                .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
570        );
571        blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
572    }
573}
574
575#[allow(clippy::too_many_arguments)]
576pub fn rpc_bootstrap(
577    node: &Node,
578    identity_keypair: &Arc<Keypair>,
579    ledger_path: &Path,
580    full_snapshot_archives_dir: &Path,
581    incremental_snapshot_archives_dir: &Path,
582    vote_account: &Pubkey,
583    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
584    cluster_entrypoints: &[ContactInfo],
585    validator_config: &mut ValidatorConfig,
586    bootstrap_config: RpcBootstrapConfig,
587    do_port_check: bool,
588    use_progress_bar: bool,
589    maximum_local_snapshot_age: Slot,
590    should_check_duplicate_instance: bool,
591    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
592    minimal_snapshot_download_speed: f32,
593    maximum_snapshot_download_abort: u64,
594    socket_addr_space: SocketAddrSpace,
595) {
596    if do_port_check {
597        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
598        order.shuffle(&mut thread_rng());
599        if order.into_iter().all(|i| {
600            !verify_reachable_ports(
601                node,
602                &cluster_entrypoints[i],
603                validator_config,
604                &socket_addr_space,
605            )
606        }) {
607            exit(1);
608        }
609    }
610
611    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
612        return;
613    }
614
615    let total_snapshot_download_time = Instant::now();
616    let mut get_rpc_nodes_time = Duration::new(0, 0);
617    let mut snapshot_download_time = Duration::new(0, 0);
618    let mut blacklisted_rpc_nodes = HashSet::new();
619    let mut gossip = None;
620    let mut vetted_rpc_nodes = vec![];
621    let mut download_abort_count = 0;
622    loop {
623        if gossip.is_none() {
624            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
625
626            gossip = Some(start_gossip_node(
627                identity_keypair.clone(),
628                cluster_entrypoints,
629                ledger_path,
630                &node
631                    .info
632                    .gossip()
633                    .expect("Operator must spin up node with valid gossip address"),
634                node.sockets.gossip.try_clone().unwrap(),
635                validator_config.expected_shred_version,
636                validator_config.gossip_validators.clone(),
637                should_check_duplicate_instance,
638                socket_addr_space,
639            ));
640        }
641
642        let get_rpc_nodes_start = Instant::now();
643        get_vetted_rpc_nodes(
644            &mut vetted_rpc_nodes,
645            &gossip.as_ref().unwrap().0,
646            cluster_entrypoints,
647            validator_config,
648            &mut blacklisted_rpc_nodes,
649            &bootstrap_config,
650        );
651        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
652        get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
653
654        let snapshot_download_start = Instant::now();
655        let download_result = attempt_download_genesis_and_snapshot(
656            &rpc_contact_info,
657            ledger_path,
658            validator_config,
659            &bootstrap_config,
660            use_progress_bar,
661            &mut gossip,
662            &rpc_client,
663            full_snapshot_archives_dir,
664            incremental_snapshot_archives_dir,
665            maximum_local_snapshot_age,
666            start_progress,
667            minimal_snapshot_download_speed,
668            maximum_snapshot_download_abort,
669            &mut download_abort_count,
670            snapshot_hash,
671            identity_keypair,
672            vote_account,
673            authorized_voter_keypairs.clone(),
674        );
675        snapshot_download_time += snapshot_download_start.elapsed();
676        match download_result {
677            Ok(()) => break,
678            Err(err) => {
679                fail_rpc_node(
680                    err,
681                    &validator_config.known_validators,
682                    rpc_contact_info.pubkey(),
683                    &mut blacklisted_rpc_nodes,
684                );
685            }
686        }
687    }
688
689    if let Some(gossip) = gossip.take() {
690        shutdown_gossip_service(gossip);
691    }
692
693    datapoint_info!(
694        "bootstrap-snapshot-download",
695        (
696            "total_time_secs",
697            total_snapshot_download_time.elapsed().as_secs(),
698            i64
699        ),
700        ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
701        (
702            "snapshot_download_time_secs",
703            snapshot_download_time.as_secs(),
704            i64
705        ),
706        ("download_abort_count", download_abort_count, i64),
707        ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
708    );
709}
710
711/// Get RPC peer node candidates to download from.
712///
713/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
714fn get_rpc_nodes(
715    cluster_info: &ClusterInfo,
716    cluster_entrypoints: &[ContactInfo],
717    validator_config: &ValidatorConfig,
718    blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
719    bootstrap_config: &RpcBootstrapConfig,
720) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
721    let mut blacklist_timeout = Instant::now();
722    let mut get_rpc_peers_timout = Instant::now();
723    let mut newer_cluster_snapshot_timeout = None;
724    let mut retry_reason = None;
725    loop {
726        // Give gossip some time to populate and not spin on grabbing the crds lock
727        std::thread::sleep(Duration::from_secs(1));
728        info!("\n{}", cluster_info.rpc_info_trace());
729
730        let rpc_peers = get_rpc_peers(
731            cluster_info,
732            cluster_entrypoints,
733            validator_config,
734            blacklisted_rpc_nodes,
735            &blacklist_timeout,
736            &mut retry_reason,
737            bootstrap_config,
738        );
739        if rpc_peers.is_empty() {
740            if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
741                return Err(GetRpcNodeError::NoRpcPeersFound);
742            }
743            continue;
744        }
745
746        // Reset timeouts if we found any viable RPC peers.
747        blacklist_timeout = Instant::now();
748        get_rpc_peers_timout = Instant::now();
749        if bootstrap_config.no_snapshot_fetch {
750            let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
751            return Ok(vec![GetRpcNodeResult {
752                rpc_contact_info: random_peer.clone(),
753                snapshot_hash: None,
754            }]);
755        }
756
757        let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
758            .as_ref()
759            .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
760            .unwrap_or(true)
761        {
762            KnownValidatorsToWaitFor::All
763        } else {
764            KnownValidatorsToWaitFor::Any
765        };
766        let peer_snapshot_hashes = get_peer_snapshot_hashes(
767            cluster_info,
768            &rpc_peers,
769            validator_config.known_validators.as_ref(),
770            known_validators_to_wait_for,
771            bootstrap_config.incremental_snapshot_fetch,
772        );
773        if peer_snapshot_hashes.is_empty() {
774            match newer_cluster_snapshot_timeout {
775                None => newer_cluster_snapshot_timeout = Some(Instant::now()),
776                Some(newer_cluster_snapshot_timeout) => {
777                    if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
778                        return Err(GetRpcNodeError::NoNewerSnapshots);
779                    }
780                }
781            }
782            retry_reason = Some("No snapshots available".to_owned());
783            continue;
784        } else {
785            let rpc_peers = peer_snapshot_hashes
786                .iter()
787                .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
788                .collect::<Vec<_>>();
789            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
790            info!(
791                "Highest available snapshot slot is {}, available from {} node{}: {:?}",
792                final_snapshot_hash
793                    .incr
794                    .map(|(slot, _hash)| slot)
795                    .unwrap_or(final_snapshot_hash.full.0),
796                rpc_peers.len(),
797                if rpc_peers.len() > 1 { "s" } else { "" },
798                rpc_peers,
799            );
800            let rpc_node_results = peer_snapshot_hashes
801                .iter()
802                .map(|peer_snapshot_hash| GetRpcNodeResult {
803                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
804                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
805                })
806                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
807                .collect();
808            return Ok(rpc_node_results);
809        }
810    }
811}
812
813/// Get the Slot and Hash of the local snapshot with the highest slot.  Can be either a full
814/// snapshot or an incremental snapshot.
815fn get_highest_local_snapshot_hash(
816    full_snapshot_archives_dir: impl AsRef<Path>,
817    incremental_snapshot_archives_dir: impl AsRef<Path>,
818    incremental_snapshot_fetch: bool,
819) -> Option<(Slot, Hash)> {
820    snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
821        .and_then(|full_snapshot_info| {
822            if incremental_snapshot_fetch {
823                snapshot_utils::get_highest_incremental_snapshot_archive_info(
824                    incremental_snapshot_archives_dir,
825                    full_snapshot_info.slot(),
826                )
827                .map(|incremental_snapshot_info| {
828                    (
829                        incremental_snapshot_info.slot(),
830                        *incremental_snapshot_info.hash(),
831                    )
832                })
833            } else {
834                None
835            }
836            .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
837        })
838        .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
839}
840
841/// Get peer snapshot hashes
842///
843/// The result is a vector of peers with snapshot hashes that:
844/// 1. match a snapshot hash from the known validators
845/// 2. have the highest incremental snapshot slot
846/// 3. have the highest full snapshot slot of (2)
847fn get_peer_snapshot_hashes(
848    cluster_info: &ClusterInfo,
849    rpc_peers: &[ContactInfo],
850    known_validators: Option<&HashSet<Pubkey>>,
851    known_validators_to_wait_for: KnownValidatorsToWaitFor,
852    incremental_snapshot_fetch: bool,
853) -> Vec<PeerSnapshotHash> {
854    let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
855    if let Some(known_validators) = known_validators {
856        let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
857            cluster_info,
858            known_validators,
859            known_validators_to_wait_for,
860        );
861        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
862            &known_snapshot_hashes,
863            &mut peer_snapshot_hashes,
864        );
865    }
866    if incremental_snapshot_fetch {
867        // Only filter by highest incremental snapshot slot if we're actually going to download an
868        // incremental snapshot.  Otherwise this could remove higher full snapshot slots from
869        // being selected.  For example, if there are two peer snapshot hashes:
870        // (A) full snapshot slot: 100, incremental snapshot slot: 160
871        // (B) full snapshot slot: 150, incremental snapshot slot: None
872        // Then (A) has the highest overall snapshot slot.  But if we're not downlading and
873        // incremental snapshot, (B) should be selected since it's full snapshot of 150 is highest.
874        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
875            &mut peer_snapshot_hashes,
876        );
877    }
878    retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
879
880    peer_snapshot_hashes
881}
882
/// Known snapshot hashes, keyed by full snapshot hash.
///
/// Each key is a full snapshot `(slot, hash)`.  Its value is the set of incremental snapshot
/// `(slot, hash)` pairs reported on top of that full snapshot, so the key is the base of every
/// incremental snapshot hash in its set.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
886
887/// Get the snapshot hashes from known validators.
888///
889/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
890/// snapshot hashes.  This map will be used as the "known snapshot hashes"; when peers are
891/// queried for their individual snapshot hashes, their results will be checked against this
892/// map to verify correctness.
893///
894/// NOTE: Only a single snashot hash is allowed per slot.  If somehow two known validators have
895/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
896/// This applies to both full and incremental snapshot hashes.
897fn get_snapshot_hashes_from_known_validators(
898    cluster_info: &ClusterInfo,
899    known_validators: &HashSet<Pubkey>,
900    known_validators_to_wait_for: KnownValidatorsToWaitFor,
901) -> KnownSnapshotHashes {
902    // Get the snapshot hashes for a node from CRDS
903    let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
904
905    if !do_known_validators_have_all_snapshot_hashes(
906        known_validators,
907        known_validators_to_wait_for,
908        get_snapshot_hashes_for_node,
909    ) {
910        debug!(
911            "Snapshot hashes have not been discovered from known validators. This likely means \
912             the gossip tables are not fully populated. We will sleep and retry..."
913        );
914        return KnownSnapshotHashes::default();
915    }
916
917    build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
918}
919
920/// Check if we can discover snapshot hashes for the known validators.
921///
922/// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
923/// process will download snapshots.
924///
925/// This function will return false if we do not yet have snapshot hashes from known validators;
926/// and true otherwise.  Either require snapshot hashes from *all* or *any* of the known validators
927/// based on the `KnownValidatorsToWaitFor` parameter.
928fn do_known_validators_have_all_snapshot_hashes<'a>(
929    known_validators: impl IntoIterator<Item = &'a Pubkey>,
930    known_validators_to_wait_for: KnownValidatorsToWaitFor,
931    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
932) -> bool {
933    let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
934
935    match known_validators_to_wait_for {
936        KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
937        KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
938    }
939}
940
/// Selects how many known validators must have published snapshot hashes before bootstrap
/// proceeds: every one of them (`All`) or at least one (`Any`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum KnownValidatorsToWaitFor {
    /// Wait until every known validator has snapshot hashes.
    All,
    /// Wait until at least one known validator has snapshot hashes.
    Any,
}
948
949/// Build the known snapshot hashes from a set of nodes.
950///
951/// The `get_snapshot_hashes_for_node` parameter is a function that map a pubkey to its snapshot
952/// hashes.  This parameter exist to provide a way to test the inner algorithm without needing
953/// runtime information such as the ClusterInfo or ValidatorConfig.
954fn build_known_snapshot_hashes<'a>(
955    nodes: impl IntoIterator<Item = &'a Pubkey>,
956    get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
957) -> KnownSnapshotHashes {
958    let mut known_snapshot_hashes = KnownSnapshotHashes::new();
959
960    /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
961    /// but *different* hash as the needle.
962    fn is_any_same_slot_and_different_hash<'a>(
963        needle: &(Slot, Hash),
964        haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
965    ) -> bool {
966        haystack
967            .into_iter()
968            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
969    }
970
971    'to_next_node: for node in nodes {
972        let Some(SnapshotHash {
973            full: full_snapshot_hash,
974            incr: incremental_snapshot_hash,
975        }) = get_snapshot_hashes_for_node(node)
976        else {
977            continue 'to_next_node;
978        };
979
980        // Do not add this snapshot hash if there's already a full snapshot hash with the
981        // same slot but with a _different_ hash.
982        // NOTE: Nodes should not produce snapshots at the same slot with _different_
983        // hashes.  So if it happens, keep the first and ignore the rest.
984        if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
985            warn!(
986                "Ignoring all snapshot hashes from node {node} since we've seen a different full \
987                 snapshot hash with this slot.\
988                 \nfull snapshot hash: {full_snapshot_hash:?}"
989            );
990            debug!(
991                "known full snapshot hashes: {:#?}",
992                known_snapshot_hashes.keys(),
993            );
994            continue 'to_next_node;
995        }
996
997        // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
998        // doesn't already exist.  This is to ensure we don't overwrite existing
999        // incremental snapshot hashes that may be present for this full snapshot hash.
1000        let known_incremental_snapshot_hashes =
1001            known_snapshot_hashes.entry(full_snapshot_hash).or_default();
1002
1003        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1004            // Do not add this snapshot hash if there's already an incremental snapshot
1005            // hash with the same slot, but with a _different_ hash.
1006            // NOTE: Nodes should not produce snapshots at the same slot with _different_
1007            // hashes.  So if it happens, keep the first and ignore the rest.
1008            if is_any_same_slot_and_different_hash(
1009                &incremental_snapshot_hash,
1010                known_incremental_snapshot_hashes.iter(),
1011            ) {
1012                warn!(
1013                    "Ignoring incremental snapshot hash from node {node} since we've seen a \
1014                     different incremental snapshot hash with this slot.\
1015                     \nfull snapshot hash: {full_snapshot_hash:?}\
1016                     \nincremental snapshot hash: {incremental_snapshot_hash:?}"
1017                );
1018                debug!(
1019                    "known incremental snapshot hashes based on this slot: {:#?}",
1020                    known_incremental_snapshot_hashes.iter(),
1021                );
1022                continue 'to_next_node;
1023            }
1024
1025            known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
1026        };
1027    }
1028
1029    trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1030    known_snapshot_hashes
1031}
1032
1033/// Get snapshot hashes from all eligible peers.
1034///
1035/// This fn will get only one snapshot hash per peer (the one with the highest slot).
1036/// This may be just a full snapshot hash, or a combo full snapshot hash and
1037/// incremental snapshot hash.
1038fn get_eligible_peer_snapshot_hashes(
1039    cluster_info: &ClusterInfo,
1040    rpc_peers: &[ContactInfo],
1041) -> Vec<PeerSnapshotHash> {
1042    let peer_snapshot_hashes = rpc_peers
1043        .iter()
1044        .flat_map(|rpc_peer| {
1045            get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1046                PeerSnapshotHash {
1047                    rpc_contact_info: rpc_peer.clone(),
1048                    snapshot_hash,
1049                }
1050            })
1051        })
1052        .collect();
1053
1054    trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1055    peer_snapshot_hashes
1056}
1057
1058/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
1059fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1060    known_snapshot_hashes: &KnownSnapshotHashes,
1061    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1062) {
1063    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1064        known_snapshot_hashes
1065            .get(&peer_snapshot_hash.snapshot_hash.full)
1066            .map(|known_incremental_hashes| {
1067                if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1068                    // If the peer's full snapshot hashes match, but doesn't have any
1069                    // incremental snapshots, that's fine; keep 'em!
1070                    true
1071                } else {
1072                    known_incremental_hashes
1073                        .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1074                }
1075            })
1076            .unwrap_or(false)
1077    });
1078
1079    trace!(
1080        "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1081    );
1082}
1083
1084/// Retain the peer snapshot hashes with the highest full snapshot slot
1085fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1086    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1087) {
1088    let highest_full_snapshot_hash = peer_snapshot_hashes
1089        .iter()
1090        .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1091        .max_by_key(|(slot, _hash)| *slot);
1092    let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1093        // `max_by_key` will only be `None` IFF the input `peer_snapshot_hashes` is empty.
1094        // In that case there's nothing to do (additionally, without a valid 'max' value, there
1095        // will be nothing to compare against within the `retain()` predicate).
1096        return;
1097    };
1098
1099    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1100        peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1101    });
1102
1103    trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1104}
1105
1106/// Retain the peer snapshot hashes with the highest incremental snapshot slot
1107fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1108    peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1109) {
1110    let highest_incremental_snapshot_hash = peer_snapshot_hashes
1111        .iter()
1112        .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1113        .max_by_key(|(slot, _hash)| *slot);
1114
1115    peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1116        peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1117    });
1118
1119    trace!(
1120        "retain peer snapshot hashes with highest incremental snapshot slot: \
1121         {peer_snapshot_hashes:?}"
1122    );
1123}
1124
1125/// Check to see if we can use our local snapshots, otherwise download newer ones.
1126#[allow(clippy::too_many_arguments)]
1127fn download_snapshots(
1128    full_snapshot_archives_dir: &Path,
1129    incremental_snapshot_archives_dir: &Path,
1130    validator_config: &ValidatorConfig,
1131    bootstrap_config: &RpcBootstrapConfig,
1132    use_progress_bar: bool,
1133    maximum_local_snapshot_age: Slot,
1134    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1135    minimal_snapshot_download_speed: f32,
1136    maximum_snapshot_download_abort: u64,
1137    download_abort_count: &mut u64,
1138    snapshot_hash: Option<SnapshotHash>,
1139    rpc_contact_info: &ContactInfo,
1140) -> Result<(), String> {
1141    if snapshot_hash.is_none() {
1142        return Ok(());
1143    }
1144    let SnapshotHash {
1145        full: full_snapshot_hash,
1146        incr: incremental_snapshot_hash,
1147    } = snapshot_hash.unwrap();
1148
1149    // If the local snapshots are new enough, then use 'em; no need to download new snapshots
1150    if should_use_local_snapshot(
1151        full_snapshot_archives_dir,
1152        incremental_snapshot_archives_dir,
1153        maximum_local_snapshot_age,
1154        full_snapshot_hash,
1155        incremental_snapshot_hash,
1156        bootstrap_config.incremental_snapshot_fetch,
1157    ) {
1158        return Ok(());
1159    }
1160
1161    // Check and see if we've already got the full snapshot; if not, download it
1162    if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
1163        .into_iter()
1164        .any(|snapshot_archive| {
1165            snapshot_archive.slot() == full_snapshot_hash.0
1166                && snapshot_archive.hash().0 == full_snapshot_hash.1
1167        })
1168    {
1169        info!(
1170            "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1171            full_snapshot_hash.0, full_snapshot_hash.1
1172        );
1173    } else {
1174        download_snapshot(
1175            full_snapshot_archives_dir,
1176            incremental_snapshot_archives_dir,
1177            validator_config,
1178            bootstrap_config,
1179            use_progress_bar,
1180            start_progress,
1181            minimal_snapshot_download_speed,
1182            maximum_snapshot_download_abort,
1183            download_abort_count,
1184            rpc_contact_info,
1185            full_snapshot_hash,
1186            SnapshotKind::FullSnapshot,
1187        )?;
1188    }
1189
1190    if bootstrap_config.incremental_snapshot_fetch {
1191        // Check and see if we've already got the incremental snapshot; if not, download it
1192        if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1193            if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1194                .into_iter()
1195                .any(|snapshot_archive| {
1196                    snapshot_archive.slot() == incremental_snapshot_hash.0
1197                        && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1198                        && snapshot_archive.base_slot() == full_snapshot_hash.0
1199                })
1200            {
1201                info!(
1202                    "Incremental snapshot archive already exists locally. Skipping download. \
1203                     slot: {}, hash: {}",
1204                    incremental_snapshot_hash.0, incremental_snapshot_hash.1
1205                );
1206            } else {
1207                download_snapshot(
1208                    full_snapshot_archives_dir,
1209                    incremental_snapshot_archives_dir,
1210                    validator_config,
1211                    bootstrap_config,
1212                    use_progress_bar,
1213                    start_progress,
1214                    minimal_snapshot_download_speed,
1215                    maximum_snapshot_download_abort,
1216                    download_abort_count,
1217                    rpc_contact_info,
1218                    incremental_snapshot_hash,
1219                    SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1220                )?;
1221            }
1222        }
1223    }
1224
1225    Ok(())
1226}
1227
1228/// Download a snapshot
1229#[allow(clippy::too_many_arguments)]
1230fn download_snapshot(
1231    full_snapshot_archives_dir: &Path,
1232    incremental_snapshot_archives_dir: &Path,
1233    validator_config: &ValidatorConfig,
1234    bootstrap_config: &RpcBootstrapConfig,
1235    use_progress_bar: bool,
1236    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1237    minimal_snapshot_download_speed: f32,
1238    maximum_snapshot_download_abort: u64,
1239    download_abort_count: &mut u64,
1240    rpc_contact_info: &ContactInfo,
1241    desired_snapshot_hash: (Slot, Hash),
1242    snapshot_kind: SnapshotKind,
1243) -> Result<(), String> {
1244    let maximum_full_snapshot_archives_to_retain = validator_config
1245        .snapshot_config
1246        .maximum_full_snapshot_archives_to_retain;
1247    let maximum_incremental_snapshot_archives_to_retain = validator_config
1248        .snapshot_config
1249        .maximum_incremental_snapshot_archives_to_retain;
1250
1251    *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1252        slot: desired_snapshot_hash.0,
1253        rpc_addr: rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
1254    };
1255    let desired_snapshot_hash = (
1256        desired_snapshot_hash.0,
1257        miraland_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1258    );
1259    download_snapshot_archive(
1260        &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
1261        full_snapshot_archives_dir,
1262        incremental_snapshot_archives_dir,
1263        desired_snapshot_hash,
1264        snapshot_kind,
1265        maximum_full_snapshot_archives_to_retain,
1266        maximum_incremental_snapshot_archives_to_retain,
1267        use_progress_bar,
1268        &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1269            debug!("Download progress: {download_progress:?}");
1270            if download_progress.last_throughput < minimal_snapshot_download_speed
1271                && download_progress.notification_count <= 1
1272                && download_progress.percentage_done <= 2_f32
1273                && download_progress.estimated_remaining_time > 60_f32
1274                && *download_abort_count < maximum_snapshot_download_abort
1275            {
1276                if let Some(ref known_validators) = validator_config.known_validators {
1277                    if known_validators.contains(rpc_contact_info.pubkey())
1278                        && known_validators.len() == 1
1279                        && bootstrap_config.only_known_rpc
1280                    {
1281                        warn!(
1282                            "The snapshot download is too slow, throughput: {} < min speed {} \
1283                             bytes/sec, but will NOT abort and try a different node as it is the \
1284                             only known validator and the --only-known-rpc flag is set. Abort \
1285                             count: {}, Progress detail: {:?}",
1286                            download_progress.last_throughput,
1287                            minimal_snapshot_download_speed,
1288                            download_abort_count,
1289                            download_progress,
1290                        );
1291                        return true; // Do not abort download from the one-and-only known validator
1292                    }
1293                }
1294                warn!(
1295                    "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1296                     will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1297                    download_progress.last_throughput,
1298                    minimal_snapshot_download_speed,
1299                    download_abort_count,
1300                    download_progress,
1301                );
1302                *download_abort_count += 1;
1303                false
1304            } else {
1305                true
1306            }
1307        })),
1308    )
1309}
1310
1311/// Check to see if bootstrap should load from its local snapshots or not.  If not, then snapshots
1312/// will be downloaded.
1313fn should_use_local_snapshot(
1314    full_snapshot_archives_dir: &Path,
1315    incremental_snapshot_archives_dir: &Path,
1316    maximum_local_snapshot_age: Slot,
1317    full_snapshot_hash: (Slot, Hash),
1318    incremental_snapshot_hash: Option<(Slot, Hash)>,
1319    incremental_snapshot_fetch: bool,
1320) -> bool {
1321    let cluster_snapshot_slot = incremental_snapshot_hash
1322        .map(|(slot, _)| slot)
1323        .unwrap_or(full_snapshot_hash.0);
1324
1325    match get_highest_local_snapshot_hash(
1326        full_snapshot_archives_dir,
1327        incremental_snapshot_archives_dir,
1328        incremental_snapshot_fetch,
1329    ) {
1330        None => {
1331            info!(
1332                "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1333                 local snapshot."
1334            );
1335            false
1336        }
1337        Some((local_snapshot_slot, _)) => {
1338            if local_snapshot_slot
1339                >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1340            {
1341                info!(
1342                    "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1343                     a snapshot for slot {cluster_snapshot_slot}."
1344                );
1345                true
1346            } else {
1347                info!(
1348                    "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1349                     newer snapshot for slot {cluster_snapshot_slot}."
1350                );
1351                false
1352            }
1353        }
1354    }
1355}
1356
1357/// Get the node's highest snapshot hashes from CRDS
1358fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1359    cluster_info.get_snapshot_hashes_for_node(node).map(
1360        |crds_value::SnapshotHashes {
1361             full, incremental, ..
1362         }| {
1363            let highest_incremental_snapshot_hash = incremental.into_iter().max();
1364            SnapshotHash {
1365                full,
1366                incr: highest_incremental_snapshot_hash,
1367            }
1368        },
1369    )
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374    use super::*;
1375
1376    impl PeerSnapshotHash {
1377        fn new(
1378            rpc_contact_info: ContactInfo,
1379            full_snapshot_hash: (Slot, Hash),
1380            incremental_snapshot_hash: Option<(Slot, Hash)>,
1381        ) -> Self {
1382            Self {
1383                rpc_contact_info,
1384                snapshot_hash: SnapshotHash {
1385                    full: full_snapshot_hash,
1386                    incr: incremental_snapshot_hash,
1387                },
1388            }
1389        }
1390    }
1391
1392    fn default_contact_info_for_tests() -> ContactInfo {
1393        ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
1394    }
1395
1396    #[test]
1397    fn test_build_known_snapshot_hashes() {
1398        miraland_logger::setup();
1399        let full_snapshot_hash1 = (400_000, Hash::new_unique());
1400        let full_snapshot_hash2 = (400_000, Hash::new_unique());
1401
1402        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1403        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1404
1405        // simulate a set of known validators with various snapshot hashes
1406        let oracle = {
1407            let mut oracle = HashMap::new();
1408
1409            for (full, incr) in [
1410                // only a full snapshot
1411                (full_snapshot_hash1, None),
1412                // full and incremental snapshots
1413                (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1414                // full and incremental snapshots, with different incremental hash
1415                (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1416                // ...and now with different full hashes
1417                (full_snapshot_hash2, None),
1418                (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1419                (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1420            ] {
1421                // also simulate multiple known validators having the same snapshot hashes
1422                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1423                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1424                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1425            }
1426
1427            // no snapshots at all
1428            oracle.insert(Pubkey::new_unique(), None);
1429            oracle.insert(Pubkey::new_unique(), None);
1430            oracle.insert(Pubkey::new_unique(), None);
1431
1432            oracle
1433        };
1434
1435        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1436
1437        let known_snapshot_hashes =
1438            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1439
1440        // ensure there's only one full snapshot hash, since they all used the same slot and there
1441        // can be only one snapshot hash per slot
1442        let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1443        assert_eq!(known_full_snapshot_hashes.len(), 1);
1444        let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1445
1446        // and for the same reasons, ensure there is only one incremental snapshot hash
1447        let known_incremental_snapshot_hashes =
1448            known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1449        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1450        let known_incremental_snapshot_hash =
1451            known_incremental_snapshot_hashes.iter().next().unwrap();
1452
1453        // The resulting `known_snapshot_hashes` can be different from run-to-run due to how
1454        // `oracle.keys()` returns nodes during iteration.  Because of that, we cannot just assert
1455        // the full and incremental snapshot hashes are `full_snapshot_hash1` and
1456        // `incremental_snapshot_hash1`.  Instead, we assert that the full and incremental
1457        // snapshot hashes are exactly one or the other, since it depends on which nodes are seen
1458        // "first" when building the known snapshot hashes.
1459        assert!(
1460            known_full_snapshot_hash == &full_snapshot_hash1
1461                || known_full_snapshot_hash == &full_snapshot_hash2
1462        );
1463        assert!(
1464            known_incremental_snapshot_hash == &incremental_snapshot_hash1
1465                || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1466        );
1467    }
1468
1469    #[test]
1470    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1471        let known_snapshot_hashes: KnownSnapshotHashes = [
1472            (
1473                (200_000, Hash::new_unique()),
1474                [
1475                    (200_200, Hash::new_unique()),
1476                    (200_400, Hash::new_unique()),
1477                    (200_600, Hash::new_unique()),
1478                    (200_800, Hash::new_unique()),
1479                ]
1480                .iter()
1481                .cloned()
1482                .collect(),
1483            ),
1484            (
1485                (300_000, Hash::new_unique()),
1486                [
1487                    (300_200, Hash::new_unique()),
1488                    (300_400, Hash::new_unique()),
1489                    (300_600, Hash::new_unique()),
1490                ]
1491                .iter()
1492                .cloned()
1493                .collect(),
1494            ),
1495        ]
1496        .iter()
1497        .cloned()
1498        .collect();
1499
1500        let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1501        let known_full_snapshot_hash = known_snapshot_hash.0;
1502        let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1503
1504        let contact_info = default_contact_info_for_tests();
1505        let peer_snapshot_hashes = vec![
1506            // bad full snapshot hash, no incremental snapshot hash
1507            PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1508            // bad everything
1509            PeerSnapshotHash::new(
1510                contact_info.clone(),
1511                (111_000, Hash::default()),
1512                Some((111_111, Hash::default())),
1513            ),
1514            // good full snapshot hash, no incremental snapshot hash
1515            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1516            // bad full snapshot hash, good (not possible) incremental snapshot hash
1517            PeerSnapshotHash::new(
1518                contact_info.clone(),
1519                (111_000, Hash::default()),
1520                Some(*known_incremental_snapshot_hash),
1521            ),
1522            // good full snapshot hash, bad incremental snapshot hash
1523            PeerSnapshotHash::new(
1524                contact_info.clone(),
1525                *known_full_snapshot_hash,
1526                Some((111_111, Hash::default())),
1527            ),
1528            // good everything
1529            PeerSnapshotHash::new(
1530                contact_info.clone(),
1531                *known_full_snapshot_hash,
1532                Some(*known_incremental_snapshot_hash),
1533            ),
1534        ];
1535
1536        let expected = vec![
1537            PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1538            PeerSnapshotHash::new(
1539                contact_info,
1540                *known_full_snapshot_hash,
1541                Some(*known_incremental_snapshot_hash),
1542            ),
1543        ];
1544        let mut actual = peer_snapshot_hashes;
1545        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1546            &known_snapshot_hashes,
1547            &mut actual,
1548        );
1549        assert_eq!(expected, actual);
1550    }
1551
1552    #[test]
1553    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1554        let contact_info = default_contact_info_for_tests();
1555        let peer_snapshot_hashes = vec![
1556            // old
1557            PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1558            PeerSnapshotHash::new(
1559                contact_info.clone(),
1560                (100_000, Hash::default()),
1561                Some((100_100, Hash::default())),
1562            ),
1563            PeerSnapshotHash::new(
1564                contact_info.clone(),
1565                (100_000, Hash::default()),
1566                Some((100_200, Hash::default())),
1567            ),
1568            PeerSnapshotHash::new(
1569                contact_info.clone(),
1570                (100_000, Hash::default()),
1571                Some((100_300, Hash::default())),
1572            ),
1573            // new
1574            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1575            PeerSnapshotHash::new(
1576                contact_info.clone(),
1577                (200_000, Hash::default()),
1578                Some((200_100, Hash::default())),
1579            ),
1580            PeerSnapshotHash::new(
1581                contact_info.clone(),
1582                (200_000, Hash::default()),
1583                Some((200_200, Hash::default())),
1584            ),
1585            PeerSnapshotHash::new(
1586                contact_info.clone(),
1587                (200_000, Hash::default()),
1588                Some((200_300, Hash::default())),
1589            ),
1590        ];
1591
1592        let expected = vec![
1593            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1594            PeerSnapshotHash::new(
1595                contact_info.clone(),
1596                (200_000, Hash::default()),
1597                Some((200_100, Hash::default())),
1598            ),
1599            PeerSnapshotHash::new(
1600                contact_info.clone(),
1601                (200_000, Hash::default()),
1602                Some((200_200, Hash::default())),
1603            ),
1604            PeerSnapshotHash::new(
1605                contact_info,
1606                (200_000, Hash::default()),
1607                Some((200_300, Hash::default())),
1608            ),
1609        ];
1610        let mut actual = peer_snapshot_hashes;
1611        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1612        assert_eq!(expected, actual);
1613    }
1614
1615    #[test]
1616    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1617        let contact_info = default_contact_info_for_tests();
1618        let peer_snapshot_hashes = vec![
1619            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1620            PeerSnapshotHash::new(
1621                contact_info.clone(),
1622                (200_000, Hash::default()),
1623                Some((200_100, Hash::default())),
1624            ),
1625            PeerSnapshotHash::new(
1626                contact_info.clone(),
1627                (200_000, Hash::default()),
1628                Some((200_200, Hash::default())),
1629            ),
1630            PeerSnapshotHash::new(
1631                contact_info.clone(),
1632                (200_000, Hash::default()),
1633                Some((200_300, Hash::default())),
1634            ),
1635            PeerSnapshotHash::new(
1636                contact_info.clone(),
1637                (200_000, Hash::default()),
1638                Some((200_010, Hash::default())),
1639            ),
1640            PeerSnapshotHash::new(
1641                contact_info.clone(),
1642                (200_000, Hash::default()),
1643                Some((200_020, Hash::default())),
1644            ),
1645            PeerSnapshotHash::new(
1646                contact_info.clone(),
1647                (200_000, Hash::default()),
1648                Some((200_030, Hash::default())),
1649            ),
1650        ];
1651
1652        let expected = vec![PeerSnapshotHash::new(
1653            contact_info,
1654            (200_000, Hash::default()),
1655            Some((200_300, Hash::default())),
1656        )];
1657        let mut actual = peer_snapshot_hashes;
1658        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1659        assert_eq!(expected, actual);
1660    }
1661
1662    /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
1663    /// there are *zero* peers with incremental snapshots.
1664    #[test]
1665    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1666        let contact_info = default_contact_info_for_tests();
1667        let peer_snapshot_hashes = vec![
1668            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1669            PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1670            PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1671        ];
1672
1673        let expected = peer_snapshot_hashes.clone();
1674        let mut actual = peer_snapshot_hashes;
1675        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1676        assert_eq!(expected, actual);
1677    }
1678
1679    /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
1680    /// peer snapshot hashes input is empty.
1681    #[test]
1682    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1683        {
1684            let mut actual = vec![];
1685            let expected = actual.clone();
1686            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1687            assert_eq!(expected, actual);
1688        }
1689        {
1690            let mut actual = vec![];
1691            let expected = actual.clone();
1692            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1693            assert_eq!(expected, actual);
1694        }
1695    }
1696}