1use {
2 itertools::Itertools,
3 log::*,
4 rand::{seq::SliceRandom, thread_rng, Rng},
5 rayon::prelude::*,
6 solana_clock::Slot,
7 solana_commitment_config::CommitmentConfig,
8 solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
9 solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
10 solana_genesis_utils::download_then_check_genesis_hash,
11 solana_gossip::{
12 cluster_info::ClusterInfo,
13 contact_info::{ContactInfo, Protocol},
14 crds_data,
15 gossip_service::GossipService,
16 node::Node,
17 },
18 solana_hash::Hash,
19 solana_keypair::Keypair,
20 solana_metrics::datapoint_info,
21 solana_pubkey::Pubkey,
22 solana_rpc_client::rpc_client::RpcClient,
23 solana_runtime::{
24 snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
25 snapshot_utils,
26 },
27 solana_signer::Signer,
28 solana_streamer::socket::SocketAddrSpace,
29 std::{
30 collections::{hash_map::RandomState, HashMap, HashSet},
31 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
32 path::Path,
33 process::exit,
34 sync::{
35 atomic::{AtomicBool, Ordering},
36 Arc, RwLock,
37 },
38 time::{Duration, Instant},
39 },
40 thiserror::Error,
41};
42
/// While less than this much time has elapsed since `get_rpc_nodes` first found no usable peer
/// snapshot hashes, snapshot hashes are required from *all* known validators; afterwards *any*
/// known validator is enough.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// When no usable RPC peer is left and the blacklist is non-empty, `get_rpc_peers` clears the
/// blacklist once the blacklist timer has been running for longer than this.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// `get_rpc_nodes` gives up with `GetRpcNodeError::NoNewerSnapshots` once it has gone longer than
/// this without finding any usable peer snapshot hashes.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
/// `get_rpc_nodes` gives up with `GetRpcNodeError::NoRpcPeersFound` once it has gone longer than
/// this without finding any RPC peers.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);

