1use {
2 itertools::Itertools,
3 log::*,
4 rand::{seq::SliceRandom, thread_rng, Rng},
5 rayon::prelude::*,
6 solana_clock::Slot,
7 solana_commitment_config::CommitmentConfig,
8 solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
9 solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
10 solana_genesis_utils::download_then_check_genesis_hash,
11 solana_gossip::{
12 cluster_info::{ClusterInfo, Node},
13 contact_info::{ContactInfo, Protocol},
14 crds_data,
15 gossip_service::GossipService,
16 },
17 solana_hash::Hash,
18 solana_keypair::Keypair,
19 solana_metrics::datapoint_info,
20 solana_pubkey::Pubkey,
21 solana_rpc_client::rpc_client::RpcClient,
22 solana_runtime::{
23 snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
24 snapshot_utils,
25 },
26 solana_signer::Signer,
27 solana_streamer::socket::SocketAddrSpace,
28 std::{
29 collections::{hash_map::RandomState, HashMap, HashSet},
30 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
31 path::Path,
32 process::exit,
33 sync::{
34 atomic::{AtomicBool, Ordering},
35 Arc, RwLock,
36 },
37 time::{Duration, Instant},
38 },
39 thiserror::Error,
40};
41
/// While `get_rpc_nodes` is waiting for usable snapshot hashes, how long it keeps requiring
/// snapshot hashes from *all* known validators before settling for *any* of them.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// When every RPC peer is blacklisted and more than this much time has passed since a usable
/// peer list was last obtained, `get_rpc_peers` clears the blacklist so peers are retried.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// How long `get_rpc_nodes` keeps retrying without acceptable peer snapshot hashes before
/// failing with `GetRpcNodeError::NoNewerSnapshots`.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
/// How long `get_rpc_nodes` tolerates an empty RPC peer list before failing with
/// `GetRpcNodeError::NoRpcPeersFound`.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);

