1use {
2 agave_snapshots::{
3 paths as snapshot_paths, snapshot_archive_info::SnapshotArchiveInfoGetter as _,
4 SnapshotKind,
5 },
6 itertools::Itertools,
7 log::*,
8 rand::{seq::SliceRandom, thread_rng, Rng},
9 rayon::prelude::*,
10 solana_account::ReadableAccount,
11 solana_clock::Slot,
12 solana_commitment_config::CommitmentConfig,
13 solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
14 solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
15 solana_genesis_utils::download_then_check_genesis_hash,
16 solana_gossip::{
17 cluster_info::ClusterInfo,
18 contact_info::{ContactInfo, Protocol},
19 crds_data,
20 gossip_service::GossipService,
21 node::Node,
22 },
23 solana_hash::Hash,
24 solana_keypair::Keypair,
25 solana_metrics::datapoint_info,
26 solana_pubkey::Pubkey,
27 solana_rpc_client::rpc_client::RpcClient,
28 solana_signer::Signer,
29 solana_streamer::socket::SocketAddrSpace,
30 solana_vote_program::vote_state::VoteStateV4,
31 std::{
32 collections::{hash_map::RandomState, HashMap, HashSet},
33 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
34 path::Path,
35 process::exit,
36 sync::{
37 atomic::{AtomicBool, Ordering},
38 Arc, RwLock,
39 },
40 time::{Duration, Instant},
41 },
42 thiserror::Error,
43};
44
/// While searching for snapshots, snapshot hashes are required from *all* known
/// validators until this much time has passed since the search first came up empty;
/// afterwards hashes from *any* known validator are accepted.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// Once every candidate RPC peer has been blacklisted for longer than this, the
/// blacklist is cleared so those peers can be tried again.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// If no usable peer snapshot hashes have been found for longer than this, the search
/// gives up with `GetRpcNodeError::NoNewerSnapshots`.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
/// If no RPC peers at all have been found for longer than this, the search gives up
/// with `GetRpcNodeError::NoRpcPeersFound`.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);