/// Upper bound on the number of RPC candidates `get_rpc_nodes` returns per call.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// TCP connect timeout used by `ping` when probing an RPC address.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
59
/// Options controlling how `rpc_bootstrap` fetches genesis and snapshots from RPC peers.
#[derive(Debug, PartialEq, Clone)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash`. When set together with
    /// `no_snapshot_fetch`, `rpc_bootstrap` returns right after the optional port check.
    pub no_genesis_fetch: bool,
    /// When set, `get_rpc_nodes` returns one random RPC peer with no snapshot hash, so no
    /// snapshot is downloaded.
    pub no_snapshot_fetch: bool,
    /// When set, only RPC peers that are known validators are considered.
    pub only_known_rpc: bool,
    /// Forwarded to `download_then_check_genesis_hash`.
    pub max_genesis_archive_unpacked_size: u64,
    /// URL of an RPC endpoint used to sanity-check the vote account after the download step;
    /// `None` skips the check.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots are considered when selecting peers and downloading.
    pub incremental_snapshot_fetch: bool,
}
69
70fn verify_reachable_ports(
71 node: &Node,
72 cluster_entrypoint: &ContactInfo,
73 validator_config: &ValidatorConfig,
74 socket_addr_space: &SocketAddrSpace,
75) -> bool {
76 let verify_address = |addr: &Option<SocketAddr>| -> bool {
77 addr.as_ref()
78 .map(|addr| socket_addr_space.check(addr))
79 .unwrap_or_default()
80 };
81
82 let mut udp_sockets = vec![&node.sockets.repair];
83 udp_sockets.extend(node.sockets.gossip.iter());
84
85 if verify_address(&node.info.serve_repair(Protocol::UDP)) {
86 udp_sockets.push(&node.sockets.serve_repair);
87 }
88 if verify_address(&node.info.tpu(Protocol::UDP)) {
89 udp_sockets.extend(node.sockets.tpu.iter());
90 udp_sockets.extend(&node.sockets.tpu_quic);
91 }
92 if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
93 udp_sockets.extend(node.sockets.tpu_forwards.iter());
94 udp_sockets.extend(&node.sockets.tpu_forwards_quic);
95 }
96 if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
97 udp_sockets.extend(node.sockets.tpu_vote.iter());
98 }
99 if verify_address(&node.info.tvu(Protocol::UDP)) {
100 udp_sockets.extend(node.sockets.tvu.iter());
101 udp_sockets.extend(node.sockets.broadcast.iter());
102 udp_sockets.extend(node.sockets.retransmit_sockets.iter());
103 }
104 if !solana_net_utils::verify_all_reachable_udp(
105 &cluster_entrypoint.gossip().unwrap(),
106 &udp_sockets,
107 ) {
108 return false;
109 }
110
111 let mut tcp_listeners = vec![];
112 if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
113 for (purpose, bind_addr, public_addr) in &[
114 ("RPC", rpc_addr, node.info.rpc()),
115 ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
116 ] {
117 if verify_address(public_addr) {
118 tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
119 error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
120 exit(1);
121 }));
122 }
123 }
124 }
125
126 if let Some(ip_echo) = &node.sockets.ip_echo {
127 let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
128 tcp_listeners.push(ip_echo);
129 }
130
131 solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
132}
133
134fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
135 if let Some(known_validators) = known_validators {
136 known_validators.contains(id)
137 } else {
138 false
139 }
140}
141
142fn start_gossip_node(
143 identity_keypair: Arc<Keypair>,
144 cluster_entrypoints: &[ContactInfo],
145 ledger_path: &Path,
146 gossip_addr: &SocketAddr,
147 gossip_sockets: Arc<[UdpSocket]>,
148 expected_shred_version: u16,
149 gossip_validators: Option<HashSet<Pubkey>>,
150 should_check_duplicate_instance: bool,
151 socket_addr_space: SocketAddrSpace,
152) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
153 let contact_info = ClusterInfo::gossip_contact_info(
154 identity_keypair.pubkey(),
155 *gossip_addr,
156 expected_shred_version,
157 );
158 let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
159 cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
160 cluster_info.restore_contact_info(ledger_path, 0);
161 let cluster_info = Arc::new(cluster_info);
162
163 let gossip_exit_flag = Arc::new(AtomicBool::new(false));
164 let gossip_service = GossipService::new(
165 &cluster_info,
166 None,
167 gossip_sockets,
168 gossip_validators,
169 should_check_duplicate_instance,
170 None,
171 gossip_exit_flag.clone(),
172 );
173 (cluster_info, gossip_exit_flag, gossip_service)
174}
175
176fn get_rpc_peers(
177 cluster_info: &ClusterInfo,
178 validator_config: &ValidatorConfig,
179 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
180 blacklist_timeout: &Instant,
181 retry_reason: &mut Option<String>,
182 bootstrap_config: &RpcBootstrapConfig,
183) -> Vec<ContactInfo> {
184 let shred_version = validator_config
185 .expected_shred_version
186 .unwrap_or_else(|| cluster_info.my_shred_version());
187
188 info!(
189 "Searching for an RPC service with shred version {shred_version}{}...",
190 retry_reason
191 .as_ref()
192 .map(|s| format!(" (Retrying: {s})"))
193 .unwrap_or_default()
194 );
195
196 let mut rpc_peers = cluster_info.rpc_peers();
197 if bootstrap_config.only_known_rpc {
198 rpc_peers.retain(|rpc_peer| {
199 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
200 });
201 }
202
203 let rpc_peers_total = rpc_peers.len();
204
205 let rpc_peers: Vec<_> = rpc_peers
207 .into_iter()
208 .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
209 .collect();
210 let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
211 let rpc_known_peers = rpc_peers
212 .iter()
213 .filter(|rpc_peer| {
214 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
215 })
216 .count();
217
218 info!(
219 "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
220 {rpc_peers_blacklisted} blacklisted"
221 );
222
223 if rpc_peers_blacklisted == rpc_peers_total {
224 *retry_reason = if !blacklisted_rpc_nodes.is_empty()
225 && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
226 {
227 blacklisted_rpc_nodes.clear();
230 Some("Blacklist timeout expired".to_owned())
231 } else {
232 Some("Wait for known rpc peers".to_owned())
233 };
234 return vec![];
235 }
236 rpc_peers
237}
238
239fn check_vote_account(
240 rpc_client: &RpcClient,
241 identity_pubkey: &Pubkey,
242 vote_account_address: &Pubkey,
243 authorized_voter_pubkeys: &[Pubkey],
244) -> Result<(), String> {
245 let vote_account = rpc_client
246 .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
247 .map_err(|err| format!("failed to fetch vote account: {err}"))?
248 .value
249 .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
250
251 if vote_account.owner != solana_vote_program::id() {
252 return Err(format!(
253 "not a vote account (owned by {}): {}",
254 vote_account.owner, vote_account_address
255 ));
256 }
257
258 let identity_account = rpc_client
259 .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
260 .map_err(|err| format!("failed to fetch identity account: {err}"))?
261 .value
262 .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
263
264 let vote_state = solana_vote_program::vote_state::from(&vote_account);
265 if let Some(vote_state) = vote_state {
266 if vote_state.authorized_voters().is_empty() {
267 return Err("Vote account not yet initialized".to_string());
268 }
269
270 if vote_state.node_pubkey != *identity_pubkey {
271 return Err(format!(
272 "vote account's identity ({}) does not match the validator's identity {}).",
273 vote_state.node_pubkey, identity_pubkey
274 ));
275 }
276
277 for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
278 if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
279 return Err(format!(
280 "authorized voter {vote_account_authorized_voter_pubkey} not available"
281 ));
282 }
283 }
284 } else {
285 return Err(format!(
286 "invalid vote account data for {vote_account_address}"
287 ));
288 }
289
290 if identity_account.lamports <= 1 {
292 return Err(format!(
293 "underfunded identity account ({}): only {} lamports available",
294 identity_pubkey, identity_account.lamports
295 ));
296 }
297
298 Ok(())
299}
300
/// Reasons `get_rpc_nodes` gives up searching for RPC candidates.
#[derive(Error, Debug)]
pub enum GetRpcNodeError {
    /// No eligible RPC peers were found for longer than `GET_RPC_PEERS_TIMEOUT`.
    #[error("Unable to find any RPC peers")]
    NoRpcPeersFound,