/// Upper bound on the number of RPC node candidates `get_rpc_nodes` returns per call.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// TCP connect timeout used by `ping` when probing an RPC address.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
58
/// Options that control how `rpc_bootstrap` obtains genesis and snapshots from RPC peers.
#[derive(Debug)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash`. When set together with
    /// `no_snapshot_fetch`, `rpc_bootstrap` returns right after the optional port check.
    pub no_genesis_fetch: bool,
    /// When set, `get_rpc_nodes` returns a single random RPC peer without a snapshot hash, so
    /// no snapshot is downloaded.
    pub no_snapshot_fetch: bool,
    /// When set, only RPC peers that are known validators are considered as candidates.
    pub only_known_rpc: bool,
    /// Forwarded to `download_then_check_genesis_hash`.
    pub max_genesis_archive_unpacked_size: u64,
    /// Optional RPC URL. When present, the vote account is checked through an `RpcClient`
    /// created from this URL after the downloads finish; a failed check exits the process.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots are taken into account when selecting peers and
    /// downloading snapshots.
    pub incremental_snapshot_fetch: bool,
}
68
69fn verify_reachable_ports(
70 node: &Node,
71 cluster_entrypoint: &ContactInfo,
72 validator_config: &ValidatorConfig,
73 socket_addr_space: &SocketAddrSpace,
74) -> bool {
75 let verify_address = |addr: &Option<SocketAddr>| -> bool {
76 addr.as_ref()
77 .map(|addr| socket_addr_space.check(addr))
78 .unwrap_or_default()
79 };
80 let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
81
82 if verify_address(&node.info.serve_repair(Protocol::UDP)) {
83 udp_sockets.push(&node.sockets.serve_repair);
84 }
85 if verify_address(&node.info.tpu(Protocol::UDP)) {
86 udp_sockets.extend(node.sockets.tpu.iter());
87 udp_sockets.extend(&node.sockets.tpu_quic);
88 }
89 if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
90 udp_sockets.extend(node.sockets.tpu_forwards.iter());
91 udp_sockets.extend(&node.sockets.tpu_forwards_quic);
92 }
93 if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
94 udp_sockets.extend(node.sockets.tpu_vote.iter());
95 }
96 if verify_address(&node.info.tvu(Protocol::UDP)) {
97 udp_sockets.extend(node.sockets.tvu.iter());
98 udp_sockets.extend(node.sockets.broadcast.iter());
99 udp_sockets.extend(node.sockets.retransmit_sockets.iter());
100 }
101 if !solana_net_utils::verify_all_reachable_udp(
102 &cluster_entrypoint.gossip().unwrap(),
103 &udp_sockets,
104 ) {
105 return false;
106 }
107
108 let mut tcp_listeners = vec![];
109 if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
110 for (purpose, bind_addr, public_addr) in &[
111 ("RPC", rpc_addr, node.info.rpc()),
112 ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
113 ] {
114 if verify_address(public_addr) {
115 tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
116 error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
117 exit(1);
118 }));
119 }
120 }
121 }
122
123 if let Some(ip_echo) = &node.sockets.ip_echo {
124 let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
125 tcp_listeners.push(ip_echo);
126 }
127
128 solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
129}
130
131fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
132 if let Some(known_validators) = known_validators {
133 known_validators.contains(id)
134 } else {
135 false
136 }
137}
138
139fn start_gossip_node(
140 identity_keypair: Arc<Keypair>,
141 cluster_entrypoints: &[ContactInfo],
142 ledger_path: &Path,
143 gossip_addr: &SocketAddr,
144 gossip_socket: UdpSocket,
145 expected_shred_version: u16,
146 gossip_validators: Option<HashSet<Pubkey>>,
147 should_check_duplicate_instance: bool,
148 socket_addr_space: SocketAddrSpace,
149) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
150 let contact_info = ClusterInfo::gossip_contact_info(
151 identity_keypair.pubkey(),
152 *gossip_addr,
153 expected_shred_version,
154 );
155 let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
156 cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
157 cluster_info.restore_contact_info(ledger_path, 0);
158 let cluster_info = Arc::new(cluster_info);
159
160 let gossip_exit_flag = Arc::new(AtomicBool::new(false));
161 let gossip_service = GossipService::new(
162 &cluster_info,
163 None,
164 gossip_socket,
165 gossip_validators,
166 should_check_duplicate_instance,
167 None,
168 gossip_exit_flag.clone(),
169 );
170 (cluster_info, gossip_exit_flag, gossip_service)
171}
172
173fn get_rpc_peers(
174 cluster_info: &ClusterInfo,
175 validator_config: &ValidatorConfig,
176 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
177 blacklist_timeout: &Instant,
178 retry_reason: &mut Option<String>,
179 bootstrap_config: &RpcBootstrapConfig,
180) -> Vec<ContactInfo> {
181 let shred_version = validator_config
182 .expected_shred_version
183 .unwrap_or_else(|| cluster_info.my_shred_version());
184
185 info!(
186 "Searching for an RPC service with shred version {shred_version}{}...",
187 retry_reason
188 .as_ref()
189 .map(|s| format!(" (Retrying: {s})"))
190 .unwrap_or_default()
191 );
192
193 let mut rpc_peers = cluster_info
194 .all_rpc_peers()
195 .into_iter()
196 .filter(|contact_info| contact_info.shred_version() == shred_version)
197 .collect::<Vec<_>>();
198
199 if bootstrap_config.only_known_rpc {
200 rpc_peers.retain(|rpc_peer| {
201 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
202 });
203 }
204
205 let rpc_peers_total = rpc_peers.len();
206
207 let rpc_peers: Vec<_> = rpc_peers
209 .into_iter()
210 .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
211 .collect();
212 let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
213 let rpc_known_peers = rpc_peers
214 .iter()
215 .filter(|rpc_peer| {
216 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
217 })
218 .count();
219
220 info!(
221 "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
222 {rpc_peers_blacklisted} blacklisted"
223 );
224
225 if rpc_peers_blacklisted == rpc_peers_total {
226 *retry_reason = if !blacklisted_rpc_nodes.is_empty()
227 && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
228 {
229 blacklisted_rpc_nodes.clear();
232 Some("Blacklist timeout expired".to_owned())
233 } else {
234 Some("Wait for known rpc peers".to_owned())
235 };
236 return vec![];
237 }
238 rpc_peers
239}
240
241fn check_vote_account(
242 rpc_client: &RpcClient,
243 identity_pubkey: &Pubkey,
244 vote_account_address: &Pubkey,
245 authorized_voter_pubkeys: &[Pubkey],
246) -> Result<(), String> {
247 let vote_account = rpc_client
248 .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
249 .map_err(|err| format!("failed to fetch vote account: {err}"))?
250 .value
251 .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
252
253 if vote_account.owner != solana_vote_program::id() {
254 return Err(format!(
255 "not a vote account (owned by {}): {}",
256 vote_account.owner, vote_account_address
257 ));
258 }
259
260 let identity_account = rpc_client
261 .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
262 .map_err(|err| format!("failed to fetch identity account: {err}"))?
263 .value
264 .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
265
266 let vote_state = solana_vote_program::vote_state::from(&vote_account);
267 if let Some(vote_state) = vote_state {
268 if vote_state.authorized_voters().is_empty() {
269 return Err("Vote account not yet initialized".to_string());
270 }
271
272 if vote_state.node_pubkey != *identity_pubkey {
273 return Err(format!(
274 "vote account's identity ({}) does not match the validator's identity {}).",
275 vote_state.node_pubkey, identity_pubkey
276 ));
277 }
278
279 for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
280 if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
281 return Err(format!(
282 "authorized voter {vote_account_authorized_voter_pubkey} not available"
283 ));
284 }
285 }
286 } else {
287 return Err(format!(
288 "invalid vote account data for {vote_account_address}"
289 ));
290 }
291
292 if identity_account.lamports <= 1 {
294 return Err(format!(
295 "underfunded identity account ({}): only {} lamports available",
296 identity_pubkey, identity_account.lamports
297 ));
298 }
299
300 Ok(())
301}
302
/// Reasons `get_rpc_nodes` can give up looking for RPC nodes.
#[derive(Error, Debug)]
pub enum GetRpcNodeError {
    /// No usable RPC peer showed up for longer than `GET_RPC_PEERS_TIMEOUT`.
    #[error("Unable to find any RPC peers")]
    NoRpcPeersFound,