/// Upper bound on the number of candidate RPC nodes returned by one `get_rpc_nodes` call.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// Timeout for the TCP connection attempt used to measure latency to an RPC node.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
61
/// Options controlling how a validator bootstraps from RPC nodes in the cluster.
#[derive(Debug, PartialEq, Clone)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash`; when set together with
    /// `no_snapshot_fetch`, `rpc_bootstrap` returns without contacting any RPC node.
    pub no_genesis_fetch: bool,
    /// When set, a random RPC peer is selected and no snapshot hash is attached to it,
    /// so no snapshot is downloaded.
    pub no_snapshot_fetch: bool,
    /// Restrict candidate RPC peers to the configured known validators.
    pub only_known_rpc: bool,
    /// Forwarded to `download_then_check_genesis_hash`.
    pub max_genesis_archive_unpacked_size: u64,
    /// URL of an RPC endpoint used to sanity-check the vote account after the download
    /// step; `None` skips the check.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots are considered and downloaded in addition to full
    /// snapshots.
    pub incremental_snapshot_fetch: bool,
}
71
72fn verify_reachable_ports(
73 node: &Node,
74 cluster_entrypoint: &ContactInfo,
75 validator_config: &ValidatorConfig,
76 socket_addr_space: &SocketAddrSpace,
77) -> bool {
78 let verify_address = |addr: &Option<SocketAddr>| -> bool {
79 addr.as_ref()
80 .map(|addr| socket_addr_space.check(addr))
81 .unwrap_or_default()
82 };
83
84 let mut udp_sockets = vec![&node.sockets.repair];
85 udp_sockets.extend(node.sockets.gossip.iter());
86
87 if verify_address(&node.info.serve_repair(Protocol::UDP)) {
88 udp_sockets.push(&node.sockets.serve_repair);
89 }
90 if verify_address(&node.info.tpu(Protocol::UDP)) {
91 udp_sockets.extend(node.sockets.tpu.iter());
92 udp_sockets.extend(&node.sockets.tpu_quic);
93 }
94 if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
95 udp_sockets.extend(node.sockets.tpu_forwards.iter());
96 udp_sockets.extend(&node.sockets.tpu_forwards_quic);
97 }
98 if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
99 udp_sockets.extend(node.sockets.tpu_vote.iter());
100 }
101 if verify_address(&node.info.tvu(Protocol::UDP)) {
102 udp_sockets.extend(node.sockets.tvu.iter());
103 udp_sockets.extend(node.sockets.broadcast.iter());
104 udp_sockets.extend(node.sockets.retransmit_sockets.iter());
105 }
106 if !solana_net_utils::verify_all_reachable_udp(
107 &cluster_entrypoint.gossip().unwrap(),
108 &udp_sockets,
109 ) {
110 return false;
111 }
112
113 let mut tcp_listeners = vec![];
114 if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
115 for (purpose, bind_addr, public_addr) in &[
116 ("RPC", rpc_addr, node.info.rpc()),
117 ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
118 ] {
119 if verify_address(public_addr) {
120 tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
121 error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
122 exit(1);
123 }));
124 }
125 }
126 }
127
128 if let Some(ip_echo) = &node.sockets.ip_echo {
129 let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
130 tcp_listeners.push(ip_echo);
131 }
132
133 solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
134}
135
136fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
137 if let Some(known_validators) = known_validators {
138 known_validators.contains(id)
139 } else {
140 false
141 }
142}
143
144fn start_gossip_node(
145 identity_keypair: Arc<Keypair>,
146 cluster_entrypoints: &[ContactInfo],
147 ledger_path: &Path,
148 gossip_addr: &SocketAddr,
149 gossip_sockets: Arc<[UdpSocket]>,
150 expected_shred_version: u16,
151 gossip_validators: Option<HashSet<Pubkey>>,
152 should_check_duplicate_instance: bool,
153 socket_addr_space: SocketAddrSpace,
154) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
155 let contact_info = ClusterInfo::gossip_contact_info(
156 identity_keypair.pubkey(),
157 *gossip_addr,
158 expected_shred_version,
159 );
160 let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
161 cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
162 cluster_info.restore_contact_info(ledger_path, 0);
163 let cluster_info = Arc::new(cluster_info);
164
165 let gossip_exit_flag = Arc::new(AtomicBool::new(false));
166 let gossip_service = GossipService::new(
167 &cluster_info,
168 None,
169 gossip_sockets,
170 gossip_validators,
171 should_check_duplicate_instance,
172 None,
173 gossip_exit_flag.clone(),
174 );
175 (cluster_info, gossip_exit_flag, gossip_service)
176}
177
178fn get_rpc_peers(
179 cluster_info: &ClusterInfo,
180 validator_config: &ValidatorConfig,
181 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
182 blacklist_timeout: &Instant,
183 retry_reason: &mut Option<String>,
184 bootstrap_config: &RpcBootstrapConfig,
185) -> Vec<ContactInfo> {
186 let shred_version = validator_config
187 .expected_shred_version
188 .unwrap_or_else(|| cluster_info.my_shred_version());
189
190 info!(
191 "Searching for an RPC service with shred version {shred_version}{}...",
192 retry_reason
193 .as_ref()
194 .map(|s| format!(" (Retrying: {s})"))
195 .unwrap_or_default()
196 );
197
198 let mut rpc_peers = cluster_info.rpc_peers();
199 if bootstrap_config.only_known_rpc {
200 rpc_peers.retain(|rpc_peer| {
201 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
202 });
203 }
204
205 let rpc_peers_total = rpc_peers.len();
206
207 let rpc_peers: Vec<_> = rpc_peers
209 .into_iter()
210 .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
211 .collect();
212 let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
213 let rpc_known_peers = rpc_peers
214 .iter()
215 .filter(|rpc_peer| {
216 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
217 })
218 .count();
219
220 info!(
221 "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
222 {rpc_peers_blacklisted} blacklisted"
223 );
224
225 if rpc_peers_blacklisted == rpc_peers_total {
226 *retry_reason = if !blacklisted_rpc_nodes.is_empty()
227 && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
228 {
229 blacklisted_rpc_nodes.clear();
232 Some("Blacklist timeout expired".to_owned())
233 } else {
234 Some("Wait for known rpc peers".to_owned())
235 };
236 return vec![];
237 }
238 rpc_peers
239}
240
241fn check_vote_account(
242 rpc_client: &RpcClient,
243 identity_pubkey: &Pubkey,
244 vote_account_address: &Pubkey,
245 authorized_voter_pubkeys: &[Pubkey],
246) -> Result<(), String> {
247 let vote_account = rpc_client
248 .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
249 .map_err(|err| format!("failed to fetch vote account: {err}"))?
250 .value
251 .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
252
253 if vote_account.owner != solana_vote_program::id() {
254 return Err(format!(
255 "not a vote account (owned by {}): {}",
256 vote_account.owner, vote_account_address
257 ));
258 }
259
260 let identity_account = rpc_client
261 .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
262 .map_err(|err| format!("failed to fetch identity account: {err}"))?
263 .value
264 .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
265
266 let vote_state = VoteStateV4::deserialize(vote_account.data(), vote_account_address).ok();
267 if let Some(vote_state) = vote_state {
268 if vote_state.authorized_voters.is_empty() {
269 return Err("Vote account not yet initialized".to_string());
270 }
271
272 if vote_state.node_pubkey != *identity_pubkey {
273 return Err(format!(
274 "vote account's identity ({}) does not match the validator's identity {}).",
275 vote_state.node_pubkey, identity_pubkey
276 ));
277 }
278
279 for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters.iter() {
280 if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
281 return Err(format!(
282 "authorized voter {vote_account_authorized_voter_pubkey} not available"
283 ));
284 }
285 }
286 } else {
287 return Err(format!(
288 "invalid vote account data for {vote_account_address}"
289 ));
290 }
291
292 if identity_account.lamports <= 1 {
294 return Err(format!(
295 "underfunded identity account ({}): only {} lamports available",
296 identity_pubkey, identity_account.lamports
297 ));
298 }
299
300 Ok(())
301}
302
/// Reasons the search for bootstrap RPC nodes can give up.
#[derive(Error, Debug)]
pub enum GetRpcNodeError {
    /// No eligible RPC peer was found within `GET_RPC_PEERS_TIMEOUT`.
    #[error("Unable to find any RPC peers")]
    NoRpcPeersFound,

