1use {
2 itertools::Itertools,
3 log::*,
4 rand::{seq::SliceRandom, thread_rng, Rng},
5 rayon::prelude::*,
6 solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
7 solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
8 solana_genesis_utils::download_then_check_genesis_hash,
9 solana_gossip::{
10 cluster_info::{ClusterInfo, Node},
11 contact_info::Protocol,
12 crds_value,
13 gossip_service::GossipService,
14 legacy_contact_info::LegacyContactInfo as ContactInfo,
15 },
16 solana_metrics::datapoint_info,
17 solana_rpc_client::rpc_client::RpcClient,
18 solana_runtime::{
19 snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
20 snapshot_utils,
21 },
22 solana_sdk::{
23 clock::Slot,
24 commitment_config::CommitmentConfig,
25 hash::Hash,
26 pubkey::Pubkey,
27 signature::{Keypair, Signer},
28 },
29 solana_streamer::socket::SocketAddrSpace,
30 std::{
31 collections::{hash_map::RandomState, HashMap, HashSet},
32 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
33 path::Path,
34 process::exit,
35 sync::{
36 atomic::{AtomicBool, Ordering},
37 Arc, RwLock,
38 },
39 time::{Duration, Instant},
40 },
41 thiserror::Error,
42};
43
44const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
48const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
51const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
54const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);
56
57pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;
58
59pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
60
61#[derive(Debug)]
62pub struct RpcBootstrapConfig {
63 pub no_genesis_fetch: bool,
64 pub no_snapshot_fetch: bool,
65 pub only_known_rpc: bool,
66 pub max_genesis_archive_unpacked_size: u64,
67 pub check_vote_account: Option<String>,
68 pub incremental_snapshot_fetch: bool,
69}
70
71fn verify_reachable_ports(
72 node: &Node,
73 cluster_entrypoint: &ContactInfo,
74 validator_config: &ValidatorConfig,
75 socket_addr_space: &SocketAddrSpace,
76) -> bool {
77 let verify_address = |addr: &Option<SocketAddr>| -> bool {
78 addr.as_ref()
79 .map(|addr| socket_addr_space.check(addr))
80 .unwrap_or_default()
81 };
82 let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
83
84 if verify_address(&node.info.serve_repair(Protocol::UDP).ok()) {
85 udp_sockets.push(&node.sockets.serve_repair);
86 }
87 if verify_address(&node.info.tpu(Protocol::UDP).ok()) {
88 udp_sockets.extend(node.sockets.tpu.iter());
89 udp_sockets.push(&node.sockets.tpu_quic);
90 }
91 if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) {
92 udp_sockets.extend(node.sockets.tpu_forwards.iter());
93 udp_sockets.push(&node.sockets.tpu_forwards_quic);
94 }
95 if verify_address(&node.info.tpu_vote().ok()) {
96 udp_sockets.extend(node.sockets.tpu_vote.iter());
97 }
98 if verify_address(&node.info.tvu(Protocol::UDP).ok()) {
99 udp_sockets.extend(node.sockets.tvu.iter());
100 udp_sockets.extend(node.sockets.broadcast.iter());
101 udp_sockets.extend(node.sockets.retransmit_sockets.iter());
102 }
103
104 let mut tcp_listeners = vec![];
105 if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
106 for (purpose, bind_addr, public_addr) in &[
107 ("RPC", rpc_addr, node.info.rpc()),
108 ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
109 ] {
110 if verify_address(&public_addr.as_ref().ok().copied()) {
111 tcp_listeners.push((
112 bind_addr.port(),
113 TcpListener::bind(bind_addr).unwrap_or_else(|err| {
114 error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
115 exit(1);
116 }),
117 ));
118 }
119 }
120 }
121
122 if let Some(ip_echo) = &node.sockets.ip_echo {
123 let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
124 tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo));
125 }
126
127 solana_net_utils::verify_reachable_ports(
128 &cluster_entrypoint.gossip().unwrap(),
129 tcp_listeners,
130 &udp_sockets,
131 )
132}
133
134fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
135 if let Some(known_validators) = known_validators {
136 known_validators.contains(id)
137 } else {
138 false
139 }
140}
141
142fn start_gossip_node(
143 identity_keypair: Arc<Keypair>,
144 cluster_entrypoints: &[ContactInfo],
145 ledger_path: &Path,
146 gossip_addr: &SocketAddr,
147 gossip_socket: UdpSocket,
148 expected_shred_version: Option<u16>,
149 gossip_validators: Option<HashSet<Pubkey>>,
150 should_check_duplicate_instance: bool,
151 socket_addr_space: SocketAddrSpace,
152) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
153 let contact_info = ClusterInfo::gossip_contact_info(
154 identity_keypair.pubkey(),
155 *gossip_addr,
156 expected_shred_version.unwrap_or(0),
157 );
158 let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
159 cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
160 cluster_info.restore_contact_info(ledger_path, 0);
161 let cluster_info = Arc::new(cluster_info);
162
163 let gossip_exit_flag = Arc::new(AtomicBool::new(false));
164 let gossip_service = GossipService::new(
165 &cluster_info,
166 None,
167 gossip_socket,
168 gossip_validators,
169 should_check_duplicate_instance,
170 None,
171 gossip_exit_flag.clone(),
172 );
173 (cluster_info, gossip_exit_flag, gossip_service)
174}
175
176fn get_rpc_peers(
177 cluster_info: &ClusterInfo,
178 cluster_entrypoints: &[ContactInfo],
179 validator_config: &ValidatorConfig,
180 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
181 blacklist_timeout: &Instant,
182 retry_reason: &mut Option<String>,
183 bootstrap_config: &RpcBootstrapConfig,
184) -> Vec<ContactInfo> {
185 let shred_version = validator_config
186 .expected_shred_version
187 .unwrap_or_else(|| cluster_info.my_shred_version());
188 if shred_version == 0 {
189 let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| {
190 cluster_entrypoint
191 .gossip()
192 .ok()
193 .and_then(|addr| cluster_info.lookup_contact_info_by_gossip_addr(&addr))
194 .map_or(false, |entrypoint| entrypoint.shred_version() == 0)
195 });
196
197 if all_zero_shred_versions {
198 eprintln!("Entrypoint shred version is zero. Restart with --expected-shred-version");
199 exit(1);
200 }
201 info!("Waiting to adopt entrypoint shred version...");
202 return vec![];
203 }
204
205 info!(
206 "Searching for an RPC service with shred version {shred_version}{}...",
207 retry_reason
208 .as_ref()
209 .map(|s| format!(" (Retrying: {s})"))
210 .unwrap_or_default()
211 );
212
213 let mut rpc_peers = cluster_info
214 .all_rpc_peers()
215 .into_iter()
216 .filter(|contact_info| contact_info.shred_version() == shred_version)
217 .collect::<Vec<_>>();
218
219 if bootstrap_config.only_known_rpc {
220 rpc_peers.retain(|rpc_peer| {
221 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
222 });
223 }
224
225 let rpc_peers_total = rpc_peers.len();
226
227 let rpc_peers: Vec<_> = rpc_peers
229 .into_iter()
230 .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
231 .collect();
232 let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
233 let rpc_known_peers = rpc_peers
234 .iter()
235 .filter(|rpc_peer| {
236 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
237 })
238 .count();
239
240 info!("Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, {rpc_peers_blacklisted} blacklisted");
241
242 if rpc_peers_blacklisted == rpc_peers_total {
243 *retry_reason = if !blacklisted_rpc_nodes.is_empty()
244 && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
245 {
246 blacklisted_rpc_nodes.clear();
249 Some("Blacklist timeout expired".to_owned())
250 } else {
251 Some("Wait for known rpc peers".to_owned())
252 };
253 return vec![];
254 }
255 rpc_peers
256}
257
258fn check_vote_account(
259 rpc_client: &RpcClient,
260 identity_pubkey: &Pubkey,
261 vote_account_address: &Pubkey,
262 authorized_voter_pubkeys: &[Pubkey],
263) -> Result<(), String> {
264 let vote_account = rpc_client
265 .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
266 .map_err(|err| format!("failed to fetch vote account: {err}"))?
267 .value
268 .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
269
270 if vote_account.owner != solana_vote_program::id() {
271 return Err(format!(
272 "not a vote account (owned by {}): {}",
273 vote_account.owner, vote_account_address
274 ));
275 }
276
277 let identity_account = rpc_client
278 .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
279 .map_err(|err| format!("failed to fetch identity account: {err}"))?
280 .value
281 .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
282
283 let vote_state = solana_vote_program::vote_state::from(&vote_account);
284 if let Some(vote_state) = vote_state {
285 if vote_state.authorized_voters().is_empty() {
286 return Err("Vote account not yet initialized".to_string());
287 }
288
289 if vote_state.node_pubkey != *identity_pubkey {
290 return Err(format!(
291 "vote account's identity ({}) does not match the validator's identity {}).",
292 vote_state.node_pubkey, identity_pubkey
293 ));
294 }
295
296 for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
297 if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
298 return Err(format!(
299 "authorized voter {vote_account_authorized_voter_pubkey} not available"
300 ));
301 }
302 }
303 } else {
304 return Err(format!(
305 "invalid vote account data for {vote_account_address}"
306 ));
307 }
308
309 if identity_account.lamports <= 1 {
311 return Err(format!(
312 "underfunded identity account ({}): only {} lamports available",
313 identity_pubkey, identity_account.lamports
314 ));
315 }
316
317 Ok(())
318}
319
320#[derive(Error, Debug)]
321pub enum GetRpcNodeError {
322 #[error("Unable to find any RPC peers")]
323 NoRpcPeersFound,
324
325 #[error("Giving up, did not get newer snapshots from the cluster")]
326 NoNewerSnapshots,
327}
328
329#[derive(Debug)]
333struct GetRpcNodeResult {
334 rpc_contact_info: ContactInfo,
335 snapshot_hash: Option<SnapshotHash>,
336}
337
338#[derive(Debug, PartialEq, Eq, Clone)]
340struct PeerSnapshotHash {
341 rpc_contact_info: ContactInfo,
342 snapshot_hash: SnapshotHash,
343}
344
345#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
348pub struct SnapshotHash {
349 full: (Slot, Hash),
350 incr: Option<(Slot, Hash)>,
351}
352
353pub fn fail_rpc_node(
354 err: String,
355 known_validators: &Option<HashSet<Pubkey, RandomState>>,
356 rpc_id: &Pubkey,
357 blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
358) {
359 warn!("{err}");
360 if let Some(ref known_validators) = known_validators {
361 if known_validators.contains(rpc_id) {
362 return;
363 }
364 }
365
366 info!("Excluding {rpc_id} as a future RPC candidate");
367 blacklisted_rpc_nodes.insert(*rpc_id);
368}
369
370fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
371 let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
372 cluster_info.save_contact_info();
373 gossip_exit_flag.store(true, Ordering::Relaxed);
374 gossip_service.join().unwrap();
375}
376
377#[allow(clippy::too_many_arguments)]
378pub fn attempt_download_genesis_and_snapshot(
379 rpc_contact_info: &ContactInfo,
380 ledger_path: &Path,
381 validator_config: &mut ValidatorConfig,
382 bootstrap_config: &RpcBootstrapConfig,
383 use_progress_bar: bool,
384 gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
385 rpc_client: &RpcClient,
386 full_snapshot_archives_dir: &Path,
387 incremental_snapshot_archives_dir: &Path,
388 maximum_local_snapshot_age: Slot,
389 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
390 minimal_snapshot_download_speed: f32,
391 maximum_snapshot_download_abort: u64,
392 download_abort_count: &mut u64,
393 snapshot_hash: Option<SnapshotHash>,
394 identity_keypair: &Arc<Keypair>,
395 vote_account: &Pubkey,
396 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
397) -> Result<(), String> {
398 download_then_check_genesis_hash(
399 &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
400 ledger_path,
401 &mut validator_config.expected_genesis_hash,
402 bootstrap_config.max_genesis_archive_unpacked_size,
403 bootstrap_config.no_genesis_fetch,
404 use_progress_bar,
405 rpc_client,
406 )?;
407
408 if let Some(gossip) = gossip.take() {
409 shutdown_gossip_service(gossip);
410 }
411
412 let rpc_client_slot = rpc_client
413 .get_slot_with_commitment(CommitmentConfig::finalized())
414 .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
415 info!("RPC node root slot: {rpc_client_slot}");
416
417 download_snapshots(
418 full_snapshot_archives_dir,
419 incremental_snapshot_archives_dir,
420 validator_config,
421 bootstrap_config,
422 use_progress_bar,
423 maximum_local_snapshot_age,
424 start_progress,
425 minimal_snapshot_download_speed,
426 maximum_snapshot_download_abort,
427 download_abort_count,
428 snapshot_hash,
429 rpc_contact_info,
430 )?;
431
432 if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
433 let rpc_client = RpcClient::new(url);
434 check_vote_account(
435 &rpc_client,
436 &identity_keypair.pubkey(),
437 vote_account,
438 &authorized_voter_keypairs
439 .read()
440 .unwrap()
441 .iter()
442 .map(|k| k.pubkey())
443 .collect::<Vec<_>>(),
444 )
445 .unwrap_or_else(|err| {
446 error!("{err}");
453 exit(1);
454 });
455 }
456 Ok(())
457}
458
459fn ping(addr: &SocketAddr) -> Option<Duration> {
461 let start = Instant::now();
462 match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
463 Ok(_) => Some(start.elapsed()),
464 Err(_) => None,
465 }
466}
467
468fn get_vetted_rpc_nodes(
472 vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
473 cluster_info: &Arc<ClusterInfo>,
474 cluster_entrypoints: &[ContactInfo],
475 validator_config: &ValidatorConfig,
476 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
477 bootstrap_config: &RpcBootstrapConfig,
478) {
479 while vetted_rpc_nodes.is_empty() {
480 let rpc_node_details = match get_rpc_nodes(
481 cluster_info,
482 cluster_entrypoints,
483 validator_config,
484 blacklisted_rpc_nodes,
485 bootstrap_config,
486 ) {
487 Ok(rpc_node_details) => rpc_node_details,
488 Err(err) => {
489 error!(
490 "Failed to get RPC nodes: {err}. Consider checking system \
491 clock, removing `--no-port-check`, or adjusting \
492 `--known-validator ...` arguments as applicable"
493 );
494 exit(1);
495 }
496 };
497
498 let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
499 vetted_rpc_nodes.extend(
500 rpc_node_details
501 .into_par_iter()
502 .filter_map(|rpc_node_details| {
503 let GetRpcNodeResult {
504 rpc_contact_info,
505 snapshot_hash,
506 } = rpc_node_details;
507
508 info!(
509 "Using RPC service from node {}: {:?}",
510 rpc_contact_info.pubkey(),
511 rpc_contact_info.rpc()
512 );
513
514 let rpc_addr = rpc_contact_info.rpc().ok()?;
515 let ping_time = ping(&rpc_addr);
516
517 let rpc_client =
518 RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
519
520 Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
521 })
522 .filter(
523 |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
524 .get_version()
525 {
526 Ok(rpc_version) => {
527 if let Some(ping_time) = ping_time {
528 info!(
529 "RPC node version: {} Ping: {}ms",
530 rpc_version.solana_core,
531 ping_time.as_millis()
532 );
533 true
534 } else {
535 fail_rpc_node(
536 "Failed to ping RPC".to_string(),
537 &validator_config.known_validators,
538 rpc_contact_info.pubkey(),
539 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
540 );
541 false
542 }
543 }
544 Err(err) => {
545 fail_rpc_node(
546 format!("Failed to get RPC node version: {err}"),
547 &validator_config.known_validators,
548 rpc_contact_info.pubkey(),
549 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
550 );
551 false
552 }
553 },
554 )
555 .collect::<Vec<(
556 ContactInfo,
557 Option<SnapshotHash>,
558 RpcClient,
559 Option<Duration>,
560 )>>()
561 .into_iter()
562 .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
563 .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
564 (rpc_contact_info, snapshot_hash, rpc_client)
565 })
566 .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
567 );
568 blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
569 }
570}
571
572#[allow(clippy::too_many_arguments)]
573pub fn rpc_bootstrap(
574 node: &Node,
575 identity_keypair: &Arc<Keypair>,
576 ledger_path: &Path,
577 full_snapshot_archives_dir: &Path,
578 incremental_snapshot_archives_dir: &Path,
579 vote_account: &Pubkey,
580 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
581 cluster_entrypoints: &[ContactInfo],
582 validator_config: &mut ValidatorConfig,
583 bootstrap_config: RpcBootstrapConfig,
584 do_port_check: bool,
585 use_progress_bar: bool,
586 maximum_local_snapshot_age: Slot,
587 should_check_duplicate_instance: bool,
588 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
589 minimal_snapshot_download_speed: f32,
590 maximum_snapshot_download_abort: u64,
591 socket_addr_space: SocketAddrSpace,
592) {
593 if do_port_check {
594 let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
595 order.shuffle(&mut thread_rng());
596 if order.into_iter().all(|i| {
597 !verify_reachable_ports(
598 node,
599 &cluster_entrypoints[i],
600 validator_config,
601 &socket_addr_space,
602 )
603 }) {
604 exit(1);
605 }
606 }
607
608 if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
609 return;
610 }
611
612 let total_snapshot_download_time = Instant::now();
613 let mut get_rpc_nodes_time = Duration::new(0, 0);
614 let mut snapshot_download_time = Duration::new(0, 0);
615 let mut blacklisted_rpc_nodes = HashSet::new();
616 let mut gossip = None;
617 let mut vetted_rpc_nodes = vec![];
618 let mut download_abort_count = 0;
619 loop {
620 if gossip.is_none() {
621 *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
622
623 gossip = Some(start_gossip_node(
624 identity_keypair.clone(),
625 cluster_entrypoints,
626 ledger_path,
627 &node
628 .info
629 .gossip()
630 .expect("Operator must spin up node with valid gossip address"),
631 node.sockets.gossip.try_clone().unwrap(),
632 validator_config.expected_shred_version,
633 validator_config.gossip_validators.clone(),
634 should_check_duplicate_instance,
635 socket_addr_space,
636 ));
637 }
638
639 let get_rpc_nodes_start = Instant::now();
640 get_vetted_rpc_nodes(
641 &mut vetted_rpc_nodes,
642 &gossip.as_ref().unwrap().0,
643 cluster_entrypoints,
644 validator_config,
645 &mut blacklisted_rpc_nodes,
646 &bootstrap_config,
647 );
648 let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
649 get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
650
651 let snapshot_download_start = Instant::now();
652 let download_result = attempt_download_genesis_and_snapshot(
653 &rpc_contact_info,
654 ledger_path,
655 validator_config,
656 &bootstrap_config,
657 use_progress_bar,
658 &mut gossip,
659 &rpc_client,
660 full_snapshot_archives_dir,
661 incremental_snapshot_archives_dir,
662 maximum_local_snapshot_age,
663 start_progress,
664 minimal_snapshot_download_speed,
665 maximum_snapshot_download_abort,
666 &mut download_abort_count,
667 snapshot_hash,
668 identity_keypair,
669 vote_account,
670 authorized_voter_keypairs.clone(),
671 );
672 snapshot_download_time += snapshot_download_start.elapsed();
673 match download_result {
674 Ok(()) => break,
675 Err(err) => {
676 fail_rpc_node(
677 err,
678 &validator_config.known_validators,
679 rpc_contact_info.pubkey(),
680 &mut blacklisted_rpc_nodes,
681 );
682 }
683 }
684 }
685
686 if let Some(gossip) = gossip.take() {
687 shutdown_gossip_service(gossip);
688 }
689
690 datapoint_info!(
691 "bootstrap-snapshot-download",
692 (
693 "total_time_secs",
694 total_snapshot_download_time.elapsed().as_secs(),
695 i64
696 ),
697 ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
698 (
699 "snapshot_download_time_secs",
700 snapshot_download_time.as_secs(),
701 i64
702 ),
703 ("download_abort_count", download_abort_count, i64),
704 ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
705 );
706}
707
708fn get_rpc_nodes(
712 cluster_info: &ClusterInfo,
713 cluster_entrypoints: &[ContactInfo],
714 validator_config: &ValidatorConfig,
715 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
716 bootstrap_config: &RpcBootstrapConfig,
717) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
718 let mut blacklist_timeout = Instant::now();
719 let mut get_rpc_peers_timout = Instant::now();
720 let mut newer_cluster_snapshot_timeout = None;
721 let mut retry_reason = None;
722 loop {
723 std::thread::sleep(Duration::from_secs(1));
725 info!("\n{}", cluster_info.rpc_info_trace());
726
727 let rpc_peers = get_rpc_peers(
728 cluster_info,
729 cluster_entrypoints,
730 validator_config,
731 blacklisted_rpc_nodes,
732 &blacklist_timeout,
733 &mut retry_reason,
734 bootstrap_config,
735 );
736 if rpc_peers.is_empty() {
737 if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
738 return Err(GetRpcNodeError::NoRpcPeersFound);
739 }
740 continue;
741 }
742
743 blacklist_timeout = Instant::now();
745 get_rpc_peers_timout = Instant::now();
746 if bootstrap_config.no_snapshot_fetch {
747 let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
748 return Ok(vec![GetRpcNodeResult {
749 rpc_contact_info: random_peer.clone(),
750 snapshot_hash: None,
751 }]);
752 }
753
754 let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
755 .as_ref()
756 .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
757 .unwrap_or(true)
758 {
759 KnownValidatorsToWaitFor::All
760 } else {
761 KnownValidatorsToWaitFor::Any
762 };
763 let peer_snapshot_hashes = get_peer_snapshot_hashes(
764 cluster_info,
765 &rpc_peers,
766 validator_config.known_validators.as_ref(),
767 known_validators_to_wait_for,
768 bootstrap_config.incremental_snapshot_fetch,
769 );
770 if peer_snapshot_hashes.is_empty() {
771 match newer_cluster_snapshot_timeout {
772 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
773 Some(newer_cluster_snapshot_timeout) => {
774 if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
775 return Err(GetRpcNodeError::NoNewerSnapshots);
776 }
777 }
778 }
779 retry_reason = Some("No snapshots available".to_owned());
780 continue;
781 } else {
782 let rpc_peers = peer_snapshot_hashes
783 .iter()
784 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
785 .collect::<Vec<_>>();
786 let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
787 info!(
788 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
789 final_snapshot_hash
790 .incr
791 .map(|(slot, _hash)| slot)
792 .unwrap_or(final_snapshot_hash.full.0),
793 rpc_peers.len(),
794 if rpc_peers.len() > 1 { "s" } else { "" },
795 rpc_peers,
796 );
797 let rpc_node_results = peer_snapshot_hashes
798 .iter()
799 .map(|peer_snapshot_hash| GetRpcNodeResult {
800 rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
801 snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
802 })
803 .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
804 .collect();
805 return Ok(rpc_node_results);
806 }
807 }
808}
809
810fn get_highest_local_snapshot_hash(
813 full_snapshot_archives_dir: impl AsRef<Path>,
814 incremental_snapshot_archives_dir: impl AsRef<Path>,
815 incremental_snapshot_fetch: bool,
816) -> Option<(Slot, Hash)> {
817 snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
818 .and_then(|full_snapshot_info| {
819 if incremental_snapshot_fetch {
820 snapshot_utils::get_highest_incremental_snapshot_archive_info(
821 incremental_snapshot_archives_dir,
822 full_snapshot_info.slot(),
823 )
824 .map(|incremental_snapshot_info| {
825 (
826 incremental_snapshot_info.slot(),
827 *incremental_snapshot_info.hash(),
828 )
829 })
830 } else {
831 None
832 }
833 .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
834 })
835 .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
836}
837
838fn get_peer_snapshot_hashes(
845 cluster_info: &ClusterInfo,
846 rpc_peers: &[ContactInfo],
847 known_validators: Option<&HashSet<Pubkey>>,
848 known_validators_to_wait_for: KnownValidatorsToWaitFor,
849 incremental_snapshot_fetch: bool,
850) -> Vec<PeerSnapshotHash> {
851 let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
852 if let Some(known_validators) = known_validators {
853 let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
854 cluster_info,
855 known_validators,
856 known_validators_to_wait_for,
857 );
858 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
859 &known_snapshot_hashes,
860 &mut peer_snapshot_hashes,
861 );
862 }
863 if incremental_snapshot_fetch {
864 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
872 &mut peer_snapshot_hashes,
873 );
874 }
875 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
876
877 peer_snapshot_hashes
878}
879
880type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
883
884fn get_snapshot_hashes_from_known_validators(
895 cluster_info: &ClusterInfo,
896 known_validators: &HashSet<Pubkey>,
897 known_validators_to_wait_for: KnownValidatorsToWaitFor,
898) -> KnownSnapshotHashes {
899 let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
901
902 if !do_known_validators_have_all_snapshot_hashes(
903 known_validators,
904 known_validators_to_wait_for,
905 get_snapshot_hashes_for_node,
906 ) {
907 debug!(
908 "Snapshot hashes have not been discovered from known validators. \
909 This likely means the gossip tables are not fully populated. \
910 We will sleep and retry..."
911 );
912 return KnownSnapshotHashes::default();
913 }
914
915 build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
916}
917
918fn do_known_validators_have_all_snapshot_hashes<'a>(
927 known_validators: impl IntoIterator<Item = &'a Pubkey>,
928 known_validators_to_wait_for: KnownValidatorsToWaitFor,
929 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
930) -> bool {
931 let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
932
933 match known_validators_to_wait_for {
934 KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
935 KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
936 }
937}
938
939#[derive(Debug, Copy, Clone, Eq, PartialEq)]
942enum KnownValidatorsToWaitFor {
943 All,
944 Any,
945}
946
947fn build_known_snapshot_hashes<'a>(
953 nodes: impl IntoIterator<Item = &'a Pubkey>,
954 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
955) -> KnownSnapshotHashes {
956 let mut known_snapshot_hashes = KnownSnapshotHashes::new();
957
958 fn is_any_same_slot_and_different_hash<'a>(
961 needle: &(Slot, Hash),
962 haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
963 ) -> bool {
964 haystack
965 .into_iter()
966 .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
967 }
968
969 'to_next_node: for node in nodes {
970 let Some(SnapshotHash {
971 full: full_snapshot_hash,
972 incr: incremental_snapshot_hash,
973 }) = get_snapshot_hashes_for_node(node)
974 else {
975 continue 'to_next_node;
976 };
977
978 if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
983 warn!(
984 "Ignoring all snapshot hashes from node {node} since we've seen a different full snapshot hash with this slot.\
985 \nfull snapshot hash: {full_snapshot_hash:?}"
986 );
987 debug!(
988 "known full snapshot hashes: {:#?}",
989 known_snapshot_hashes.keys(),
990 );
991 continue 'to_next_node;
992 }
993
994 let known_incremental_snapshot_hashes =
998 known_snapshot_hashes.entry(full_snapshot_hash).or_default();
999
1000 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1001 if is_any_same_slot_and_different_hash(
1006 &incremental_snapshot_hash,
1007 known_incremental_snapshot_hashes.iter(),
1008 ) {
1009 warn!(
1010 "Ignoring incremental snapshot hash from node {node} since we've seen a different incremental snapshot hash with this slot.\
1011 \nfull snapshot hash: {full_snapshot_hash:?}\
1012 \nincremental snapshot hash: {incremental_snapshot_hash:?}"
1013 );
1014 debug!(
1015 "known incremental snapshot hashes based on this slot: {:#?}",
1016 known_incremental_snapshot_hashes.iter(),
1017 );
1018 continue 'to_next_node;
1019 }
1020
1021 known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
1022 };
1023 }
1024
1025 trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1026 known_snapshot_hashes
1027}
1028
1029fn get_eligible_peer_snapshot_hashes(
1035 cluster_info: &ClusterInfo,
1036 rpc_peers: &[ContactInfo],
1037) -> Vec<PeerSnapshotHash> {
1038 let peer_snapshot_hashes = rpc_peers
1039 .iter()
1040 .flat_map(|rpc_peer| {
1041 get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1042 PeerSnapshotHash {
1043 rpc_contact_info: rpc_peer.clone(),
1044 snapshot_hash,
1045 }
1046 })
1047 })
1048 .collect();
1049
1050 trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1051 peer_snapshot_hashes
1052}
1053
1054fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1056 known_snapshot_hashes: &KnownSnapshotHashes,
1057 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1058) {
1059 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1060 known_snapshot_hashes
1061 .get(&peer_snapshot_hash.snapshot_hash.full)
1062 .map(|known_incremental_hashes| {
1063 if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1064 true
1067 } else {
1068 known_incremental_hashes
1069 .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1070 }
1071 })
1072 .unwrap_or(false)
1073 });
1074
1075 trace!(
1076 "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1077 );
1078}
1079
1080fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1082 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1083) {
1084 let highest_full_snapshot_hash = peer_snapshot_hashes
1085 .iter()
1086 .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1087 .max_by_key(|(slot, _hash)| *slot);
1088 let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1089 return;
1093 };
1094
1095 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1096 peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1097 });
1098
1099 trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1100}
1101
1102fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1104 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1105) {
1106 let highest_incremental_snapshot_hash = peer_snapshot_hashes
1107 .iter()
1108 .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1109 .max_by_key(|(slot, _hash)| *slot);
1110
1111 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1112 peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1113 });
1114
1115 trace!("retain peer snapshot hashes with highest incremental snapshot slot: {peer_snapshot_hashes:?}");
1116}
1117
1118#[allow(clippy::too_many_arguments)]
1120fn download_snapshots(
1121 full_snapshot_archives_dir: &Path,
1122 incremental_snapshot_archives_dir: &Path,
1123 validator_config: &ValidatorConfig,
1124 bootstrap_config: &RpcBootstrapConfig,
1125 use_progress_bar: bool,
1126 maximum_local_snapshot_age: Slot,
1127 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1128 minimal_snapshot_download_speed: f32,
1129 maximum_snapshot_download_abort: u64,
1130 download_abort_count: &mut u64,
1131 snapshot_hash: Option<SnapshotHash>,
1132 rpc_contact_info: &ContactInfo,
1133) -> Result<(), String> {
1134 if snapshot_hash.is_none() {
1135 return Ok(());
1136 }
1137 let SnapshotHash {
1138 full: full_snapshot_hash,
1139 incr: incremental_snapshot_hash,
1140 } = snapshot_hash.unwrap();
1141
1142 if should_use_local_snapshot(
1144 full_snapshot_archives_dir,
1145 incremental_snapshot_archives_dir,
1146 maximum_local_snapshot_age,
1147 full_snapshot_hash,
1148 incremental_snapshot_hash,
1149 bootstrap_config.incremental_snapshot_fetch,
1150 ) {
1151 return Ok(());
1152 }
1153
1154 if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
1156 .into_iter()
1157 .any(|snapshot_archive| {
1158 snapshot_archive.slot() == full_snapshot_hash.0
1159 && snapshot_archive.hash().0 == full_snapshot_hash.1
1160 })
1161 {
1162 info!(
1163 "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1164 full_snapshot_hash.0, full_snapshot_hash.1
1165 );
1166 } else {
1167 download_snapshot(
1168 full_snapshot_archives_dir,
1169 incremental_snapshot_archives_dir,
1170 validator_config,
1171 bootstrap_config,
1172 use_progress_bar,
1173 start_progress,
1174 minimal_snapshot_download_speed,
1175 maximum_snapshot_download_abort,
1176 download_abort_count,
1177 rpc_contact_info,
1178 full_snapshot_hash,
1179 SnapshotKind::FullSnapshot,
1180 )?;
1181 }
1182
1183 if bootstrap_config.incremental_snapshot_fetch {
1184 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1186 if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1187 .into_iter()
1188 .any(|snapshot_archive| {
1189 snapshot_archive.slot() == incremental_snapshot_hash.0
1190 && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1191 && snapshot_archive.base_slot() == full_snapshot_hash.0
1192 })
1193 {
1194 info!(
1195 "Incremental snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1196 incremental_snapshot_hash.0, incremental_snapshot_hash.1
1197 );
1198 } else {
1199 download_snapshot(
1200 full_snapshot_archives_dir,
1201 incremental_snapshot_archives_dir,
1202 validator_config,
1203 bootstrap_config,
1204 use_progress_bar,
1205 start_progress,
1206 minimal_snapshot_download_speed,
1207 maximum_snapshot_download_abort,
1208 download_abort_count,
1209 rpc_contact_info,
1210 incremental_snapshot_hash,
1211 SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1212 )?;
1213 }
1214 }
1215 }
1216
1217 Ok(())
1218}
1219
1220#[allow(clippy::too_many_arguments)]
1222fn download_snapshot(
1223 full_snapshot_archives_dir: &Path,
1224 incremental_snapshot_archives_dir: &Path,
1225 validator_config: &ValidatorConfig,
1226 bootstrap_config: &RpcBootstrapConfig,
1227 use_progress_bar: bool,
1228 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1229 minimal_snapshot_download_speed: f32,
1230 maximum_snapshot_download_abort: u64,
1231 download_abort_count: &mut u64,
1232 rpc_contact_info: &ContactInfo,
1233 desired_snapshot_hash: (Slot, Hash),
1234 snapshot_kind: SnapshotKind,
1235) -> Result<(), String> {
1236 let maximum_full_snapshot_archives_to_retain = validator_config
1237 .snapshot_config
1238 .maximum_full_snapshot_archives_to_retain;
1239 let maximum_incremental_snapshot_archives_to_retain = validator_config
1240 .snapshot_config
1241 .maximum_incremental_snapshot_archives_to_retain;
1242
1243 *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1244 slot: desired_snapshot_hash.0,
1245 rpc_addr: rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
1246 };
1247 let desired_snapshot_hash = (
1248 desired_snapshot_hash.0,
1249 solana_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1250 );
1251 download_snapshot_archive(
1252 &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
1253 full_snapshot_archives_dir,
1254 incremental_snapshot_archives_dir,
1255 desired_snapshot_hash,
1256 snapshot_kind,
1257 maximum_full_snapshot_archives_to_retain,
1258 maximum_incremental_snapshot_archives_to_retain,
1259 use_progress_bar,
1260 &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1261 debug!("Download progress: {download_progress:?}");
1262 if download_progress.last_throughput < minimal_snapshot_download_speed
1263 && download_progress.notification_count <= 1
1264 && download_progress.percentage_done <= 2_f32
1265 && download_progress.estimated_remaining_time > 60_f32
1266 && *download_abort_count < maximum_snapshot_download_abort
1267 {
1268 if let Some(ref known_validators) = validator_config.known_validators {
1269 if known_validators.contains(rpc_contact_info.pubkey())
1270 && known_validators.len() == 1
1271 && bootstrap_config.only_known_rpc
1272 {
1273 warn!(
1274 "The snapshot download is too slow, throughput: {} < min speed {} \
1275 bytes/sec, but will NOT abort and try a different node as it is the \
1276 only known validator and the --only-known-rpc flag is set. \
1277 Abort count: {}, Progress detail: {:?}",
1278 download_progress.last_throughput,
1279 minimal_snapshot_download_speed,
1280 download_abort_count,
1281 download_progress,
1282 );
1283 return true; }
1285 }
1286 warn!(
1287 "The snapshot download is too slow, throughput: {} < min speed {} \
1288 bytes/sec, will abort and try a different node. \
1289 Abort count: {}, Progress detail: {:?}",
1290 download_progress.last_throughput,
1291 minimal_snapshot_download_speed,
1292 download_abort_count,
1293 download_progress,
1294 );
1295 *download_abort_count += 1;
1296 false
1297 } else {
1298 true
1299 }
1300 })),
1301 )
1302}
1303
1304fn should_use_local_snapshot(
1307 full_snapshot_archives_dir: &Path,
1308 incremental_snapshot_archives_dir: &Path,
1309 maximum_local_snapshot_age: Slot,
1310 full_snapshot_hash: (Slot, Hash),
1311 incremental_snapshot_hash: Option<(Slot, Hash)>,
1312 incremental_snapshot_fetch: bool,
1313) -> bool {
1314 let cluster_snapshot_slot = incremental_snapshot_hash
1315 .map(|(slot, _)| slot)
1316 .unwrap_or(full_snapshot_hash.0);
1317
1318 match get_highest_local_snapshot_hash(
1319 full_snapshot_archives_dir,
1320 incremental_snapshot_archives_dir,
1321 incremental_snapshot_fetch,
1322 ) {
1323 None => {
1324 info!("Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a local snapshot.");
1325 false
1326 }
1327 Some((local_snapshot_slot, _)) => {
1328 if local_snapshot_slot
1329 >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1330 {
1331 info!("Reusing local snapshot at slot {local_snapshot_slot} instead of downloading a snapshot for slot {cluster_snapshot_slot}.");
1332 true
1333 } else {
1334 info!("Local snapshot from slot {local_snapshot_slot} is too old. Downloading a newer snapshot for slot {cluster_snapshot_slot}.");
1335 false
1336 }
1337 }
1338 }
1339}
1340
1341fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1343 cluster_info.get_snapshot_hashes_for_node(node).map(
1344 |crds_value::SnapshotHashes {
1345 full, incremental, ..
1346 }| {
1347 let highest_incremental_snapshot_hash = incremental.into_iter().max();
1348 SnapshotHash {
1349 full,
1350 incr: highest_incremental_snapshot_hash,
1351 }
1352 },
1353 )
1354}
1355
1356#[cfg(test)]
1357mod tests {
1358 use super::*;
1359
1360 impl PeerSnapshotHash {
1361 fn new(
1362 rpc_contact_info: ContactInfo,
1363 full_snapshot_hash: (Slot, Hash),
1364 incremental_snapshot_hash: Option<(Slot, Hash)>,
1365 ) -> Self {
1366 Self {
1367 rpc_contact_info,
1368 snapshot_hash: SnapshotHash {
1369 full: full_snapshot_hash,
1370 incr: incremental_snapshot_hash,
1371 },
1372 }
1373 }
1374 }
1375
1376 fn default_contact_info_for_tests() -> ContactInfo {
1377 ContactInfo::new_localhost(&Pubkey::default(), 1_681_834_947_321)
1378 }
1379
1380 #[test]
1381 fn test_build_known_snapshot_hashes() {
1382 solana_logger::setup();
1383 let full_snapshot_hash1 = (400_000, Hash::new_unique());
1384 let full_snapshot_hash2 = (400_000, Hash::new_unique());
1385
1386 let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1387 let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1388
1389 let oracle = {
1391 let mut oracle = HashMap::new();
1392
1393 for (full, incr) in [
1394 (full_snapshot_hash1, None),
1396 (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1398 (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1400 (full_snapshot_hash2, None),
1402 (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1403 (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1404 ] {
1405 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1407 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1408 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1409 }
1410
1411 oracle.insert(Pubkey::new_unique(), None);
1413 oracle.insert(Pubkey::new_unique(), None);
1414 oracle.insert(Pubkey::new_unique(), None);
1415
1416 oracle
1417 };
1418
1419 let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1420
1421 let known_snapshot_hashes =
1422 build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1423
1424 let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1427 assert_eq!(known_full_snapshot_hashes.len(), 1);
1428 let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1429
1430 let known_incremental_snapshot_hashes =
1432 known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1433 assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1434 let known_incremental_snapshot_hash =
1435 known_incremental_snapshot_hashes.iter().next().unwrap();
1436
1437 assert!(
1444 known_full_snapshot_hash == &full_snapshot_hash1
1445 || known_full_snapshot_hash == &full_snapshot_hash2
1446 );
1447 assert!(
1448 known_incremental_snapshot_hash == &incremental_snapshot_hash1
1449 || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1450 );
1451 }
1452
1453 #[test]
1454 fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1455 let known_snapshot_hashes: KnownSnapshotHashes = [
1456 (
1457 (200_000, Hash::new_unique()),
1458 [
1459 (200_200, Hash::new_unique()),
1460 (200_400, Hash::new_unique()),
1461 (200_600, Hash::new_unique()),
1462 (200_800, Hash::new_unique()),
1463 ]
1464 .iter()
1465 .cloned()
1466 .collect(),
1467 ),
1468 (
1469 (300_000, Hash::new_unique()),
1470 [
1471 (300_200, Hash::new_unique()),
1472 (300_400, Hash::new_unique()),
1473 (300_600, Hash::new_unique()),
1474 ]
1475 .iter()
1476 .cloned()
1477 .collect(),
1478 ),
1479 ]
1480 .iter()
1481 .cloned()
1482 .collect();
1483
1484 let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1485 let known_full_snapshot_hash = known_snapshot_hash.0;
1486 let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1487
1488 let contact_info = default_contact_info_for_tests();
1489 let peer_snapshot_hashes = vec![
1490 PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1492 PeerSnapshotHash::new(
1494 contact_info.clone(),
1495 (111_000, Hash::default()),
1496 Some((111_111, Hash::default())),
1497 ),
1498 PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1500 PeerSnapshotHash::new(
1502 contact_info.clone(),
1503 (111_000, Hash::default()),
1504 Some(*known_incremental_snapshot_hash),
1505 ),
1506 PeerSnapshotHash::new(
1508 contact_info.clone(),
1509 *known_full_snapshot_hash,
1510 Some((111_111, Hash::default())),
1511 ),
1512 PeerSnapshotHash::new(
1514 contact_info.clone(),
1515 *known_full_snapshot_hash,
1516 Some(*known_incremental_snapshot_hash),
1517 ),
1518 ];
1519
1520 let expected = vec![
1521 PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1522 PeerSnapshotHash::new(
1523 contact_info,
1524 *known_full_snapshot_hash,
1525 Some(*known_incremental_snapshot_hash),
1526 ),
1527 ];
1528 let mut actual = peer_snapshot_hashes;
1529 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1530 &known_snapshot_hashes,
1531 &mut actual,
1532 );
1533 assert_eq!(expected, actual);
1534 }
1535
1536 #[test]
1537 fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1538 let contact_info = default_contact_info_for_tests();
1539 let peer_snapshot_hashes = vec![
1540 PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1542 PeerSnapshotHash::new(
1543 contact_info.clone(),
1544 (100_000, Hash::default()),
1545 Some((100_100, Hash::default())),
1546 ),
1547 PeerSnapshotHash::new(
1548 contact_info.clone(),
1549 (100_000, Hash::default()),
1550 Some((100_200, Hash::default())),
1551 ),
1552 PeerSnapshotHash::new(
1553 contact_info.clone(),
1554 (100_000, Hash::default()),
1555 Some((100_300, Hash::default())),
1556 ),
1557 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1559 PeerSnapshotHash::new(
1560 contact_info.clone(),
1561 (200_000, Hash::default()),
1562 Some((200_100, Hash::default())),
1563 ),
1564 PeerSnapshotHash::new(
1565 contact_info.clone(),
1566 (200_000, Hash::default()),
1567 Some((200_200, Hash::default())),
1568 ),
1569 PeerSnapshotHash::new(
1570 contact_info.clone(),
1571 (200_000, Hash::default()),
1572 Some((200_300, Hash::default())),
1573 ),
1574 ];
1575
1576 let expected = vec![
1577 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1578 PeerSnapshotHash::new(
1579 contact_info.clone(),
1580 (200_000, Hash::default()),
1581 Some((200_100, Hash::default())),
1582 ),
1583 PeerSnapshotHash::new(
1584 contact_info.clone(),
1585 (200_000, Hash::default()),
1586 Some((200_200, Hash::default())),
1587 ),
1588 PeerSnapshotHash::new(
1589 contact_info,
1590 (200_000, Hash::default()),
1591 Some((200_300, Hash::default())),
1592 ),
1593 ];
1594 let mut actual = peer_snapshot_hashes;
1595 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1596 assert_eq!(expected, actual);
1597 }
1598
1599 #[test]
1600 fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1601 let contact_info = default_contact_info_for_tests();
1602 let peer_snapshot_hashes = vec![
1603 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1604 PeerSnapshotHash::new(
1605 contact_info.clone(),
1606 (200_000, Hash::default()),
1607 Some((200_100, Hash::default())),
1608 ),
1609 PeerSnapshotHash::new(
1610 contact_info.clone(),
1611 (200_000, Hash::default()),
1612 Some((200_200, Hash::default())),
1613 ),
1614 PeerSnapshotHash::new(
1615 contact_info.clone(),
1616 (200_000, Hash::default()),
1617 Some((200_300, Hash::default())),
1618 ),
1619 PeerSnapshotHash::new(
1620 contact_info.clone(),
1621 (200_000, Hash::default()),
1622 Some((200_010, Hash::default())),
1623 ),
1624 PeerSnapshotHash::new(
1625 contact_info.clone(),
1626 (200_000, Hash::default()),
1627 Some((200_020, Hash::default())),
1628 ),
1629 PeerSnapshotHash::new(
1630 contact_info.clone(),
1631 (200_000, Hash::default()),
1632 Some((200_030, Hash::default())),
1633 ),
1634 ];
1635
1636 let expected = vec![PeerSnapshotHash::new(
1637 contact_info,
1638 (200_000, Hash::default()),
1639 Some((200_300, Hash::default())),
1640 )];
1641 let mut actual = peer_snapshot_hashes;
1642 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1643 assert_eq!(expected, actual);
1644 }
1645
1646 #[test]
1649 fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1650 let contact_info = default_contact_info_for_tests();
1651 let peer_snapshot_hashes = vec![
1652 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1653 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1654 PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1655 ];
1656
1657 let expected = peer_snapshot_hashes.clone();
1658 let mut actual = peer_snapshot_hashes;
1659 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1660 assert_eq!(expected, actual);
1661 }
1662
1663 #[test]
1666 fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1667 {
1668 let mut actual = vec![];
1669 let expected = actual.clone();
1670 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1671 assert_eq!(expected, actual);
1672 }
1673 {
1674 let mut actual = vec![];
1675 let expected = actual.clone();
1676 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1677 assert_eq!(expected, actual);
1678 }
1679 }
1680}