    /// No acceptable peer snapshot hashes showed up for longer than
    /// `NEWER_SNAPSHOT_THRESHOLD`.
    #[error("Giving up, did not get newer snapshots from the cluster")]
    NoNewerSnapshots,
}
311
/// One RPC node candidate returned by `get_rpc_nodes`.
#[derive(Debug)]
struct GetRpcNodeResult {
    /// Contact info of the candidate RPC node.
    rpc_contact_info: ContactInfo,
    /// Snapshot hash to fetch from this node; `None` when snapshot fetching is disabled.
    snapshot_hash: Option<SnapshotHash>,
}
320
/// An RPC peer paired with the snapshot hash looked up for it.
#[derive(Debug, PartialEq, Eq, Clone)]
struct PeerSnapshotHash {
    /// Contact info of the RPC peer.
    rpc_contact_info: ContactInfo,
    /// Snapshot hash associated with the peer.
    snapshot_hash: SnapshotHash,
}
327
/// A full snapshot `(slot, hash)` with an optional incremental snapshot `(slot, hash)`.
///
/// The derived ordering compares `full` first and `incr` second.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct SnapshotHash {
    /// Slot and hash of the full snapshot.
    full: (Slot, Hash),
    /// Slot and hash of the incremental snapshot, if any.
    incr: Option<(Slot, Hash)>,
}
335
336pub fn fail_rpc_node(
337 err: String,
338 known_validators: &Option<HashSet<Pubkey, RandomState>>,
339 rpc_id: &Pubkey,
340 blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
341) {
342 warn!("{err}");
343 if let Some(ref known_validators) = known_validators {
344 if known_validators.contains(rpc_id) {
345 return;
346 }
347 }
348
349 info!("Excluding {rpc_id} as a future RPC candidate");
350 blacklisted_rpc_nodes.insert(*rpc_id);
351}
352
353fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
354 let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
355 cluster_info.save_contact_info();
356 gossip_exit_flag.store(true, Ordering::Relaxed);
357 gossip_service.join().unwrap();
358}
359
360#[allow(clippy::too_many_arguments)]
361pub fn attempt_download_genesis_and_snapshot(
362 rpc_contact_info: &ContactInfo,
363 ledger_path: &Path,
364 validator_config: &mut ValidatorConfig,
365 bootstrap_config: &RpcBootstrapConfig,
366 use_progress_bar: bool,
367 gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
368 rpc_client: &RpcClient,
369 full_snapshot_archives_dir: &Path,
370 incremental_snapshot_archives_dir: &Path,
371 maximum_local_snapshot_age: Slot,
372 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
373 minimal_snapshot_download_speed: f32,
374 maximum_snapshot_download_abort: u64,
375 download_abort_count: &mut u64,
376 snapshot_hash: Option<SnapshotHash>,
377 identity_keypair: &Arc<Keypair>,
378 vote_account: &Pubkey,
379 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
380) -> Result<(), String> {
381 download_then_check_genesis_hash(
382 &rpc_contact_info
383 .rpc()
384 .ok_or_else(|| String::from("Invalid RPC address"))?,
385 ledger_path,
386 &mut validator_config.expected_genesis_hash,
387 bootstrap_config.max_genesis_archive_unpacked_size,
388 bootstrap_config.no_genesis_fetch,
389 use_progress_bar,
390 rpc_client,
391 )?;
392
393 if let Some(gossip) = gossip.take() {
394 shutdown_gossip_service(gossip);
395 }
396
397 let rpc_client_slot = rpc_client
398 .get_slot_with_commitment(CommitmentConfig::finalized())
399 .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
400 info!("RPC node root slot: {rpc_client_slot}");
401
402 download_snapshots(
403 full_snapshot_archives_dir,
404 incremental_snapshot_archives_dir,
405 validator_config,
406 bootstrap_config,
407 use_progress_bar,
408 maximum_local_snapshot_age,
409 start_progress,
410 minimal_snapshot_download_speed,
411 maximum_snapshot_download_abort,
412 download_abort_count,
413 snapshot_hash,
414 rpc_contact_info,
415 )?;
416
417 if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
418 let rpc_client = RpcClient::new(url);
419 check_vote_account(
420 &rpc_client,
421 &identity_keypair.pubkey(),
422 vote_account,
423 &authorized_voter_keypairs
424 .read()
425 .unwrap()
426 .iter()
427 .map(|k| k.pubkey())
428 .collect::<Vec<_>>(),
429 )
430 .unwrap_or_else(|err| {
431 error!("{err}");
438 exit(1);
439 });
440 }
441 Ok(())
442}
443
444fn ping(addr: &SocketAddr) -> Option<Duration> {
446 let start = Instant::now();
447 match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
448 Ok(_) => Some(start.elapsed()),
449 Err(_) => None,
450 }
451}
452
453fn get_vetted_rpc_nodes(
457 vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
458 cluster_info: &Arc<ClusterInfo>,
459 validator_config: &ValidatorConfig,
460 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
461 bootstrap_config: &RpcBootstrapConfig,
462) {
463 while vetted_rpc_nodes.is_empty() {
464 let rpc_node_details = match get_rpc_nodes(
465 cluster_info,
466 validator_config,
467 blacklisted_rpc_nodes,
468 bootstrap_config,
469 ) {
470 Ok(rpc_node_details) => rpc_node_details,
471 Err(err) => {
472 error!(
473 "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
474 `--no-port-check`, or adjusting `--known-validator ...` arguments as \
475 applicable"
476 );
477 exit(1);
478 }
479 };
480
481 let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
482 vetted_rpc_nodes.extend(
483 rpc_node_details
484 .into_par_iter()
485 .filter_map(|rpc_node_details| {
486 let GetRpcNodeResult {
487 rpc_contact_info,
488 snapshot_hash,
489 } = rpc_node_details;
490
491 info!(
492 "Using RPC service from node {}: {:?}",
493 rpc_contact_info.pubkey(),
494 rpc_contact_info.rpc()
495 );
496
497 let rpc_addr = rpc_contact_info.rpc()?;
498 let ping_time = ping(&rpc_addr);
499
500 let rpc_client =
501 RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
502
503 Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
504 })
505 .filter(
506 |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
507 .get_version()
508 {
509 Ok(rpc_version) => {
510 if let Some(ping_time) = ping_time {
511 info!(
512 "RPC node version: {} Ping: {}ms",
513 rpc_version.solana_core,
514 ping_time.as_millis()
515 );
516 true
517 } else {
518 fail_rpc_node(
519 "Failed to ping RPC".to_string(),
520 &validator_config.known_validators,
521 rpc_contact_info.pubkey(),
522 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
523 );
524 false
525 }
526 }
527 Err(err) => {
528 fail_rpc_node(
529 format!("Failed to get RPC node version: {err}"),
530 &validator_config.known_validators,
531 rpc_contact_info.pubkey(),
532 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
533 );
534 false
535 }
536 },
537 )
538 .collect::<Vec<(
539 ContactInfo,
540 Option<SnapshotHash>,
541 RpcClient,
542 Option<Duration>,
543 )>>()
544 .into_iter()
545 .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
546 .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
547 (rpc_contact_info, snapshot_hash, rpc_client)
548 })
549 .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
550 );
551 blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
552 }
553}
554
555#[allow(clippy::too_many_arguments)]
556pub fn rpc_bootstrap(
557 node: &Node,
558 identity_keypair: &Arc<Keypair>,
559 ledger_path: &Path,
560 full_snapshot_archives_dir: &Path,
561 incremental_snapshot_archives_dir: &Path,
562 vote_account: &Pubkey,
563 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
564 cluster_entrypoints: &[ContactInfo],
565 validator_config: &mut ValidatorConfig,
566 bootstrap_config: RpcBootstrapConfig,
567 do_port_check: bool,
568 use_progress_bar: bool,
569 maximum_local_snapshot_age: Slot,
570 should_check_duplicate_instance: bool,
571 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
572 minimal_snapshot_download_speed: f32,
573 maximum_snapshot_download_abort: u64,
574 socket_addr_space: SocketAddrSpace,
575) {
576 if do_port_check {
577 let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
578 order.shuffle(&mut thread_rng());
579 if order.into_iter().all(|i| {
580 !verify_reachable_ports(
581 node,
582 &cluster_entrypoints[i],
583 validator_config,
584 &socket_addr_space,
585 )
586 }) {
587 exit(1);
588 }
589 }
590
591 if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
592 return;
593 }
594
595 let total_snapshot_download_time = Instant::now();
596 let mut get_rpc_nodes_time = Duration::new(0, 0);
597 let mut snapshot_download_time = Duration::new(0, 0);
598 let mut blacklisted_rpc_nodes = HashSet::new();
599 let mut gossip = None;
600 let mut vetted_rpc_nodes = vec![];
601 let mut download_abort_count = 0;
602 loop {
603 if gossip.is_none() {
604 *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
605
606 gossip = Some(start_gossip_node(
607 identity_keypair.clone(),
608 cluster_entrypoints,
609 ledger_path,
610 &node
611 .info
612 .gossip()
613 .expect("Operator must spin up node with valid gossip address"),
614 node.sockets.gossip.try_clone().unwrap(),
615 validator_config
616 .expected_shred_version
617 .expect("expected_shred_version should not be None"),
618 validator_config.gossip_validators.clone(),
619 should_check_duplicate_instance,
620 socket_addr_space,
621 ));
622 }
623
624 let get_rpc_nodes_start = Instant::now();
625 get_vetted_rpc_nodes(
626 &mut vetted_rpc_nodes,
627 &gossip.as_ref().unwrap().0,
628 validator_config,
629 &mut blacklisted_rpc_nodes,
630 &bootstrap_config,
631 );
632 let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
633 get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
634
635 let snapshot_download_start = Instant::now();
636 let download_result = attempt_download_genesis_and_snapshot(
637 &rpc_contact_info,
638 ledger_path,
639 validator_config,
640 &bootstrap_config,
641 use_progress_bar,
642 &mut gossip,
643 &rpc_client,
644 full_snapshot_archives_dir,
645 incremental_snapshot_archives_dir,
646 maximum_local_snapshot_age,
647 start_progress,
648 minimal_snapshot_download_speed,
649 maximum_snapshot_download_abort,
650 &mut download_abort_count,
651 snapshot_hash,
652 identity_keypair,
653 vote_account,
654 authorized_voter_keypairs.clone(),
655 );
656 snapshot_download_time += snapshot_download_start.elapsed();
657 match download_result {
658 Ok(()) => break,
659 Err(err) => {
660 fail_rpc_node(
661 err,
662 &validator_config.known_validators,
663 rpc_contact_info.pubkey(),
664 &mut blacklisted_rpc_nodes,
665 );
666 }
667 }
668 }
669
670 if let Some(gossip) = gossip.take() {
671 shutdown_gossip_service(gossip);
672 }
673
674 datapoint_info!(
675 "bootstrap-snapshot-download",
676 (
677 "total_time_secs",
678 total_snapshot_download_time.elapsed().as_secs(),
679 i64
680 ),
681 ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
682 (
683 "snapshot_download_time_secs",
684 snapshot_download_time.as_secs(),
685 i64
686 ),
687 ("download_abort_count", download_abort_count, i64),
688 ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
689 );
690}
691
692fn get_rpc_nodes(
696 cluster_info: &ClusterInfo,
697 validator_config: &ValidatorConfig,
698 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
699 bootstrap_config: &RpcBootstrapConfig,
700) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
701 let mut blacklist_timeout = Instant::now();
702 let mut get_rpc_peers_timout = Instant::now();
703 let mut newer_cluster_snapshot_timeout = None;
704 let mut retry_reason = None;
705 loop {
706 std::thread::sleep(Duration::from_secs(1));
708 info!("\n{}", cluster_info.rpc_info_trace());
709
710 let rpc_peers = get_rpc_peers(
711 cluster_info,
712 validator_config,
713 blacklisted_rpc_nodes,
714 &blacklist_timeout,
715 &mut retry_reason,
716 bootstrap_config,
717 );
718 if rpc_peers.is_empty() {
719 if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
720 return Err(GetRpcNodeError::NoRpcPeersFound);
721 }
722 continue;
723 }
724
725 blacklist_timeout = Instant::now();
727 get_rpc_peers_timout = Instant::now();
728 if bootstrap_config.no_snapshot_fetch {
729 let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
730 return Ok(vec![GetRpcNodeResult {
731 rpc_contact_info: random_peer.clone(),
732 snapshot_hash: None,
733 }]);
734 }
735
736 let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
737 .as_ref()
738 .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
739 .unwrap_or(true)
740 {
741 KnownValidatorsToWaitFor::All
742 } else {
743 KnownValidatorsToWaitFor::Any
744 };
745 let peer_snapshot_hashes = get_peer_snapshot_hashes(
746 cluster_info,
747 &rpc_peers,
748 validator_config.known_validators.as_ref(),
749 known_validators_to_wait_for,
750 bootstrap_config.incremental_snapshot_fetch,
751 );
752 if peer_snapshot_hashes.is_empty() {
753 match newer_cluster_snapshot_timeout {
754 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
755 Some(newer_cluster_snapshot_timeout) => {
756 if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
757 return Err(GetRpcNodeError::NoNewerSnapshots);
758 }
759 }
760 }
761 retry_reason = Some("No snapshots available".to_owned());
762 continue;
763 } else {
764 let rpc_peers = peer_snapshot_hashes
765 .iter()
766 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
767 .collect::<Vec<_>>();
768 let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
769 info!(
770 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
771 final_snapshot_hash
772 .incr
773 .map(|(slot, _hash)| slot)
774 .unwrap_or(final_snapshot_hash.full.0),
775 rpc_peers.len(),
776 if rpc_peers.len() > 1 { "s" } else { "" },
777 rpc_peers,
778 );
779 let rpc_node_results = peer_snapshot_hashes
780 .iter()
781 .map(|peer_snapshot_hash| GetRpcNodeResult {
782 rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
783 snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
784 })
785 .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
786 .collect();
787 return Ok(rpc_node_results);
788 }
789 }
790}
791
792fn get_highest_local_snapshot_hash(
795 full_snapshot_archives_dir: impl AsRef<Path>,
796 incremental_snapshot_archives_dir: impl AsRef<Path>,
797 incremental_snapshot_fetch: bool,
798) -> Option<(Slot, Hash)> {
799 snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
800 .and_then(|full_snapshot_info| {
801 if incremental_snapshot_fetch {
802 snapshot_utils::get_highest_incremental_snapshot_archive_info(
803 incremental_snapshot_archives_dir,
804 full_snapshot_info.slot(),
805 )
806 .map(|incremental_snapshot_info| {
807 (
808 incremental_snapshot_info.slot(),
809 *incremental_snapshot_info.hash(),
810 )
811 })
812 } else {
813 None
814 }
815 .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
816 })
817 .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
818}
819
820fn get_peer_snapshot_hashes(
827 cluster_info: &ClusterInfo,
828 rpc_peers: &[ContactInfo],
829 known_validators: Option<&HashSet<Pubkey>>,
830 known_validators_to_wait_for: KnownValidatorsToWaitFor,
831 incremental_snapshot_fetch: bool,
832) -> Vec<PeerSnapshotHash> {
833 let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
834 if let Some(known_validators) = known_validators {
835 let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
836 cluster_info,
837 known_validators,
838 known_validators_to_wait_for,
839 );
840 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
841 &known_snapshot_hashes,
842 &mut peer_snapshot_hashes,
843 );
844 }
845 if incremental_snapshot_fetch {
846 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
854 &mut peer_snapshot_hashes,
855 );
856 }
857 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
858
859 peer_snapshot_hashes
860}
861
/// Maps a full snapshot `(slot, hash)` to the set of incremental snapshot `(slot, hash)` pairs
/// recorded on top of it.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
865
866fn get_snapshot_hashes_from_known_validators(
877 cluster_info: &ClusterInfo,
878 known_validators: &HashSet<Pubkey>,
879 known_validators_to_wait_for: KnownValidatorsToWaitFor,
880) -> KnownSnapshotHashes {
881 let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
883
884 if !do_known_validators_have_all_snapshot_hashes(
885 known_validators,
886 known_validators_to_wait_for,
887 get_snapshot_hashes_for_node,
888 ) {
889 debug!(
890 "Snapshot hashes have not been discovered from known validators. This likely means \
891 the gossip tables are not fully populated. We will sleep and retry..."
892 );
893 return KnownSnapshotHashes::default();
894 }
895
896 build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
897}
898
899fn do_known_validators_have_all_snapshot_hashes<'a>(
908 known_validators: impl IntoIterator<Item = &'a Pubkey>,
909 known_validators_to_wait_for: KnownValidatorsToWaitFor,
910 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
911) -> bool {
912 let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
913
914 match known_validators_to_wait_for {
915 KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
916 KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
917 }
918}
919
/// How many of the known validators must have snapshot hashes before they are used.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    /// Every known validator must have snapshot hashes.
    All,
    /// At least one known validator must have snapshot hashes.
    Any,
}
927
928fn build_known_snapshot_hashes<'a>(
934 nodes: impl IntoIterator<Item = &'a Pubkey>,
935 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
936) -> KnownSnapshotHashes {
937 let mut known_snapshot_hashes = KnownSnapshotHashes::new();
938
939 fn is_any_same_slot_and_different_hash<'a>(
942 needle: &(Slot, Hash),
943 haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
944 ) -> bool {
945 haystack
946 .into_iter()
947 .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
948 }
949
950 'to_next_node: for node in nodes {
951 let Some(SnapshotHash {
952 full: full_snapshot_hash,
953 incr: incremental_snapshot_hash,
954 }) = get_snapshot_hashes_for_node(node)
955 else {
956 continue 'to_next_node;
957 };
958
959 if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
964 warn!(
965 "Ignoring all snapshot hashes from node {node} since we've seen a different full \
966 snapshot hash with this slot.\
967 \nfull snapshot hash: {full_snapshot_hash:?}"
968 );
969 debug!(
970 "known full snapshot hashes: {:#?}",
971 known_snapshot_hashes.keys(),
972 );
973 continue 'to_next_node;
974 }
975
976 let known_incremental_snapshot_hashes =
980 known_snapshot_hashes.entry(full_snapshot_hash).or_default();
981
982 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
983 if is_any_same_slot_and_different_hash(
988 &incremental_snapshot_hash,
989 known_incremental_snapshot_hashes.iter(),
990 ) {
991 warn!(
992 "Ignoring incremental snapshot hash from node {node} since we've seen a \
993 different incremental snapshot hash with this slot.\
994 \nfull snapshot hash: {full_snapshot_hash:?}\
995 \nincremental snapshot hash: {incremental_snapshot_hash:?}"
996 );
997 debug!(
998 "known incremental snapshot hashes based on this slot: {:#?}",
999 known_incremental_snapshot_hashes.iter(),
1000 );
1001 continue 'to_next_node;
1002 }
1003
1004 known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
1005 };
1006 }
1007
1008 trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1009 known_snapshot_hashes
1010}
1011
1012fn get_eligible_peer_snapshot_hashes(
1018 cluster_info: &ClusterInfo,
1019 rpc_peers: &[ContactInfo],
1020) -> Vec<PeerSnapshotHash> {
1021 let peer_snapshot_hashes = rpc_peers
1022 .iter()
1023 .flat_map(|rpc_peer| {
1024 get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1025 PeerSnapshotHash {
1026 rpc_contact_info: rpc_peer.clone(),
1027 snapshot_hash,
1028 }
1029 })
1030 })
1031 .collect();
1032
1033 trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1034 peer_snapshot_hashes
1035}
1036
1037fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1039 known_snapshot_hashes: &KnownSnapshotHashes,
1040 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1041) {
1042 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1043 known_snapshot_hashes
1044 .get(&peer_snapshot_hash.snapshot_hash.full)
1045 .map(|known_incremental_hashes| {
1046 if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1047 true
1050 } else {
1051 known_incremental_hashes
1052 .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1053 }
1054 })
1055 .unwrap_or(false)
1056 });
1057
1058 trace!(
1059 "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1060 );
1061}
1062
1063fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1065 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1066) {
1067 let highest_full_snapshot_hash = peer_snapshot_hashes
1068 .iter()
1069 .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1070 .max_by_key(|(slot, _hash)| *slot);
1071 let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1072 return;
1076 };
1077
1078 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1079 peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1080 });
1081
1082 trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1083}
1084
1085fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1087 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1088) {
1089 let highest_incremental_snapshot_hash = peer_snapshot_hashes
1090 .iter()
1091 .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1092 .max_by_key(|(slot, _hash)| *slot);
1093
1094 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1095 peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1096 });
1097
1098 trace!(
1099 "retain peer snapshot hashes with highest incremental snapshot slot: \
1100 {peer_snapshot_hashes:?}"
1101 );
1102}
1103
1104#[allow(clippy::too_many_arguments)]
1106fn download_snapshots(
1107 full_snapshot_archives_dir: &Path,
1108 incremental_snapshot_archives_dir: &Path,
1109 validator_config: &ValidatorConfig,
1110 bootstrap_config: &RpcBootstrapConfig,
1111 use_progress_bar: bool,
1112 maximum_local_snapshot_age: Slot,
1113 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1114 minimal_snapshot_download_speed: f32,
1115 maximum_snapshot_download_abort: u64,
1116 download_abort_count: &mut u64,
1117 snapshot_hash: Option<SnapshotHash>,
1118 rpc_contact_info: &ContactInfo,
1119) -> Result<(), String> {
1120 if snapshot_hash.is_none() {
1121 return Ok(());
1122 }
1123 let SnapshotHash {
1124 full: full_snapshot_hash,
1125 incr: incremental_snapshot_hash,
1126 } = snapshot_hash.unwrap();
1127
1128 if should_use_local_snapshot(
1130 full_snapshot_archives_dir,
1131 incremental_snapshot_archives_dir,
1132 maximum_local_snapshot_age,
1133 full_snapshot_hash,
1134 incremental_snapshot_hash,
1135 bootstrap_config.incremental_snapshot_fetch,
1136 ) {
1137 return Ok(());
1138 }
1139
1140 if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
1142 .into_iter()
1143 .any(|snapshot_archive| {
1144 snapshot_archive.slot() == full_snapshot_hash.0
1145 && snapshot_archive.hash().0 == full_snapshot_hash.1
1146 })
1147 {
1148 info!(
1149 "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1150 full_snapshot_hash.0, full_snapshot_hash.1
1151 );
1152 } else {
1153 download_snapshot(
1154 full_snapshot_archives_dir,
1155 incremental_snapshot_archives_dir,
1156 validator_config,
1157 bootstrap_config,
1158 use_progress_bar,
1159 start_progress,
1160 minimal_snapshot_download_speed,
1161 maximum_snapshot_download_abort,
1162 download_abort_count,
1163 rpc_contact_info,
1164 full_snapshot_hash,
1165 SnapshotKind::FullSnapshot,
1166 )?;
1167 }
1168
1169 if bootstrap_config.incremental_snapshot_fetch {
1170 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1172 if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1173 .into_iter()
1174 .any(|snapshot_archive| {
1175 snapshot_archive.slot() == incremental_snapshot_hash.0
1176 && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1177 && snapshot_archive.base_slot() == full_snapshot_hash.0
1178 })
1179 {
1180 info!(
1181 "Incremental snapshot archive already exists locally. Skipping download. \
1182 slot: {}, hash: {}",
1183 incremental_snapshot_hash.0, incremental_snapshot_hash.1
1184 );
1185 } else {
1186 download_snapshot(
1187 full_snapshot_archives_dir,
1188 incremental_snapshot_archives_dir,
1189 validator_config,
1190 bootstrap_config,
1191 use_progress_bar,
1192 start_progress,
1193 minimal_snapshot_download_speed,
1194 maximum_snapshot_download_abort,
1195 download_abort_count,
1196 rpc_contact_info,
1197 incremental_snapshot_hash,
1198 SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1199 )?;
1200 }
1201 }
1202 }
1203
1204 Ok(())
1205}
1206
1207#[allow(clippy::too_many_arguments)]
1209fn download_snapshot(
1210 full_snapshot_archives_dir: &Path,
1211 incremental_snapshot_archives_dir: &Path,
1212 validator_config: &ValidatorConfig,
1213 bootstrap_config: &RpcBootstrapConfig,
1214 use_progress_bar: bool,
1215 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1216 minimal_snapshot_download_speed: f32,
1217 maximum_snapshot_download_abort: u64,
1218 download_abort_count: &mut u64,
1219 rpc_contact_info: &ContactInfo,
1220 desired_snapshot_hash: (Slot, Hash),
1221 snapshot_kind: SnapshotKind,
1222) -> Result<(), String> {
1223 let maximum_full_snapshot_archives_to_retain = validator_config
1224 .snapshot_config
1225 .maximum_full_snapshot_archives_to_retain;
1226 let maximum_incremental_snapshot_archives_to_retain = validator_config
1227 .snapshot_config
1228 .maximum_incremental_snapshot_archives_to_retain;
1229
1230 *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1231 slot: desired_snapshot_hash.0,
1232 rpc_addr: rpc_contact_info
1233 .rpc()
1234 .ok_or_else(|| String::from("Invalid RPC address"))?,
1235 };
1236 let desired_snapshot_hash = (
1237 desired_snapshot_hash.0,
1238 solana_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1239 );
1240 download_snapshot_archive(
1241 &rpc_contact_info
1242 .rpc()
1243 .ok_or_else(|| String::from("Invalid RPC address"))?,
1244 full_snapshot_archives_dir,
1245 incremental_snapshot_archives_dir,
1246 desired_snapshot_hash,
1247 snapshot_kind,
1248 maximum_full_snapshot_archives_to_retain,
1249 maximum_incremental_snapshot_archives_to_retain,
1250 use_progress_bar,
1251 &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1252 debug!("Download progress: {download_progress:?}");
1253 if download_progress.last_throughput < minimal_snapshot_download_speed
1254 && download_progress.notification_count <= 1
1255 && download_progress.percentage_done <= 2_f32
1256 && download_progress.estimated_remaining_time > 60_f32
1257 && *download_abort_count < maximum_snapshot_download_abort
1258 {
1259 if let Some(ref known_validators) = validator_config.known_validators {
1260 if known_validators.contains(rpc_contact_info.pubkey())
1261 && known_validators.len() == 1
1262 && bootstrap_config.only_known_rpc
1263 {
1264 warn!(
1265 "The snapshot download is too slow, throughput: {} < min speed {} \
1266 bytes/sec, but will NOT abort and try a different node as it is the \
1267 only known validator and the --only-known-rpc flag is set. Abort \
1268 count: {}, Progress detail: {:?}",
1269 download_progress.last_throughput,
1270 minimal_snapshot_download_speed,
1271 download_abort_count,
1272 download_progress,
1273 );
1274 return true; }
1276 }
1277 warn!(
1278 "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1279 will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1280 download_progress.last_throughput,
1281 minimal_snapshot_download_speed,
1282 download_abort_count,
1283 download_progress,
1284 );
1285 *download_abort_count += 1;
1286 false
1287 } else {
1288 true
1289 }
1290 })),
1291 )
1292}
1293
1294fn should_use_local_snapshot(
1297 full_snapshot_archives_dir: &Path,
1298 incremental_snapshot_archives_dir: &Path,
1299 maximum_local_snapshot_age: Slot,
1300 full_snapshot_hash: (Slot, Hash),
1301 incremental_snapshot_hash: Option<(Slot, Hash)>,
1302 incremental_snapshot_fetch: bool,
1303) -> bool {
1304 let cluster_snapshot_slot = incremental_snapshot_hash
1305 .map(|(slot, _)| slot)
1306 .unwrap_or(full_snapshot_hash.0);
1307
1308 match get_highest_local_snapshot_hash(
1309 full_snapshot_archives_dir,
1310 incremental_snapshot_archives_dir,
1311 incremental_snapshot_fetch,
1312 ) {
1313 None => {
1314 info!(
1315 "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1316 local snapshot."
1317 );
1318 false
1319 }
1320 Some((local_snapshot_slot, _)) => {
1321 if local_snapshot_slot
1322 >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1323 {
1324 info!(
1325 "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1326 a snapshot for slot {cluster_snapshot_slot}."
1327 );
1328 true
1329 } else {
1330 info!(
1331 "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1332 newer snapshot for slot {cluster_snapshot_slot}."
1333 );
1334 false
1335 }
1336 }
1337 }
1338}
1339
1340fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1342 cluster_info.get_snapshot_hashes_for_node(node).map(
1343 |crds_data::SnapshotHashes {
1344 full, incremental, ..
1345 }| {
1346 let highest_incremental_snapshot_hash = incremental.into_iter().max();
1347 SnapshotHash {
1348 full,
1349 incr: highest_incremental_snapshot_hash,
1350 }
1351 },
1352 )
1353}
1354
#[cfg(test)]
mod tests {
    use super::*;