    /// No usable peer snapshot hashes were found for longer than `NEWER_SNAPSHOT_THRESHOLD`.
    #[error("Giving up, did not get newer snapshots from the cluster")]
    NoNewerSnapshots,
}
309
/// One RPC candidate produced by `get_rpc_nodes`.
#[derive(Debug)]
struct GetRpcNodeResult {
    /// Contact info of the candidate RPC node.
    rpc_contact_info: ContactInfo,
    /// Snapshot hash to download from this node; `None` when snapshot fetching is disabled.
    snapshot_hash: Option<SnapshotHash>,
}
318
/// An RPC peer paired with the snapshot hash looked up for it.
#[derive(Debug, PartialEq, Eq, Clone)]
struct PeerSnapshotHash {
    /// Contact info of the peer.
    rpc_contact_info: ContactInfo,
    /// The peer's full (and optional incremental) snapshot hash.
    snapshot_hash: SnapshotHash,
}
325
/// A full snapshot (slot, hash) pair and, optionally, an incremental snapshot (slot, hash) pair.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct SnapshotHash {
    /// Slot and hash of the full snapshot.
    full: (Slot, Hash),
    /// Slot and hash of the incremental snapshot, if there is one.
    incr: Option<(Slot, Hash)>,
}
333
334pub fn fail_rpc_node(
335 err: String,
336 known_validators: &Option<HashSet<Pubkey, RandomState>>,
337 rpc_id: &Pubkey,
338 blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
339) {
340 warn!("{err}");
341 if let Some(ref known_validators) = known_validators {
342 if known_validators.contains(rpc_id) {
343 return;
344 }
345 }
346
347 info!("Excluding {rpc_id} as a future RPC candidate");
348 blacklisted_rpc_nodes.insert(*rpc_id);
349}
350
351fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
352 let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
353 cluster_info.save_contact_info();
354 gossip_exit_flag.store(true, Ordering::Relaxed);
355 gossip_service.join().unwrap();
356}
357
358#[allow(clippy::too_many_arguments)]
359pub fn attempt_download_genesis_and_snapshot(
360 rpc_contact_info: &ContactInfo,
361 ledger_path: &Path,
362 validator_config: &mut ValidatorConfig,
363 bootstrap_config: &RpcBootstrapConfig,
364 use_progress_bar: bool,
365 gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
366 rpc_client: &RpcClient,
367 maximum_local_snapshot_age: Slot,
368 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
369 minimal_snapshot_download_speed: f32,
370 maximum_snapshot_download_abort: u64,
371 download_abort_count: &mut u64,
372 snapshot_hash: Option<SnapshotHash>,
373 identity_keypair: &Arc<Keypair>,
374 vote_account: &Pubkey,
375 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
376) -> Result<(), String> {
377 download_then_check_genesis_hash(
378 &rpc_contact_info
379 .rpc()
380 .ok_or_else(|| String::from("Invalid RPC address"))?,
381 ledger_path,
382 &mut validator_config.expected_genesis_hash,
383 bootstrap_config.max_genesis_archive_unpacked_size,
384 bootstrap_config.no_genesis_fetch,
385 use_progress_bar,
386 rpc_client,
387 )?;
388
389 if let Some(gossip) = gossip.take() {
390 shutdown_gossip_service(gossip);
391 }
392
393 let rpc_client_slot = rpc_client
394 .get_slot_with_commitment(CommitmentConfig::finalized())
395 .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
396 info!("RPC node root slot: {rpc_client_slot}");
397
398 download_snapshots(
399 validator_config,
400 bootstrap_config,
401 use_progress_bar,
402 maximum_local_snapshot_age,
403 start_progress,
404 minimal_snapshot_download_speed,
405 maximum_snapshot_download_abort,
406 download_abort_count,
407 snapshot_hash,
408 rpc_contact_info,
409 )?;
410
411 if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
412 let rpc_client = RpcClient::new(url);
413 check_vote_account(
414 &rpc_client,
415 &identity_keypair.pubkey(),
416 vote_account,
417 &authorized_voter_keypairs
418 .read()
419 .unwrap()
420 .iter()
421 .map(|k| k.pubkey())
422 .collect::<Vec<_>>(),
423 )
424 .unwrap_or_else(|err| {
425 error!("{err}");
432 exit(1);
433 });
434 }
435 Ok(())
436}
437
438fn ping(addr: &SocketAddr) -> Option<Duration> {
440 let start = Instant::now();
441 match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
442 Ok(_) => Some(start.elapsed()),
443 Err(_) => None,
444 }
445}
446
447fn get_vetted_rpc_nodes(
451 vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
452 cluster_info: &Arc<ClusterInfo>,
453 validator_config: &ValidatorConfig,
454 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
455 bootstrap_config: &RpcBootstrapConfig,
456) {
457 while vetted_rpc_nodes.is_empty() {
458 let rpc_node_details = match get_rpc_nodes(
459 cluster_info,
460 validator_config,
461 blacklisted_rpc_nodes,
462 bootstrap_config,
463 ) {
464 Ok(rpc_node_details) => rpc_node_details,
465 Err(err) => {
466 error!(
467 "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
468 `--no-port-check`, or adjusting `--known-validator ...` arguments as \
469 applicable"
470 );
471 exit(1);
472 }
473 };
474
475 let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
476 vetted_rpc_nodes.extend(
477 rpc_node_details
478 .into_par_iter()
479 .filter_map(|rpc_node_details| {
480 let GetRpcNodeResult {
481 rpc_contact_info,
482 snapshot_hash,
483 } = rpc_node_details;
484
485 info!(
486 "Using RPC service from node {}: {:?}",
487 rpc_contact_info.pubkey(),
488 rpc_contact_info.rpc()
489 );
490
491 let rpc_addr = rpc_contact_info.rpc()?;
492 let ping_time = ping(&rpc_addr);
493
494 let rpc_client =
495 RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
496
497 Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
498 })
499 .filter(
500 |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
501 .get_version()
502 {
503 Ok(rpc_version) => {
504 if let Some(ping_time) = ping_time {
505 info!(
506 "RPC node version: {} Ping: {}ms",
507 rpc_version.solana_core,
508 ping_time.as_millis()
509 );
510 true
511 } else {
512 fail_rpc_node(
513 "Failed to ping RPC".to_string(),
514 &validator_config.known_validators,
515 rpc_contact_info.pubkey(),
516 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
517 );
518 false
519 }
520 }
521 Err(err) => {
522 fail_rpc_node(
523 format!("Failed to get RPC node version: {err}"),
524 &validator_config.known_validators,
525 rpc_contact_info.pubkey(),
526 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
527 );
528 false
529 }
530 },
531 )
532 .collect::<Vec<(
533 ContactInfo,
534 Option<SnapshotHash>,
535 RpcClient,
536 Option<Duration>,
537 )>>()
538 .into_iter()
539 .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
540 .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
541 (rpc_contact_info, snapshot_hash, rpc_client)
542 })
543 .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
544 );
545 blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
546 }
547}
548
549#[allow(clippy::too_many_arguments)]
550pub fn rpc_bootstrap(
551 node: &Node,
552 identity_keypair: &Arc<Keypair>,
553 ledger_path: &Path,
554 vote_account: &Pubkey,
555 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
556 cluster_entrypoints: &[ContactInfo],
557 validator_config: &mut ValidatorConfig,
558 bootstrap_config: RpcBootstrapConfig,
559 do_port_check: bool,
560 use_progress_bar: bool,
561 maximum_local_snapshot_age: Slot,
562 should_check_duplicate_instance: bool,
563 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
564 minimal_snapshot_download_speed: f32,
565 maximum_snapshot_download_abort: u64,
566 socket_addr_space: SocketAddrSpace,
567) {
568 if do_port_check {
569 let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
570 order.shuffle(&mut thread_rng());
571 if order.into_iter().all(|i| {
572 !verify_reachable_ports(
573 node,
574 &cluster_entrypoints[i],
575 validator_config,
576 &socket_addr_space,
577 )
578 }) {
579 exit(1);
580 }
581 }
582
583 if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
584 return;
585 }
586
587 let total_snapshot_download_time = Instant::now();
588 let mut get_rpc_nodes_time = Duration::new(0, 0);
589 let mut snapshot_download_time = Duration::new(0, 0);
590 let mut blacklisted_rpc_nodes = HashSet::new();
591 let mut gossip = None;
592 let mut vetted_rpc_nodes = vec![];
593 let mut download_abort_count = 0;
594 loop {
595 if gossip.is_none() {
596 *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
597
598 gossip = Some(start_gossip_node(
599 identity_keypair.clone(),
600 cluster_entrypoints,
601 ledger_path,
602 &node
603 .info
604 .gossip()
605 .expect("Operator must spin up node with valid gossip address"),
606 node.sockets.gossip.clone(),
607 validator_config
608 .expected_shred_version
609 .expect("expected_shred_version should not be None"),
610 validator_config.gossip_validators.clone(),
611 should_check_duplicate_instance,
612 socket_addr_space,
613 ));
614 }
615
616 let get_rpc_nodes_start = Instant::now();
617 get_vetted_rpc_nodes(
618 &mut vetted_rpc_nodes,
619 &gossip.as_ref().unwrap().0,
620 validator_config,
621 &mut blacklisted_rpc_nodes,
622 &bootstrap_config,
623 );
624 let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
625 get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
626
627 let snapshot_download_start = Instant::now();
628 let download_result = attempt_download_genesis_and_snapshot(
629 &rpc_contact_info,
630 ledger_path,
631 validator_config,
632 &bootstrap_config,
633 use_progress_bar,
634 &mut gossip,
635 &rpc_client,
636 maximum_local_snapshot_age,
637 start_progress,
638 minimal_snapshot_download_speed,
639 maximum_snapshot_download_abort,
640 &mut download_abort_count,
641 snapshot_hash,
642 identity_keypair,
643 vote_account,
644 authorized_voter_keypairs.clone(),
645 );
646 snapshot_download_time += snapshot_download_start.elapsed();
647 match download_result {
648 Ok(()) => break,
649 Err(err) => {
650 fail_rpc_node(
651 err,
652 &validator_config.known_validators,
653 rpc_contact_info.pubkey(),
654 &mut blacklisted_rpc_nodes,
655 );
656 }
657 }
658 }
659
660 if let Some(gossip) = gossip.take() {
661 shutdown_gossip_service(gossip);
662 }
663
664 datapoint_info!(
665 "bootstrap-snapshot-download",
666 (
667 "total_time_secs",
668 total_snapshot_download_time.elapsed().as_secs(),
669 i64
670 ),
671 ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
672 (
673 "snapshot_download_time_secs",
674 snapshot_download_time.as_secs(),
675 i64
676 ),
677 ("download_abort_count", download_abort_count, i64),
678 ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
679 );
680}
681
682fn get_rpc_nodes(
686 cluster_info: &ClusterInfo,
687 validator_config: &ValidatorConfig,
688 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
689 bootstrap_config: &RpcBootstrapConfig,
690) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
691 let mut blacklist_timeout = Instant::now();
692 let mut get_rpc_peers_timout = Instant::now();
693 let mut newer_cluster_snapshot_timeout = None;
694 let mut retry_reason = None;
695 loop {
696 std::thread::sleep(Duration::from_secs(1));
698 info!("\n{}", cluster_info.rpc_info_trace());
699
700 let rpc_peers = get_rpc_peers(
701 cluster_info,
702 validator_config,
703 blacklisted_rpc_nodes,
704 &blacklist_timeout,
705 &mut retry_reason,
706 bootstrap_config,
707 );
708 if rpc_peers.is_empty() {
709 if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
710 return Err(GetRpcNodeError::NoRpcPeersFound);
711 }
712 continue;
713 }
714
715 blacklist_timeout = Instant::now();
717 get_rpc_peers_timout = Instant::now();
718 if bootstrap_config.no_snapshot_fetch {
719 let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
720 return Ok(vec![GetRpcNodeResult {
721 rpc_contact_info: random_peer.clone(),
722 snapshot_hash: None,
723 }]);
724 }
725
726 let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
727 .as_ref()
728 .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
729 .unwrap_or(true)
730 {
731 KnownValidatorsToWaitFor::All
732 } else {
733 KnownValidatorsToWaitFor::Any
734 };
735 let peer_snapshot_hashes = get_peer_snapshot_hashes(
736 cluster_info,
737 &rpc_peers,
738 validator_config.known_validators.as_ref(),
739 known_validators_to_wait_for,
740 bootstrap_config.incremental_snapshot_fetch,
741 );
742 if peer_snapshot_hashes.is_empty() {
743 match newer_cluster_snapshot_timeout {
744 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
745 Some(newer_cluster_snapshot_timeout) => {
746 if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
747 return Err(GetRpcNodeError::NoNewerSnapshots);
748 }
749 }
750 }
751 retry_reason = Some("No snapshots available".to_owned());
752 continue;
753 } else {
754 let rpc_peers = peer_snapshot_hashes
755 .iter()
756 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
757 .collect::<Vec<_>>();
758 let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
759 info!(
760 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
761 final_snapshot_hash
762 .incr
763 .map(|(slot, _hash)| slot)
764 .unwrap_or(final_snapshot_hash.full.0),
765 rpc_peers.len(),
766 if rpc_peers.len() > 1 { "s" } else { "" },
767 rpc_peers,
768 );
769 let rpc_node_results = peer_snapshot_hashes
770 .iter()
771 .map(|peer_snapshot_hash| GetRpcNodeResult {
772 rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
773 snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
774 })
775 .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
776 .collect();
777 return Ok(rpc_node_results);
778 }
779 }
780}
781
782fn get_highest_local_snapshot_hash(
785 full_snapshot_archives_dir: impl AsRef<Path>,
786 incremental_snapshot_archives_dir: impl AsRef<Path>,
787 incremental_snapshot_fetch: bool,
788) -> Option<(Slot, Hash)> {
789 snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
790 .and_then(|full_snapshot_info| {
791 if incremental_snapshot_fetch {
792 snapshot_utils::get_highest_incremental_snapshot_archive_info(
793 incremental_snapshot_archives_dir,
794 full_snapshot_info.slot(),
795 )
796 .map(|incremental_snapshot_info| {
797 (
798 incremental_snapshot_info.slot(),
799 *incremental_snapshot_info.hash(),
800 )
801 })
802 } else {
803 None
804 }
805 .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
806 })
807 .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
808}
809
810fn get_peer_snapshot_hashes(
817 cluster_info: &ClusterInfo,
818 rpc_peers: &[ContactInfo],
819 known_validators: Option<&HashSet<Pubkey>>,
820 known_validators_to_wait_for: KnownValidatorsToWaitFor,
821 incremental_snapshot_fetch: bool,
822) -> Vec<PeerSnapshotHash> {
823 let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
824 if let Some(known_validators) = known_validators {
825 let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
826 cluster_info,
827 known_validators,
828 known_validators_to_wait_for,
829 );
830 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
831 &known_snapshot_hashes,
832 &mut peer_snapshot_hashes,
833 );
834 }
835 if incremental_snapshot_fetch {
836 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
844 &mut peer_snapshot_hashes,
845 );
846 }
847 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
848
849 peer_snapshot_hashes
850}
851
/// Maps each full snapshot (slot, hash) to the set of incremental snapshot (slot, hash) pairs
/// that were reported together with it.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
855
856fn get_snapshot_hashes_from_known_validators(
867 cluster_info: &ClusterInfo,
868 known_validators: &HashSet<Pubkey>,
869 known_validators_to_wait_for: KnownValidatorsToWaitFor,
870) -> KnownSnapshotHashes {
871 let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
873
874 if !do_known_validators_have_all_snapshot_hashes(
875 known_validators,
876 known_validators_to_wait_for,
877 get_snapshot_hashes_for_node,
878 ) {
879 debug!(
880 "Snapshot hashes have not been discovered from known validators. This likely means \
881 the gossip tables are not fully populated. We will sleep and retry..."
882 );
883 return KnownSnapshotHashes::default();
884 }
885
886 build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
887}
888
889fn do_known_validators_have_all_snapshot_hashes<'a>(
898 known_validators: impl IntoIterator<Item = &'a Pubkey>,
899 known_validators_to_wait_for: KnownValidatorsToWaitFor,
900 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
901) -> bool {
902 let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
903
904 match known_validators_to_wait_for {
905 KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
906 KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
907 }
908}
909
/// How many known validators must have published snapshot hashes before they are used.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    /// Every known validator must have snapshot hashes.
    All,
    /// At least one known validator must have snapshot hashes.
    Any,
}
917
918fn build_known_snapshot_hashes<'a>(
924 nodes: impl IntoIterator<Item = &'a Pubkey>,
925 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
926) -> KnownSnapshotHashes {
927 let mut known_snapshot_hashes = KnownSnapshotHashes::new();
928
929 fn is_any_same_slot_and_different_hash<'a>(
932 needle: &(Slot, Hash),
933 haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
934 ) -> bool {
935 haystack
936 .into_iter()
937 .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
938 }
939
940 'to_next_node: for node in nodes {
941 let Some(SnapshotHash {
942 full: full_snapshot_hash,
943 incr: incremental_snapshot_hash,
944 }) = get_snapshot_hashes_for_node(node)
945 else {
946 continue 'to_next_node;
947 };
948
949 if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
954 warn!(
955 "Ignoring all snapshot hashes from node {node} since we've seen a different full \
956 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
957 );
958 debug!(
959 "known full snapshot hashes: {:#?}",
960 known_snapshot_hashes.keys(),
961 );
962 continue 'to_next_node;
963 }
964
965 let known_incremental_snapshot_hashes =
969 known_snapshot_hashes.entry(full_snapshot_hash).or_default();
970
971 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
972 if is_any_same_slot_and_different_hash(
977 &incremental_snapshot_hash,
978 known_incremental_snapshot_hashes.iter(),
979 ) {
980 warn!(
981 "Ignoring incremental snapshot hash from node {node} since we've seen a \
982 different incremental snapshot hash with this slot. full snapshot hash: \
983 {full_snapshot_hash:?}, incremental snapshot hash: \
984 {incremental_snapshot_hash:?}"
985 );
986 debug!(
987 "known incremental snapshot hashes based on this slot: {:#?}",
988 known_incremental_snapshot_hashes.iter(),
989 );
990 continue 'to_next_node;
991 }
992
993 known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
994 };
995 }
996
997 trace!("known snapshot hashes: {known_snapshot_hashes:?}");
998 known_snapshot_hashes
999}
1000
1001fn get_eligible_peer_snapshot_hashes(
1007 cluster_info: &ClusterInfo,
1008 rpc_peers: &[ContactInfo],
1009) -> Vec<PeerSnapshotHash> {
1010 let peer_snapshot_hashes = rpc_peers
1011 .iter()
1012 .flat_map(|rpc_peer| {
1013 get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1014 PeerSnapshotHash {
1015 rpc_contact_info: rpc_peer.clone(),
1016 snapshot_hash,
1017 }
1018 })
1019 })
1020 .collect();
1021
1022 trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1023 peer_snapshot_hashes
1024}
1025
1026fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1028 known_snapshot_hashes: &KnownSnapshotHashes,
1029 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1030) {
1031 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1032 known_snapshot_hashes
1033 .get(&peer_snapshot_hash.snapshot_hash.full)
1034 .map(|known_incremental_hashes| {
1035 if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1036 true
1039 } else {
1040 known_incremental_hashes
1041 .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1042 }
1043 })
1044 .unwrap_or(false)
1045 });
1046
1047 trace!(
1048 "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1049 );
1050}
1051
1052fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1054 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1055) {
1056 let highest_full_snapshot_hash = peer_snapshot_hashes
1057 .iter()
1058 .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1059 .max_by_key(|(slot, _hash)| *slot);
1060 let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1061 return;
1065 };
1066
1067 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1068 peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1069 });
1070
1071 trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1072}
1073
1074fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1076 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1077) {
1078 let highest_incremental_snapshot_hash = peer_snapshot_hashes
1079 .iter()
1080 .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1081 .max_by_key(|(slot, _hash)| *slot);
1082
1083 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1084 peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1085 });
1086
1087 trace!(
1088 "retain peer snapshot hashes with highest incremental snapshot slot: \
1089 {peer_snapshot_hashes:?}"
1090 );
1091}
1092
1093#[allow(clippy::too_many_arguments)]
1095fn download_snapshots(
1096 validator_config: &ValidatorConfig,
1097 bootstrap_config: &RpcBootstrapConfig,
1098 use_progress_bar: bool,
1099 maximum_local_snapshot_age: Slot,
1100 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1101 minimal_snapshot_download_speed: f32,
1102 maximum_snapshot_download_abort: u64,
1103 download_abort_count: &mut u64,
1104 snapshot_hash: Option<SnapshotHash>,
1105 rpc_contact_info: &ContactInfo,
1106) -> Result<(), String> {
1107 if snapshot_hash.is_none() {
1108 return Ok(());
1109 }
1110 let SnapshotHash {
1111 full: full_snapshot_hash,
1112 incr: incremental_snapshot_hash,
1113 } = snapshot_hash.unwrap();
1114 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1115 let incremental_snapshot_archives_dir = &validator_config
1116 .snapshot_config
1117 .incremental_snapshot_archives_dir;
1118
1119 if should_use_local_snapshot(
1121 full_snapshot_archives_dir,
1122 incremental_snapshot_archives_dir,
1123 maximum_local_snapshot_age,
1124 full_snapshot_hash,
1125 incremental_snapshot_hash,
1126 bootstrap_config.incremental_snapshot_fetch,
1127 ) {
1128 return Ok(());
1129 }
1130
1131 if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
1133 .into_iter()
1134 .any(|snapshot_archive| {
1135 snapshot_archive.slot() == full_snapshot_hash.0
1136 && snapshot_archive.hash().0 == full_snapshot_hash.1
1137 })
1138 {
1139 info!(
1140 "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1141 full_snapshot_hash.0, full_snapshot_hash.1
1142 );
1143 } else {
1144 download_snapshot(
1145 validator_config,
1146 bootstrap_config,
1147 use_progress_bar,
1148 start_progress,
1149 minimal_snapshot_download_speed,
1150 maximum_snapshot_download_abort,
1151 download_abort_count,
1152 rpc_contact_info,
1153 full_snapshot_hash,
1154 SnapshotKind::FullSnapshot,
1155 )?;
1156 }
1157
1158 if bootstrap_config.incremental_snapshot_fetch {
1159 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1161 if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1162 .into_iter()
1163 .any(|snapshot_archive| {
1164 snapshot_archive.slot() == incremental_snapshot_hash.0
1165 && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1166 && snapshot_archive.base_slot() == full_snapshot_hash.0
1167 })
1168 {
1169 info!(
1170 "Incremental snapshot archive already exists locally. Skipping download. \
1171 slot: {}, hash: {}",
1172 incremental_snapshot_hash.0, incremental_snapshot_hash.1
1173 );
1174 } else {
1175 download_snapshot(
1176 validator_config,
1177 bootstrap_config,
1178 use_progress_bar,
1179 start_progress,
1180 minimal_snapshot_download_speed,
1181 maximum_snapshot_download_abort,
1182 download_abort_count,
1183 rpc_contact_info,
1184 incremental_snapshot_hash,
1185 SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1186 )?;
1187 }
1188 }
1189 }
1190
1191 Ok(())
1192}
1193
1194#[allow(clippy::too_many_arguments)]
1196fn download_snapshot(
1197 validator_config: &ValidatorConfig,
1198 bootstrap_config: &RpcBootstrapConfig,
1199 use_progress_bar: bool,
1200 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1201 minimal_snapshot_download_speed: f32,
1202 maximum_snapshot_download_abort: u64,
1203 download_abort_count: &mut u64,
1204 rpc_contact_info: &ContactInfo,
1205 desired_snapshot_hash: (Slot, Hash),
1206 snapshot_kind: SnapshotKind,
1207) -> Result<(), String> {
1208 let maximum_full_snapshot_archives_to_retain = validator_config
1209 .snapshot_config
1210 .maximum_full_snapshot_archives_to_retain;
1211 let maximum_incremental_snapshot_archives_to_retain = validator_config
1212 .snapshot_config
1213 .maximum_incremental_snapshot_archives_to_retain;
1214 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1215 let incremental_snapshot_archives_dir = &validator_config
1216 .snapshot_config
1217 .incremental_snapshot_archives_dir;
1218
1219 *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1220 slot: desired_snapshot_hash.0,
1221 rpc_addr: rpc_contact_info
1222 .rpc()
1223 .ok_or_else(|| String::from("Invalid RPC address"))?,
1224 };
1225 let desired_snapshot_hash = (
1226 desired_snapshot_hash.0,
1227 solana_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1228 );
1229 download_snapshot_archive(
1230 &rpc_contact_info
1231 .rpc()
1232 .ok_or_else(|| String::from("Invalid RPC address"))?,
1233 full_snapshot_archives_dir,
1234 incremental_snapshot_archives_dir,
1235 desired_snapshot_hash,
1236 snapshot_kind,
1237 maximum_full_snapshot_archives_to_retain,
1238 maximum_incremental_snapshot_archives_to_retain,
1239 use_progress_bar,
1240 &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1241 debug!("Download progress: {download_progress:?}");
1242 if download_progress.last_throughput < minimal_snapshot_download_speed
1243 && download_progress.notification_count <= 1
1244 && download_progress.percentage_done <= 2_f32
1245 && download_progress.estimated_remaining_time > 60_f32
1246 && *download_abort_count < maximum_snapshot_download_abort
1247 {
1248 if let Some(ref known_validators) = validator_config.known_validators {
1249 if known_validators.contains(rpc_contact_info.pubkey())
1250 && known_validators.len() == 1
1251 && bootstrap_config.only_known_rpc
1252 {
1253 warn!(
1254 "The snapshot download is too slow, throughput: {} < min speed {} \
1255 bytes/sec, but will NOT abort and try a different node as it is the \
1256 only known validator and the --only-known-rpc flag is set. Abort \
1257 count: {}, Progress detail: {:?}",
1258 download_progress.last_throughput,
1259 minimal_snapshot_download_speed,
1260 download_abort_count,
1261 download_progress,
1262 );
1263 return true; }
1265 }
1266 warn!(
1267 "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1268 will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1269 download_progress.last_throughput,
1270 minimal_snapshot_download_speed,
1271 download_abort_count,
1272 download_progress,
1273 );
1274 *download_abort_count += 1;
1275 false
1276 } else {
1277 true
1278 }
1279 })),
1280 )
1281}
1282
1283fn should_use_local_snapshot(
1286 full_snapshot_archives_dir: &Path,
1287 incremental_snapshot_archives_dir: &Path,
1288 maximum_local_snapshot_age: Slot,
1289 full_snapshot_hash: (Slot, Hash),
1290 incremental_snapshot_hash: Option<(Slot, Hash)>,
1291 incremental_snapshot_fetch: bool,
1292) -> bool {
1293 let cluster_snapshot_slot = incremental_snapshot_hash
1294 .map(|(slot, _)| slot)
1295 .unwrap_or(full_snapshot_hash.0);
1296
1297 match get_highest_local_snapshot_hash(
1298 full_snapshot_archives_dir,
1299 incremental_snapshot_archives_dir,
1300 incremental_snapshot_fetch,
1301 ) {
1302 None => {
1303 info!(
1304 "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1305 local snapshot."
1306 );
1307 false
1308 }
1309 Some((local_snapshot_slot, _)) => {
1310 if local_snapshot_slot
1311 >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1312 {
1313 info!(
1314 "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1315 a snapshot for slot {cluster_snapshot_slot}."
1316 );
1317 true
1318 } else {
1319 info!(
1320 "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1321 newer snapshot for slot {cluster_snapshot_slot}."
1322 );
1323 false
1324 }
1325 }
1326 }
1327}
1328
1329fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1331 cluster_info.get_snapshot_hashes_for_node(node).map(
1332 |crds_data::SnapshotHashes {
1333 full, incremental, ..
1334 }| {
1335 let highest_incremental_snapshot_hash = incremental.into_iter().max();
1336 SnapshotHash {
1337 full,
1338 incr: highest_incremental_snapshot_hash,
1339 }
1340 },
1341 )
1342}
1343
#[cfg(test)]
mod tests {
    use super::*;