    /// No usable peer snapshot hashes were found within `NEWER_SNAPSHOT_THRESHOLD`.
    #[error("Giving up, did not get newer snapshots from the cluster")]
    NoNewerSnapshots,
}
311
/// One candidate returned by `get_rpc_nodes`: the peer to contact and, unless snapshot
/// fetching is disabled, the snapshot hash to download from it.
#[derive(Debug)]
struct GetRpcNodeResult {
    /// Contact info of the candidate RPC node.
    rpc_contact_info: ContactInfo,
    /// Snapshot to download from this node; `None` when no snapshot is to be fetched.
    snapshot_hash: Option<SnapshotHash>,
}
320
/// Pairs an RPC peer with the snapshot hash obtained for it.
#[derive(Debug, PartialEq, Eq, Clone)]
struct PeerSnapshotHash {
    /// Contact info of the peer.
    rpc_contact_info: ContactInfo,
    /// The peer's full (and optional incremental) snapshot hash.
    snapshot_hash: SnapshotHash,
}
327
/// A snapshot hash as used during bootstrap: a full snapshot `(slot, hash)` together with
/// an optional incremental snapshot `(slot, hash)`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct SnapshotHash {
    /// Slot and hash of the full snapshot.
    full: (Slot, Hash),
    /// Slot and hash of the incremental snapshot, if any.
    incr: Option<(Slot, Hash)>,
}
335
336pub fn fail_rpc_node(
337 err: String,
338 known_validators: &Option<HashSet<Pubkey, RandomState>>,
339 rpc_id: &Pubkey,
340 blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
341) {
342 warn!("{err}");
343 if let Some(ref known_validators) = known_validators {
344 if known_validators.contains(rpc_id) {
345 return;
346 }
347 }
348
349 info!("Excluding {rpc_id} as a future RPC candidate");
350 blacklisted_rpc_nodes.insert(*rpc_id);
351}
352
353fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
354 let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
355 cluster_info.save_contact_info();
356 gossip_exit_flag.store(true, Ordering::Relaxed);
357 gossip_service.join().unwrap();
358}
359
360#[allow(clippy::too_many_arguments)]
361pub fn attempt_download_genesis_and_snapshot(
362 rpc_contact_info: &ContactInfo,
363 ledger_path: &Path,
364 validator_config: &mut ValidatorConfig,
365 bootstrap_config: &RpcBootstrapConfig,
366 use_progress_bar: bool,
367 gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
368 rpc_client: &RpcClient,
369 maximum_local_snapshot_age: Slot,
370 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
371 minimal_snapshot_download_speed: f32,
372 maximum_snapshot_download_abort: u64,
373 download_abort_count: &mut u64,
374 snapshot_hash: Option<SnapshotHash>,
375 identity_keypair: &Arc<Keypair>,
376 vote_account: &Pubkey,
377 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
378) -> Result<(), String> {
379 download_then_check_genesis_hash(
380 &rpc_contact_info
381 .rpc()
382 .ok_or_else(|| String::from("Invalid RPC address"))?,
383 ledger_path,
384 &mut validator_config.expected_genesis_hash,
385 bootstrap_config.max_genesis_archive_unpacked_size,
386 bootstrap_config.no_genesis_fetch,
387 use_progress_bar,
388 rpc_client,
389 )?;
390
391 if let Some(gossip) = gossip.take() {
392 shutdown_gossip_service(gossip);
393 }
394
395 let rpc_client_slot = rpc_client
396 .get_slot_with_commitment(CommitmentConfig::finalized())
397 .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
398 info!("RPC node root slot: {rpc_client_slot}");
399
400 download_snapshots(
401 validator_config,
402 bootstrap_config,
403 use_progress_bar,
404 maximum_local_snapshot_age,
405 start_progress,
406 minimal_snapshot_download_speed,
407 maximum_snapshot_download_abort,
408 download_abort_count,
409 snapshot_hash,
410 rpc_contact_info,
411 )?;
412
413 if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
414 let rpc_client = RpcClient::new(url);
415 check_vote_account(
416 &rpc_client,
417 &identity_keypair.pubkey(),
418 vote_account,
419 &authorized_voter_keypairs
420 .read()
421 .unwrap()
422 .iter()
423 .map(|k| k.pubkey())
424 .collect::<Vec<_>>(),
425 )
426 .unwrap_or_else(|err| {
427 error!("{err}");
434 exit(1);
435 });
436 }
437 Ok(())
438}
439
440fn ping(addr: &SocketAddr) -> Option<Duration> {
442 let start = Instant::now();
443 match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
444 Ok(_) => Some(start.elapsed()),
445 Err(_) => None,
446 }
447}
448
449fn get_vetted_rpc_nodes(
453 vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
454 cluster_info: &Arc<ClusterInfo>,
455 validator_config: &ValidatorConfig,
456 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
457 bootstrap_config: &RpcBootstrapConfig,
458) {
459 while vetted_rpc_nodes.is_empty() {
460 let rpc_node_details = match get_rpc_nodes(
461 cluster_info,
462 validator_config,
463 blacklisted_rpc_nodes,
464 bootstrap_config,
465 ) {
466 Ok(rpc_node_details) => rpc_node_details,
467 Err(err) => {
468 error!(
469 "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
470 `--no-port-check`, or adjusting `--known-validator ...` arguments as \
471 applicable"
472 );
473 exit(1);
474 }
475 };
476
477 let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
478 vetted_rpc_nodes.extend(
479 rpc_node_details
480 .into_par_iter()
481 .filter_map(|rpc_node_details| {
482 let GetRpcNodeResult {
483 rpc_contact_info,
484 snapshot_hash,
485 } = rpc_node_details;
486
487 info!(
488 "Using RPC service from node {}: {:?}",
489 rpc_contact_info.pubkey(),
490 rpc_contact_info.rpc()
491 );
492
493 let rpc_addr = rpc_contact_info.rpc()?;
494 let ping_time = ping(&rpc_addr);
495
496 let rpc_client =
497 RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
498
499 Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
500 })
501 .filter(
502 |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
503 .get_version()
504 {
505 Ok(rpc_version) => {
506 if let Some(ping_time) = ping_time {
507 info!(
508 "RPC node version: {} Ping: {}ms",
509 rpc_version.solana_core,
510 ping_time.as_millis()
511 );
512 true
513 } else {
514 fail_rpc_node(
515 "Failed to ping RPC".to_string(),
516 &validator_config.known_validators,
517 rpc_contact_info.pubkey(),
518 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
519 );
520 false
521 }
522 }
523 Err(err) => {
524 fail_rpc_node(
525 format!("Failed to get RPC node version: {err}"),
526 &validator_config.known_validators,
527 rpc_contact_info.pubkey(),
528 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
529 );
530 false
531 }
532 },
533 )
534 .collect::<Vec<(
535 ContactInfo,
536 Option<SnapshotHash>,
537 RpcClient,
538 Option<Duration>,
539 )>>()
540 .into_iter()
541 .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
542 .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
543 (rpc_contact_info, snapshot_hash, rpc_client)
544 })
545 .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
546 );
547 blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
548 }
549}
550
551#[allow(clippy::too_many_arguments)]
552pub fn rpc_bootstrap(
553 node: &Node,
554 identity_keypair: &Arc<Keypair>,
555 ledger_path: &Path,
556 vote_account: &Pubkey,
557 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
558 cluster_entrypoints: &[ContactInfo],
559 validator_config: &mut ValidatorConfig,
560 bootstrap_config: RpcBootstrapConfig,
561 do_port_check: bool,
562 use_progress_bar: bool,
563 maximum_local_snapshot_age: Slot,
564 should_check_duplicate_instance: bool,
565 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
566 minimal_snapshot_download_speed: f32,
567 maximum_snapshot_download_abort: u64,
568 socket_addr_space: SocketAddrSpace,
569) {
570 if do_port_check {
571 let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
572 order.shuffle(&mut thread_rng());
573 if order.into_iter().all(|i| {
574 !verify_reachable_ports(
575 node,
576 &cluster_entrypoints[i],
577 validator_config,
578 &socket_addr_space,
579 )
580 }) {
581 exit(1);
582 }
583 }
584
585 if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
586 return;
587 }
588
589 let total_snapshot_download_time = Instant::now();
590 let mut get_rpc_nodes_time = Duration::new(0, 0);
591 let mut snapshot_download_time = Duration::new(0, 0);
592 let mut blacklisted_rpc_nodes = HashSet::new();
593 let mut gossip = None;
594 let mut vetted_rpc_nodes = vec![];
595 let mut download_abort_count = 0;
596 loop {
597 if gossip.is_none() {
598 *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
599
600 gossip = Some(start_gossip_node(
601 identity_keypair.clone(),
602 cluster_entrypoints,
603 ledger_path,
604 &node
605 .info
606 .gossip()
607 .expect("Operator must spin up node with valid gossip address"),
608 node.sockets.gossip.clone(),
609 validator_config
610 .expected_shred_version
611 .expect("expected_shred_version should not be None"),
612 validator_config.gossip_validators.clone(),
613 should_check_duplicate_instance,
614 socket_addr_space,
615 ));
616 }
617
618 let get_rpc_nodes_start = Instant::now();
619 get_vetted_rpc_nodes(
620 &mut vetted_rpc_nodes,
621 &gossip.as_ref().unwrap().0,
622 validator_config,
623 &mut blacklisted_rpc_nodes,
624 &bootstrap_config,
625 );
626 let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
627 get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
628
629 let snapshot_download_start = Instant::now();
630 let download_result = attempt_download_genesis_and_snapshot(
631 &rpc_contact_info,
632 ledger_path,
633 validator_config,
634 &bootstrap_config,
635 use_progress_bar,
636 &mut gossip,
637 &rpc_client,
638 maximum_local_snapshot_age,
639 start_progress,
640 minimal_snapshot_download_speed,
641 maximum_snapshot_download_abort,
642 &mut download_abort_count,
643 snapshot_hash,
644 identity_keypair,
645 vote_account,
646 authorized_voter_keypairs.clone(),
647 );
648 snapshot_download_time += snapshot_download_start.elapsed();
649 match download_result {
650 Ok(()) => break,
651 Err(err) => {
652 fail_rpc_node(
653 err,
654 &validator_config.known_validators,
655 rpc_contact_info.pubkey(),
656 &mut blacklisted_rpc_nodes,
657 );
658 }
659 }
660 }
661
662 if let Some(gossip) = gossip.take() {
663 shutdown_gossip_service(gossip);
664 }
665
666 datapoint_info!(
667 "bootstrap-snapshot-download",
668 (
669 "total_time_secs",
670 total_snapshot_download_time.elapsed().as_secs(),
671 i64
672 ),
673 ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
674 (
675 "snapshot_download_time_secs",
676 snapshot_download_time.as_secs(),
677 i64
678 ),
679 ("download_abort_count", download_abort_count, i64),
680 ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
681 );
682}
683
684fn get_rpc_nodes(
688 cluster_info: &ClusterInfo,
689 validator_config: &ValidatorConfig,
690 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
691 bootstrap_config: &RpcBootstrapConfig,
692) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
693 let mut blacklist_timeout = Instant::now();
694 let mut get_rpc_peers_timout = Instant::now();
695 let mut newer_cluster_snapshot_timeout = None;
696 let mut retry_reason = None;
697 loop {
698 std::thread::sleep(Duration::from_secs(1));
700 info!("\n{}", cluster_info.rpc_info_trace());
701
702 let rpc_peers = get_rpc_peers(
703 cluster_info,
704 validator_config,
705 blacklisted_rpc_nodes,
706 &blacklist_timeout,
707 &mut retry_reason,
708 bootstrap_config,
709 );
710 if rpc_peers.is_empty() {
711 if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
712 return Err(GetRpcNodeError::NoRpcPeersFound);
713 }
714 continue;
715 }
716
717 blacklist_timeout = Instant::now();
719 get_rpc_peers_timout = Instant::now();
720 if bootstrap_config.no_snapshot_fetch {
721 let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
722 return Ok(vec![GetRpcNodeResult {
723 rpc_contact_info: random_peer.clone(),
724 snapshot_hash: None,
725 }]);
726 }
727
728 let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
729 .as_ref()
730 .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
731 .unwrap_or(true)
732 {
733 KnownValidatorsToWaitFor::All
734 } else {
735 KnownValidatorsToWaitFor::Any
736 };
737 let peer_snapshot_hashes = get_peer_snapshot_hashes(
738 cluster_info,
739 &rpc_peers,
740 validator_config.known_validators.as_ref(),
741 known_validators_to_wait_for,
742 bootstrap_config.incremental_snapshot_fetch,
743 );
744 if peer_snapshot_hashes.is_empty() {
745 match newer_cluster_snapshot_timeout {
746 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
747 Some(newer_cluster_snapshot_timeout) => {
748 if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
749 return Err(GetRpcNodeError::NoNewerSnapshots);
750 }
751 }
752 }
753 retry_reason = Some("No snapshots available".to_owned());
754 continue;
755 } else {
756 let rpc_peers = peer_snapshot_hashes
757 .iter()
758 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
759 .collect::<Vec<_>>();
760 let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
761 info!(
762 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
763 final_snapshot_hash
764 .incr
765 .map(|(slot, _hash)| slot)
766 .unwrap_or(final_snapshot_hash.full.0),
767 rpc_peers.len(),
768 if rpc_peers.len() > 1 { "s" } else { "" },
769 rpc_peers,
770 );
771 let rpc_node_results = peer_snapshot_hashes
772 .iter()
773 .map(|peer_snapshot_hash| GetRpcNodeResult {
774 rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
775 snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
776 })
777 .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
778 .collect();
779 return Ok(rpc_node_results);
780 }
781 }
782}
783
784fn get_highest_local_snapshot_hash(
787 full_snapshot_archives_dir: impl AsRef<Path>,
788 incremental_snapshot_archives_dir: impl AsRef<Path>,
789 incremental_snapshot_fetch: bool,
790) -> Option<(Slot, Hash)> {
791 snapshot_paths::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
792 .and_then(|full_snapshot_info| {
793 if incremental_snapshot_fetch {
794 snapshot_paths::get_highest_incremental_snapshot_archive_info(
795 incremental_snapshot_archives_dir,
796 full_snapshot_info.slot(),
797 )
798 .map(|incremental_snapshot_info| {
799 (
800 incremental_snapshot_info.slot(),
801 *incremental_snapshot_info.hash(),
802 )
803 })
804 } else {
805 None
806 }
807 .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
808 })
809 .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
810}
811
812fn get_peer_snapshot_hashes(
819 cluster_info: &ClusterInfo,
820 rpc_peers: &[ContactInfo],
821 known_validators: Option<&HashSet<Pubkey>>,
822 known_validators_to_wait_for: KnownValidatorsToWaitFor,
823 incremental_snapshot_fetch: bool,
824) -> Vec<PeerSnapshotHash> {
825 let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
826 if let Some(known_validators) = known_validators {
827 let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
828 cluster_info,
829 known_validators,
830 known_validators_to_wait_for,
831 );
832 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
833 &known_snapshot_hashes,
834 &mut peer_snapshot_hashes,
835 );
836 }
837 if incremental_snapshot_fetch {
838 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
846 &mut peer_snapshot_hashes,
847 );
848 }
849 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
850
851 peer_snapshot_hashes
852}
853
/// Maps each full snapshot `(slot, hash)` to the set of incremental snapshot
/// `(slot, hash)` pairs recorded for it.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
857
858fn get_snapshot_hashes_from_known_validators(
869 cluster_info: &ClusterInfo,
870 known_validators: &HashSet<Pubkey>,
871 known_validators_to_wait_for: KnownValidatorsToWaitFor,
872) -> KnownSnapshotHashes {
873 let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
875
876 if !do_known_validators_have_all_snapshot_hashes(
877 known_validators,
878 known_validators_to_wait_for,
879 get_snapshot_hashes_for_node,
880 ) {
881 debug!(
882 "Snapshot hashes have not been discovered from known validators. This likely means \
883 the gossip tables are not fully populated. We will sleep and retry..."
884 );
885 return KnownSnapshotHashes::default();
886 }
887
888 build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
889}
890
891fn do_known_validators_have_all_snapshot_hashes<'a>(
900 known_validators: impl IntoIterator<Item = &'a Pubkey>,
901 known_validators_to_wait_for: KnownValidatorsToWaitFor,
902 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
903) -> bool {
904 let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
905
906 match known_validators_to_wait_for {
907 KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
908 KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
909 }
910}
911
/// Whether snapshot hashes are required from *all* known validators or from *any* one of
/// them before they are used.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    /// Every known validator must have snapshot hashes available.
    All,
    /// At least one known validator must have snapshot hashes available.
    Any,
}
919
920fn build_known_snapshot_hashes<'a>(
926 nodes: impl IntoIterator<Item = &'a Pubkey>,
927 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
928) -> KnownSnapshotHashes {
929 let mut known_snapshot_hashes = KnownSnapshotHashes::new();
930
931 fn is_any_same_slot_and_different_hash<'a>(
934 needle: &(Slot, Hash),
935 haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
936 ) -> bool {
937 haystack
938 .into_iter()
939 .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
940 }
941
942 'to_next_node: for node in nodes {
943 let Some(SnapshotHash {
944 full: full_snapshot_hash,
945 incr: incremental_snapshot_hash,
946 }) = get_snapshot_hashes_for_node(node)
947 else {
948 continue 'to_next_node;
949 };
950
951 if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
956 warn!(
957 "Ignoring all snapshot hashes from node {node} since we've seen a different full \
958 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
959 );
960 debug!(
961 "known full snapshot hashes: {:#?}",
962 known_snapshot_hashes.keys(),
963 );
964 continue 'to_next_node;
965 }
966
967 let known_incremental_snapshot_hashes =
971 known_snapshot_hashes.entry(full_snapshot_hash).or_default();
972
973 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
974 if is_any_same_slot_and_different_hash(
979 &incremental_snapshot_hash,
980 known_incremental_snapshot_hashes.iter(),
981 ) {
982 warn!(
983 "Ignoring incremental snapshot hash from node {node} since we've seen a \
984 different incremental snapshot hash with this slot. full snapshot hash: \
985 {full_snapshot_hash:?}, incremental snapshot hash: \
986 {incremental_snapshot_hash:?}"
987 );
988 debug!(
989 "known incremental snapshot hashes based on this slot: {:#?}",
990 known_incremental_snapshot_hashes.iter(),
991 );
992 continue 'to_next_node;
993 }
994
995 known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
996 };
997 }
998
999 trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1000 known_snapshot_hashes
1001}
1002
1003fn get_eligible_peer_snapshot_hashes(
1009 cluster_info: &ClusterInfo,
1010 rpc_peers: &[ContactInfo],
1011) -> Vec<PeerSnapshotHash> {
1012 let peer_snapshot_hashes = rpc_peers
1013 .iter()
1014 .flat_map(|rpc_peer| {
1015 get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1016 PeerSnapshotHash {
1017 rpc_contact_info: rpc_peer.clone(),
1018 snapshot_hash,
1019 }
1020 })
1021 })
1022 .collect();
1023
1024 trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1025 peer_snapshot_hashes
1026}
1027
1028fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1030 known_snapshot_hashes: &KnownSnapshotHashes,
1031 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1032) {
1033 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1034 known_snapshot_hashes
1035 .get(&peer_snapshot_hash.snapshot_hash.full)
1036 .map(|known_incremental_hashes| {
1037 if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1038 true
1041 } else {
1042 known_incremental_hashes
1043 .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1044 }
1045 })
1046 .unwrap_or(false)
1047 });
1048
1049 trace!(
1050 "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1051 );
1052}
1053
1054fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1056 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1057) {
1058 let highest_full_snapshot_hash = peer_snapshot_hashes
1059 .iter()
1060 .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1061 .max_by_key(|(slot, _hash)| *slot);
1062 let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1063 return;
1067 };
1068
1069 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1070 peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1071 });
1072
1073 trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1074}
1075
1076fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1078 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1079) {
1080 let highest_incremental_snapshot_hash = peer_snapshot_hashes
1081 .iter()
1082 .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1083 .max_by_key(|(slot, _hash)| *slot);
1084
1085 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1086 peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1087 });
1088
1089 trace!(
1090 "retain peer snapshot hashes with highest incremental snapshot slot: \
1091 {peer_snapshot_hashes:?}"
1092 );
1093}
1094
1095#[allow(clippy::too_many_arguments)]
1097fn download_snapshots(
1098 validator_config: &ValidatorConfig,
1099 bootstrap_config: &RpcBootstrapConfig,
1100 use_progress_bar: bool,
1101 maximum_local_snapshot_age: Slot,
1102 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1103 minimal_snapshot_download_speed: f32,
1104 maximum_snapshot_download_abort: u64,
1105 download_abort_count: &mut u64,
1106 snapshot_hash: Option<SnapshotHash>,
1107 rpc_contact_info: &ContactInfo,
1108) -> Result<(), String> {
1109 if snapshot_hash.is_none() {
1110 return Ok(());
1111 }
1112 let SnapshotHash {
1113 full: full_snapshot_hash,
1114 incr: incremental_snapshot_hash,
1115 } = snapshot_hash.unwrap();
1116 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1117 let incremental_snapshot_archives_dir = &validator_config
1118 .snapshot_config
1119 .incremental_snapshot_archives_dir;
1120
1121 if should_use_local_snapshot(
1123 full_snapshot_archives_dir,
1124 incremental_snapshot_archives_dir,
1125 maximum_local_snapshot_age,
1126 full_snapshot_hash,
1127 incremental_snapshot_hash,
1128 bootstrap_config.incremental_snapshot_fetch,
1129 ) {
1130 return Ok(());
1131 }
1132
1133 if snapshot_paths::get_full_snapshot_archives(full_snapshot_archives_dir)
1135 .into_iter()
1136 .any(|snapshot_archive| {
1137 snapshot_archive.slot() == full_snapshot_hash.0
1138 && snapshot_archive.hash().0 == full_snapshot_hash.1
1139 })
1140 {
1141 info!(
1142 "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1143 full_snapshot_hash.0, full_snapshot_hash.1
1144 );
1145 } else {
1146 download_snapshot(
1147 validator_config,
1148 bootstrap_config,
1149 use_progress_bar,
1150 start_progress,
1151 minimal_snapshot_download_speed,
1152 maximum_snapshot_download_abort,
1153 download_abort_count,
1154 rpc_contact_info,
1155 full_snapshot_hash,
1156 SnapshotKind::FullSnapshot,
1157 )?;
1158 }
1159
1160 if bootstrap_config.incremental_snapshot_fetch {
1161 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1163 if snapshot_paths::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1164 .into_iter()
1165 .any(|snapshot_archive| {
1166 snapshot_archive.slot() == incremental_snapshot_hash.0
1167 && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1168 && snapshot_archive.base_slot() == full_snapshot_hash.0
1169 })
1170 {
1171 info!(
1172 "Incremental snapshot archive already exists locally. Skipping download. \
1173 slot: {}, hash: {}",
1174 incremental_snapshot_hash.0, incremental_snapshot_hash.1
1175 );
1176 } else {
1177 download_snapshot(
1178 validator_config,
1179 bootstrap_config,
1180 use_progress_bar,
1181 start_progress,
1182 minimal_snapshot_download_speed,
1183 maximum_snapshot_download_abort,
1184 download_abort_count,
1185 rpc_contact_info,
1186 incremental_snapshot_hash,
1187 SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1188 )?;
1189 }
1190 }
1191 }
1192
1193 Ok(())
1194}
1195
1196#[allow(clippy::too_many_arguments)]
1198fn download_snapshot(
1199 validator_config: &ValidatorConfig,
1200 bootstrap_config: &RpcBootstrapConfig,
1201 use_progress_bar: bool,
1202 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1203 minimal_snapshot_download_speed: f32,
1204 maximum_snapshot_download_abort: u64,
1205 download_abort_count: &mut u64,
1206 rpc_contact_info: &ContactInfo,
1207 desired_snapshot_hash: (Slot, Hash),
1208 snapshot_kind: SnapshotKind,
1209) -> Result<(), String> {
1210 let maximum_full_snapshot_archives_to_retain = validator_config
1211 .snapshot_config
1212 .maximum_full_snapshot_archives_to_retain;
1213 let maximum_incremental_snapshot_archives_to_retain = validator_config
1214 .snapshot_config
1215 .maximum_incremental_snapshot_archives_to_retain;
1216 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1217 let incremental_snapshot_archives_dir = &validator_config
1218 .snapshot_config
1219 .incremental_snapshot_archives_dir;
1220
1221 *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1222 slot: desired_snapshot_hash.0,
1223 rpc_addr: rpc_contact_info
1224 .rpc()
1225 .ok_or_else(|| String::from("Invalid RPC address"))?,
1226 };
1227 let desired_snapshot_hash = (
1228 desired_snapshot_hash.0,
1229 agave_snapshots::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1230 );
1231 download_snapshot_archive(
1232 &rpc_contact_info
1233 .rpc()
1234 .ok_or_else(|| String::from("Invalid RPC address"))?,
1235 full_snapshot_archives_dir,
1236 incremental_snapshot_archives_dir,
1237 desired_snapshot_hash,
1238 snapshot_kind,
1239 maximum_full_snapshot_archives_to_retain,
1240 maximum_incremental_snapshot_archives_to_retain,
1241 use_progress_bar,
1242 &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1243 debug!("Download progress: {download_progress:?}");
1244 if download_progress.last_throughput < minimal_snapshot_download_speed
1245 && download_progress.notification_count <= 1
1246 && download_progress.percentage_done <= 2_f32
1247 && download_progress.estimated_remaining_time > 60_f32
1248 && *download_abort_count < maximum_snapshot_download_abort
1249 {
1250 if let Some(ref known_validators) = validator_config.known_validators {
1251 if known_validators.contains(rpc_contact_info.pubkey())
1252 && known_validators.len() == 1
1253 && bootstrap_config.only_known_rpc
1254 {
1255 warn!(
1256 "The snapshot download is too slow, throughput: {} < min speed {} \
1257 bytes/sec, but will NOT abort and try a different node as it is the \
1258 only known validator and the --only-known-rpc flag is set. Abort \
1259 count: {}, Progress detail: {:?}",
1260 download_progress.last_throughput,
1261 minimal_snapshot_download_speed,
1262 download_abort_count,
1263 download_progress,
1264 );
1265 return true; }
1267 }
1268 warn!(
1269 "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1270 will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1271 download_progress.last_throughput,
1272 minimal_snapshot_download_speed,
1273 download_abort_count,
1274 download_progress,
1275 );
1276 *download_abort_count += 1;
1277 false
1278 } else {
1279 true
1280 }
1281 })),
1282 )
1283}
1284
1285fn should_use_local_snapshot(
1288 full_snapshot_archives_dir: &Path,
1289 incremental_snapshot_archives_dir: &Path,
1290 maximum_local_snapshot_age: Slot,
1291 full_snapshot_hash: (Slot, Hash),
1292 incremental_snapshot_hash: Option<(Slot, Hash)>,
1293 incremental_snapshot_fetch: bool,
1294) -> bool {
1295 let cluster_snapshot_slot = incremental_snapshot_hash
1296 .map(|(slot, _)| slot)
1297 .unwrap_or(full_snapshot_hash.0);
1298
1299 match get_highest_local_snapshot_hash(
1300 full_snapshot_archives_dir,
1301 incremental_snapshot_archives_dir,
1302 incremental_snapshot_fetch,
1303 ) {
1304 None => {
1305 info!(
1306 "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1307 local snapshot."
1308 );
1309 false
1310 }
1311 Some((local_snapshot_slot, _)) => {
1312 if local_snapshot_slot
1313 >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1314 {
1315 info!(
1316 "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1317 a snapshot for slot {cluster_snapshot_slot}."
1318 );
1319 true
1320 } else {
1321 info!(
1322 "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1323 newer snapshot for slot {cluster_snapshot_slot}."
1324 );
1325 false
1326 }
1327 }
1328 }
1329}
1330
1331fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1333 cluster_info.get_snapshot_hashes_for_node(node).map(
1334 |crds_data::SnapshotHashes {
1335 full, incremental, ..
1336 }| {
1337 let highest_incremental_snapshot_hash = incremental.into_iter().max();
1338 SnapshotHash {
1339 full,
1340 incr: highest_incremental_snapshot_hash,
1341 }
1342 },
1343 )
1344}
1345
#[cfg(test)]
mod tests {
    use super::*;