    impl PeerSnapshotHash {
        /// Test-only constructor that assembles a `PeerSnapshotHash` from its parts.
        fn new(
            rpc_contact_info: ContactInfo,
            full_snapshot_hash: (Slot, Hash),
            incremental_snapshot_hash: Option<(Slot, Hash)>,
        ) -> Self {
            let snapshot_hash = SnapshotHash {
                full: full_snapshot_hash,
                incr: incremental_snapshot_hash,
            };
            Self {
                rpc_contact_info,
                snapshot_hash,
            }
        }
    }

    /// Contact info shared by the tests; each test creates one and clones it per peer.
    fn default_contact_info_for_tests() -> ContactInfo {
        ContactInfo::new_localhost(&Pubkey::default(), 1_681_834_947_321)
    }

    /// Build a peer from bare slots, pairing every slot with `Hash::default()`.
    fn peer_at_slots(
        contact_info: &ContactInfo,
        full_slot: Slot,
        incremental_slot: Option<Slot>,
    ) -> PeerSnapshotHash {
        PeerSnapshotHash::new(
            contact_info.clone(),
            (full_slot, Hash::default()),
            incremental_slot.map(|slot| (slot, Hash::default())),
        )
    }

    #[test]
    fn test_build_known_snapshot_hashes() {
        solana_logger::setup();
        let full_snapshot_hash1 = (400_000, Hash::new_unique());
        let full_snapshot_hash2 = (400_000, Hash::new_unique());

        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());