    impl PeerSnapshotHash {
        /// Test-only constructor that assembles the nested `SnapshotHash` from its parts.
        fn new(
            rpc_contact_info: ContactInfo,
            full_snapshot_hash: (Slot, Hash),
            incremental_snapshot_hash: Option<(Slot, Hash)>,
        ) -> Self {
            let snapshot_hash = SnapshotHash {
                full: full_snapshot_hash,
                incr: incremental_snapshot_hash,
            };
            Self {
                rpc_contact_info,
                snapshot_hash,
            }
        }
    }

    /// Contact info shared by the tests below; built from the default pubkey and a
    /// fixed numeric argument so that every call yields an equal value.
    fn default_contact_info_for_tests() -> ContactInfo {
        let identity = Pubkey::default();
        ContactInfo::new_localhost(&identity, 1_681_834_947_321)
    }

    /// Nodes advertise two conflicting full hashes for one slot and two conflicting
    /// incremental hashes for one slot; exactly one of each must end up recorded.
    #[test]
    fn test_build_known_snapshot_hashes() {
        solana_logger::setup();
        let full_snapshot_hash1 = (400_000, Hash::new_unique());
        let full_snapshot_hash2 = (400_000, Hash::new_unique());

        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());

        // Every full/incremental combination is advertised by three distinct nodes, and
        // three more nodes advertise nothing at all.
        let advertised_combinations = [
            (full_snapshot_hash1, None),
            (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
            (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
            (full_snapshot_hash2, None),
            (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
            (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
        ];
        let mut oracle = HashMap::new();
        for (full, incr) in advertised_combinations {
            for _ in 0..3 {
                oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
            }
        }
        for _ in 0..3 {
            oracle.insert(Pubkey::new_unique(), None);
        }

        let known_snapshot_hashes =
            build_known_snapshot_hashes(oracle.keys(), |node| *oracle.get(node).unwrap());

        // Only one full snapshot hash may be recorded for the slot ...
        assert_eq!(known_snapshot_hashes.len(), 1);
        let (known_full_snapshot_hash, known_incremental_snapshot_hashes) =
            known_snapshot_hashes.iter().next().unwrap();

        // ... and only one incremental snapshot hash underneath it.
        assert_eq!(known_incremental_snapshot_hashes.len(), 1);
        let known_incremental_snapshot_hash =
            known_incremental_snapshot_hashes.iter().next().unwrap();

        // The oracle is a `HashMap`, so the visiting order of nodes is arbitrary and
        // either of the conflicting hashes is an acceptable winner.
        assert!(
            *known_full_snapshot_hash == full_snapshot_hash1
                || *known_full_snapshot_hash == full_snapshot_hash2
        );
        assert!(
            *known_incremental_snapshot_hash == incremental_snapshot_hash1
                || *known_incremental_snapshot_hash == incremental_snapshot_hash2
        );
    }

    /// Of six peers mixing known and unknown hashes, only the two whose full hash is
    /// known and whose incremental hash is absent or known are expected to remain.
    #[test]
    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
        let known_snapshot_hashes: KnownSnapshotHashes = vec![
            (
                (200_000, Hash::new_unique()),
                vec![
                    (200_200, Hash::new_unique()),
                    (200_400, Hash::new_unique()),
                    (200_600, Hash::new_unique()),
                    (200_800, Hash::new_unique()),
                ]
                .into_iter()
                .collect(),
            ),
            (
                (300_000, Hash::new_unique()),
                vec![
                    (300_200, Hash::new_unique()),
                    (300_400, Hash::new_unique()),
                    (300_600, Hash::new_unique()),
                ]
                .into_iter()
                .collect(),
            ),
        ]
        .into_iter()
        .collect();

        // Pick one known full hash together with one incremental hash listed under it.
        let (known_full, known_incrementals) = known_snapshot_hashes.iter().next().unwrap();
        let known_full = *known_full;
        let known_incr = *known_incrementals.iter().next().unwrap();

        // Hashes that do not appear anywhere in `known_snapshot_hashes`.
        let unknown_full = (111_000, Hash::default());
        let unknown_incr = (111_111, Hash::default());

        let contact_info = default_contact_info_for_tests();
        let peer = |full: (Slot, Hash), incr: Option<(Slot, Hash)>| {
            PeerSnapshotHash::new(contact_info.clone(), full, incr)
        };

        let mut actual = vec![
            // unknown full, no incremental
            peer(unknown_full, None),
            // unknown full, unknown incremental
            peer(unknown_full, Some(unknown_incr)),
            // known full, no incremental
            peer(known_full, None),
            // unknown full, known incremental
            peer(unknown_full, Some(known_incr)),
            // known full, unknown incremental
            peer(known_full, Some(unknown_incr)),
            // known full, known incremental
            peer(known_full, Some(known_incr)),
        ];
        let expected = vec![peer(known_full, None), peer(known_full, Some(known_incr))];

        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
            &known_snapshot_hashes,
            &mut actual,
        );
        assert_eq!(expected, actual);
    }

    /// Peers at full slot 100_000 must be dropped in favour of those at 200_000,
    /// regardless of their incremental snapshot.
    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
        let contact_info = default_contact_info_for_tests();
        let hash = Hash::default();
        let peer = |full: (Slot, Hash), incr: Option<(Slot, Hash)>| {
            PeerSnapshotHash::new(contact_info.clone(), full, incr)
        };

        let mut actual = vec![
            // old full snapshot slot
            peer((100_000, hash), None),
            peer((100_000, hash), Some((100_100, hash))),
            peer((100_000, hash), Some((100_200, hash))),
            peer((100_000, hash), Some((100_300, hash))),
            // highest full snapshot slot
            peer((200_000, hash), None),
            peer((200_000, hash), Some((200_100, hash))),
            peer((200_000, hash), Some((200_200, hash))),
            peer((200_000, hash), Some((200_300, hash))),
        ];
        let expected = vec![
            peer((200_000, hash), None),
            peer((200_000, hash), Some((200_100, hash))),
            peer((200_000, hash), Some((200_200, hash))),
            peer((200_000, hash), Some((200_300, hash))),
        ];

        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    /// With several incremental slots on the same full snapshot, only the peer at the
    /// highest incremental slot (200_300) is expected to remain.
    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
        let contact_info = default_contact_info_for_tests();
        let hash = Hash::default();
        let full = (200_000, hash);
        let peer = |incr: Option<(Slot, Hash)>| {
            PeerSnapshotHash::new(contact_info.clone(), full, incr)
        };

        let mut actual = vec![
            peer(None),
            peer(Some((200_100, hash))),
            peer(Some((200_200, hash))),
            peer(Some((200_300, hash))),
            peer(Some((200_010, hash))),
            peer(Some((200_020, hash))),
            peer(Some((200_030, hash))),
        ];
        let expected = vec![peer(Some((200_300, hash)))];

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    /// When no peer has an incremental snapshot, filtering by incremental slot must
    /// leave the list untouched.
    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
        let contact_info = default_contact_info_for_tests();
        let peer_without_incremental =
            || PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None);

        let mut actual = vec![
            peer_without_incremental(),
            peer_without_incremental(),
            peer_without_incremental(),
        ];
        let expected = actual.clone();

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    /// Both "highest slot" filters must accept an empty list and leave it empty.
    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
        {
            let mut actual: Vec<PeerSnapshotHash> = Vec::new();
            let expected: Vec<PeerSnapshotHash> = Vec::new();
            retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
            assert_eq!(expected, actual);
        }
        {
            let mut actual: Vec<PeerSnapshotHash> = Vec::new();
            let expected: Vec<PeerSnapshotHash> = Vec::new();
            retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
            assert_eq!(expected, actual);
        }
    }
}