    impl PeerSnapshotHash {
        /// Test-only constructor that assembles a peer entry from its parts.
        fn new(
            rpc_contact_info: ContactInfo,
            full_snapshot_hash: (Slot, Hash),
            incremental_snapshot_hash: Option<(Slot, Hash)>,
        ) -> Self {
            let snapshot_hash = SnapshotHash {
                full: full_snapshot_hash,
                incr: incremental_snapshot_hash,
            };
            Self {
                rpc_contact_info,
                snapshot_hash,
            }
        }
    }

    /// Localhost contact info with a default pubkey and a fixed numeric argument, so every
    /// call yields the same inputs.
    fn default_contact_info_for_tests() -> ContactInfo {
        let id = Pubkey::default();
        ContactInfo::new_localhost(&id, 1_681_834_947_321)
    }

    #[test]
    fn test_build_known_snapshot_hashes() {
        agave_logger::setup();
        let full_snapshot_hash1 = (400_000, Hash::new_unique());
        let full_snapshot_hash2 = (400_000, Hash::new_unique());

        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());

        // Maps each node to the snapshot hashes it reports. Every combination of the two
        // full hashes with {no incremental, incremental 1, incremental 2} is reported by
        // three distinct nodes, and three more nodes report nothing at all.
        let mut oracle = HashMap::new();
        for full in [full_snapshot_hash1, full_snapshot_hash2] {
            for incr in [
                None,
                Some(incremental_snapshot_hash1),
                Some(incremental_snapshot_hash2),
            ] {
                for _ in 0..3 {
                    oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
                }
            }
        }
        for _ in 0..3 {
            oracle.insert(Pubkey::new_unique(), None);
        }
        let oracle = oracle;

        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();