        // Every full/incremental combination, each reported by three distinct nodes.
        let advertised = [
            (full_snapshot_hash1, None),
            (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
            (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
            (full_snapshot_hash2, None),
            (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
            (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
        ];

        let mut oracle = HashMap::new();
        for (full, incr) in advertised {
            for _ in 0..3 {
                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
            }
        }
        // Plus three nodes that report no snapshot hashes at all.
        for _ in 0..3 {
            oracle.insert(Pubkey::new_unique(), None);
        }

        let lookup = |node| *oracle.get(node).unwrap();
        let known_snapshot_hashes = build_known_snapshot_hashes(oracle.keys(), lookup);

        // Both full hashes share a slot, as do both incremental hashes, so exactly one of each
        // is expected; which one depends on map iteration order.
        assert_eq!(known_snapshot_hashes.len(), 1);
        let (known_full, known_incrementals) = known_snapshot_hashes.iter().next().unwrap();
        assert_eq!(known_incrementals.len(), 1);
        let known_incremental = known_incrementals.iter().next().unwrap();

        assert!([full_snapshot_hash1, full_snapshot_hash2].contains(known_full));
        assert!(
            [incremental_snapshot_hash1, incremental_snapshot_hash2].contains(known_incremental)
        );
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
        let mut known_snapshot_hashes = KnownSnapshotHashes::default();
        known_snapshot_hashes.insert(
            (200_000, Hash::new_unique()),
            [
                (200_200, Hash::new_unique()),
                (200_400, Hash::new_unique()),
                (200_600, Hash::new_unique()),
                (200_800, Hash::new_unique()),
            ]
            .iter()
            .copied()
            .collect(),
        );
        known_snapshot_hashes.insert(
            (300_000, Hash::new_unique()),
            [
                (300_200, Hash::new_unique()),
                (300_400, Hash::new_unique()),
                (300_600, Hash::new_unique()),
            ]
            .iter()
            .copied()
            .collect(),
        );

        // Pick an arbitrary known full hash and one of its known incremental hashes.
        let (&known_full, known_incrementals) = known_snapshot_hashes.iter().next().unwrap();
        let known_incremental = *known_incrementals.iter().next().unwrap();

        let unknown_full = (111_000, Hash::default());
        let unknown_incremental = (111_111, Hash::default());

        let contact_info = default_contact_info_for_tests();
        let peer = |full: (Slot, Hash), incr: Option<(Slot, Hash)>| {
            PeerSnapshotHash::new(contact_info.clone(), full, incr)
        };

        let mut actual = vec![
            // unknown full, no incremental
            peer(unknown_full, None),
            // unknown full, unknown incremental
            peer(unknown_full, Some(unknown_incremental)),
            // known full, no incremental
            peer(known_full, None),
            // unknown full, known incremental
            peer(unknown_full, Some(known_incremental)),
            // known full, unknown incremental
            peer(known_full, Some(unknown_incremental)),
            // known full, known incremental
            peer(known_full, Some(known_incremental)),
        ];
        let expected = vec![
            peer(known_full, None),
            peer(known_full, Some(known_incremental)),
        ];

        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
            &known_snapshot_hashes,
            &mut actual,
        );
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
        let contact_info = default_contact_info_for_tests();

        let mut actual = vec![
            // older full snapshot slot: all of these should be dropped
            peer_at_slots(&contact_info, 100_000, None),
            peer_at_slots(&contact_info, 100_000, Some(100_100)),
            peer_at_slots(&contact_info, 100_000, Some(100_200)),
            peer_at_slots(&contact_info, 100_000, Some(100_300)),
            // highest full snapshot slot: all of these should be kept
            peer_at_slots(&contact_info, 200_000, None),
            peer_at_slots(&contact_info, 200_000, Some(200_100)),
            peer_at_slots(&contact_info, 200_000, Some(200_200)),
            peer_at_slots(&contact_info, 200_000, Some(200_300)),
        ];
        let expected = vec![
            peer_at_slots(&contact_info, 200_000, None),
            peer_at_slots(&contact_info, 200_000, Some(200_100)),
            peer_at_slots(&contact_info, 200_000, Some(200_200)),
            peer_at_slots(&contact_info, 200_000, Some(200_300)),
        ];

        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
        let contact_info = default_contact_info_for_tests();

        let mut actual = vec![
            peer_at_slots(&contact_info, 200_000, None),
            peer_at_slots(&contact_info, 200_000, Some(200_100)),
            peer_at_slots(&contact_info, 200_000, Some(200_200)),
            peer_at_slots(&contact_info, 200_000, Some(200_300)),
            peer_at_slots(&contact_info, 200_000, Some(200_010)),
            peer_at_slots(&contact_info, 200_000, Some(200_020)),
            peer_at_slots(&contact_info, 200_000, Some(200_030)),
        ];
        // Only the peer with the highest incremental slot survives.
        let expected = vec![peer_at_slots(&contact_info, 200_000, Some(200_300))];

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
        let contact_info = default_contact_info_for_tests();

        // No peer has an incremental snapshot, so nothing should be filtered out.
        let mut actual: Vec<_> = (0..3)
            .map(|_| {
                PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None)
            })
            .collect();
        let expected = actual.clone();

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
        // Both filters must accept an empty list and leave it empty.
        let mut by_full_slot: Vec<PeerSnapshotHash> = Vec::new();
        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut by_full_slot);
        assert_eq!(Vec::<PeerSnapshotHash>::new(), by_full_slot);

        let mut by_incremental_slot: Vec<PeerSnapshotHash> = Vec::new();
        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
            &mut by_incremental_slot,
        );
        assert_eq!(Vec::<PeerSnapshotHash>::new(), by_incremental_slot);
    }
}