        let known_snapshot_hashes =
            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);

        // Both full hashes share a slot, so exactly one of them may be recorded.
        assert_eq!(known_snapshot_hashes.len(), 1);
        let (known_full, known_incrementals) = known_snapshot_hashes.iter().next().unwrap();

        // Likewise for the two incremental hashes, which also share a slot.
        assert_eq!(known_incrementals.len(), 1);
        let known_incremental = known_incrementals.iter().next().unwrap();

        // Which hash wins is not pinned down here; it only has to be one of the candidates.
        assert!([full_snapshot_hash1, full_snapshot_hash2].contains(known_full));
        assert!([incremental_snapshot_hash1, incremental_snapshot_hash2]
            .contains(known_incremental));
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
        let known_snapshot_hashes: KnownSnapshotHashes = [
            (
                (200_000, Hash::new_unique()),
                [
                    (200_200, Hash::new_unique()),
                    (200_400, Hash::new_unique()),
                    (200_600, Hash::new_unique()),
                    (200_800, Hash::new_unique()),
                ]
                .into_iter()
                .collect(),
            ),
            (
                (300_000, Hash::new_unique()),
                [
                    (300_200, Hash::new_unique()),
                    (300_400, Hash::new_unique()),
                    (300_600, Hash::new_unique()),
                ]
                .into_iter()
                .collect(),
            ),
        ]
        .into_iter()
        .collect();

        // Pick one known full hash and one of its known incremental hashes as the "good" pair.
        let (known_full, known_incrementals) = known_snapshot_hashes.iter().next().unwrap();
        let good_full = *known_full;
        let good_incr = *known_incrementals.iter().next().unwrap();
        let bad_full = (111_000, Hash::default());
        let bad_incr = (111_111, Hash::default());

        let contact_info = default_contact_info_for_tests();
        let peer = |full, incr| PeerSnapshotHash::new(contact_info.clone(), full, incr);

        let mut actual = vec![
            // unknown full hash, no incremental hash
            peer(bad_full, None),
            // unknown full hash, unknown incremental hash
            peer(bad_full, Some(bad_incr)),
            // known full hash, no incremental hash
            peer(good_full, None),
            // unknown full hash, known incremental hash
            peer(bad_full, Some(good_incr)),
            // known full hash, unknown incremental hash
            peer(good_full, Some(bad_incr)),
            // known full hash, known incremental hash
            peer(good_full, Some(good_incr)),
        ];
        let expected = vec![peer(good_full, None), peer(good_full, Some(good_incr))];

        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
            &known_snapshot_hashes,
            &mut actual,
        );
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
        let contact_info = default_contact_info_for_tests();
        let peer = |full_slot: Slot, incr_slot: Option<Slot>| {
            PeerSnapshotHash::new(
                contact_info.clone(),
                (full_slot, Hash::default()),
                incr_slot.map(|slot| (slot, Hash::default())),
            )
        };

        let mut actual = vec![
            // peers at the lower full snapshot slot
            peer(100_000, None),
            peer(100_000, Some(100_100)),
            peer(100_000, Some(100_200)),
            peer(100_000, Some(100_300)),
            // peers at the higher full snapshot slot
            peer(200_000, None),
            peer(200_000, Some(200_100)),
            peer(200_000, Some(200_200)),
            peer(200_000, Some(200_300)),
        ];
        // Only the peers at the higher full snapshot slot survive, in their original order.
        let expected = vec![
            peer(200_000, None),
            peer(200_000, Some(200_100)),
            peer(200_000, Some(200_200)),
            peer(200_000, Some(200_300)),
        ];

        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
        let contact_info = default_contact_info_for_tests();
        let peer = |incr_slot: Option<Slot>| {
            PeerSnapshotHash::new(
                contact_info.clone(),
                (200_000, Hash::default()),
                incr_slot.map(|slot| (slot, Hash::default())),
            )
        };

        let mut actual = vec![
            peer(None),
            peer(Some(200_100)),
            peer(Some(200_200)),
            peer(Some(200_300)),
            peer(Some(200_010)),
            peer(Some(200_020)),
            peer(Some(200_030)),
        ];
        // 200_300 is the highest incremental slot, and it is not the last entry in the list.
        let expected = vec![peer(Some(200_300))];

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    /// With no incremental snapshot anywhere, every peer must be kept.
    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
        let contact_info = default_contact_info_for_tests();
        let peer = || PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None);

        let mut actual = vec![peer(), peer(), peer()];
        let expected = actual.clone();

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    /// Both retain functions must accept an empty list and leave it empty.
    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
        let expected = Vec::<PeerSnapshotHash>::new();

        let mut actual = Vec::new();
        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);

        let mut actual = Vec::new();
        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }
}