1use {
2 agave_snapshots::{
3 SnapshotArchiveKind, paths as snapshot_paths,
4 snapshot_archive_info::SnapshotArchiveInfoGetter as _,
5 },
6 itertools::Itertools,
7 log::*,
8 rand::{Rng, rng, seq::SliceRandom},
9 rayon::prelude::*,
10 solana_account::ReadableAccount,
11 solana_clock::Slot,
12 solana_commitment_config::CommitmentConfig,
13 solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
14 solana_download_utils::{DownloadProgressRecord, download_snapshot_archive},
15 solana_genesis_utils::download_then_check_genesis_hash,
16 solana_gossip::{
17 cluster_info::ClusterInfo,
18 contact_info::{ContactInfo, Protocol},
19 crds_data,
20 gossip_service::GossipService,
21 node::Node,
22 },
23 solana_hash::Hash,
24 solana_keypair::Keypair,
25 solana_metrics::datapoint_info,
26 solana_net_utils::SocketAddrSpace,
27 solana_pubkey::Pubkey,
28 solana_rpc_client::rpc_client::RpcClient,
29 solana_signer::Signer,
30 solana_vote_program::vote_state::VoteStateV4,
31 std::{
32 collections::{HashMap, HashSet, hash_map::RandomState},
33 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
34 path::Path,
35 process::exit,
36 sync::{
37 Arc, RwLock,
38 atomic::{AtomicBool, Ordering},
39 },
40 time::{Duration, Instant},
41 },
42 thiserror::Error,
43};
44
/// While the "no snapshot hashes found" timer in `get_rpc_nodes` has been running for less than
/// this long (or has not been started), snapshot hashes are required from *all* known
/// validators; afterwards hashes from *any* known validator are accepted.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// Once every discovered RPC peer has been blacklisted for longer than this, `get_rpc_peers`
/// clears the blacklist so the peers can be tried again.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// `get_rpc_nodes` gives up with `GetRpcNodeError::NoNewerSnapshots` when no usable snapshot
/// hashes have been found for longer than this.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(3 * 60);
/// `get_rpc_nodes` gives up with `GetRpcNodeError::NoRpcPeersFound` when no RPC peers have been
/// found for longer than this.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Upper bound on the number of RPC node candidates returned by a single `get_rpc_nodes` call.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// TCP connect timeout used by `ping` when measuring the latency to an RPC node.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
61
/// Options controlling how the validator bootstraps itself from RPC peers.
#[derive(Clone, Debug, PartialEq)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash`. When set together with
    /// `no_snapshot_fetch`, `rpc_bootstrap` returns right after the optional port check.
    pub no_genesis_fetch: bool,
    /// When set, an arbitrary RPC peer is selected without a snapshot hash, so no snapshot is
    /// downloaded.
    pub no_snapshot_fetch: bool,
    /// Only consider RPC peers that are listed in the validator's known validators.
    pub only_known_rpc: bool,
    /// Forwarded to `download_then_check_genesis_hash`.
    pub max_genesis_archive_unpacked_size: u64,
    /// When set, the value is used as the URL of an RPC client against which the vote account
    /// is checked after the download step.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots are taken into account in addition to full snapshots.
    pub incremental_snapshot_fetch: bool,
}
71
72fn verify_reachable_ports(
73 node: &Node,
74 cluster_entrypoint: &ContactInfo,
75 validator_config: &ValidatorConfig,
76 socket_addr_space: &SocketAddrSpace,
77) -> bool {
78 let verify_address = |addr: &Option<SocketAddr>| -> bool {
79 addr.as_ref()
80 .map(|addr| socket_addr_space.check(addr))
81 .unwrap_or_default()
82 };
83
84 let mut udp_sockets = vec![&node.sockets.repair];
85 udp_sockets.extend(node.sockets.gossip.iter());
86
87 if verify_address(&node.info.serve_repair(Protocol::UDP)) {
88 udp_sockets.push(&node.sockets.serve_repair);
89 }
90 if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
91 udp_sockets.extend(node.sockets.tpu_vote.iter());
92 }
93 if verify_address(&node.info.tvu(Protocol::UDP)) {
94 udp_sockets.extend(node.sockets.tvu.iter());
95 udp_sockets.extend(node.sockets.broadcast.iter());
96 udp_sockets.extend(node.sockets.retransmit_sockets.iter());
97 }
98 if !solana_net_utils::verify_all_reachable_udp(
99 &cluster_entrypoint.gossip().unwrap(),
100 &udp_sockets,
101 ) {
102 return false;
103 }
104
105 let mut tcp_listeners = vec![];
106 if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
107 for (purpose, bind_addr, public_addr) in &[
108 ("RPC", rpc_addr, node.info.rpc()),
109 ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
110 ] {
111 if verify_address(public_addr) {
112 tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
113 error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
114 exit(1);
115 }));
116 }
117 }
118 }
119
120 if let Some(ip_echo) = &node.sockets.ip_echo {
121 let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
122 tcp_listeners.push(ip_echo);
123 }
124
125 solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
126}
127
128fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
129 if let Some(known_validators) = known_validators {
130 known_validators.contains(id)
131 } else {
132 false
133 }
134}
135
136#[allow(clippy::too_many_arguments)]
137fn start_gossip_node(
138 identity_keypair: Arc<Keypair>,
139 cluster_entrypoints: &[ContactInfo],
140 known_validators: Option<HashSet<Pubkey>>,
141 ledger_path: &Path,
142 gossip_addr: &SocketAddr,
143 gossip_sockets: Arc<[UdpSocket]>,
144 expected_shred_version: u16,
145 gossip_validators: Option<HashSet<Pubkey>>,
146 should_check_duplicate_instance: bool,
147 socket_addr_space: SocketAddrSpace,
148) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
149 let contact_info = ClusterInfo::gossip_contact_info(
150 identity_keypair.pubkey(),
151 *gossip_addr,
152 expected_shred_version,
153 );
154 let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
155 if let Some(known_validators) = known_validators {
156 cluster_info
157 .set_trim_keep_pubkeys(known_validators)
158 .expect("set_trim_keep_pubkeys should succeed as ClusterInfo was just created");
159 }
160 cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
161 cluster_info.restore_contact_info(ledger_path, 0);
162 let cluster_info = Arc::new(cluster_info);
163
164 let gossip_exit_flag = Arc::new(AtomicBool::new(false));
165 let gossip_service = GossipService::new(
166 &cluster_info,
167 None,
168 gossip_sockets,
169 gossip_validators,
170 should_check_duplicate_instance,
171 None,
172 gossip_exit_flag.clone(),
173 );
174 (cluster_info, gossip_exit_flag, gossip_service)
175}
176
177fn get_rpc_peers(
178 cluster_info: &ClusterInfo,
179 validator_config: &ValidatorConfig,
180 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
181 blacklist_timeout: &Instant,
182 retry_reason: &mut Option<String>,
183 bootstrap_config: &RpcBootstrapConfig,
184) -> Vec<ContactInfo> {
185 let shred_version = validator_config
186 .expected_shred_version
187 .unwrap_or_else(|| cluster_info.my_shred_version());
188
189 info!(
190 "Searching for an RPC service with shred version {shred_version}{}...",
191 retry_reason
192 .as_ref()
193 .map(|s| format!(" (Retrying: {s})"))
194 .unwrap_or_default()
195 );
196
197 let mut rpc_peers = cluster_info.rpc_peers();
198 if bootstrap_config.only_known_rpc {
199 rpc_peers.retain(|rpc_peer| {
200 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
201 });
202 }
203
204 let rpc_peers_total = rpc_peers.len();
205
206 let rpc_peers: Vec<_> = rpc_peers
208 .into_iter()
209 .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
210 .collect();
211 let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
212 let rpc_known_peers = rpc_peers
213 .iter()
214 .filter(|rpc_peer| {
215 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
216 })
217 .count();
218
219 info!(
220 "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
221 {rpc_peers_blacklisted} blacklisted"
222 );
223
224 if rpc_peers_blacklisted == rpc_peers_total {
225 *retry_reason = if !blacklisted_rpc_nodes.is_empty()
226 && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
227 {
228 blacklisted_rpc_nodes.clear();
231 Some("Blacklist timeout expired".to_owned())
232 } else {
233 Some("Wait for known rpc peers".to_owned())
234 };
235 return vec![];
236 }
237 rpc_peers
238}
239
240fn check_vote_account(
241 rpc_client: &RpcClient,
242 identity_pubkey: &Pubkey,
243 vote_account_address: &Pubkey,
244 authorized_voter_pubkeys: &[Pubkey],
245) -> Result<(), String> {
246 let vote_account = rpc_client
247 .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
248 .map_err(|err| format!("failed to fetch vote account: {err}"))?
249 .value
250 .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
251
252 if vote_account.owner != solana_vote_program::id() {
253 return Err(format!(
254 "not a vote account (owned by {}): {}",
255 vote_account.owner, vote_account_address
256 ));
257 }
258
259 let identity_account = rpc_client
260 .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
261 .map_err(|err| format!("failed to fetch identity account: {err}"))?
262 .value
263 .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
264
265 let vote_state = VoteStateV4::deserialize(vote_account.data(), vote_account_address).ok();
266 if let Some(vote_state) = vote_state {
267 if vote_state.authorized_voters.is_empty() {
268 return Err("Vote account not yet initialized".to_string());
269 }
270
271 if vote_state.node_pubkey != *identity_pubkey {
272 return Err(format!(
273 "vote account's identity ({}) does not match the validator's identity {}).",
274 vote_state.node_pubkey, identity_pubkey
275 ));
276 }
277
278 for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters.iter() {
279 if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
280 return Err(format!(
281 "authorized voter {vote_account_authorized_voter_pubkey} not available"
282 ));
283 }
284 }
285 } else {
286 return Err(format!(
287 "invalid vote account data for {vote_account_address}"
288 ));
289 }
290
291 if identity_account.lamports <= 1 {
293 return Err(format!(
294 "underfunded identity account ({}): only {} lamports available",
295 identity_pubkey, identity_account.lamports
296 ));
297 }
298
299 Ok(())
300}
301
302#[derive(Error, Debug)]
303pub enum GetRpcNodeError {
304 #[error("Unable to find any RPC peers")]
305 NoRpcPeersFound,
306
307 #[error("Giving up, did not get newer snapshots from the cluster")]
308 NoNewerSnapshots,
309}
310
311#[derive(Debug)]
315struct GetRpcNodeResult {
316 rpc_contact_info: ContactInfo,
317 snapshot_hash: Option<SnapshotHash>,
318}
319
320#[derive(Debug, PartialEq, Eq, Clone)]
322struct PeerSnapshotHash {
323 rpc_contact_info: ContactInfo,
324 snapshot_hash: SnapshotHash,
325}
326
327#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
330pub struct SnapshotHash {
331 full: (Slot, Hash),
332 incr: Option<(Slot, Hash)>,
333}
334
335pub fn fail_rpc_node(
336 err: String,
337 known_validators: &Option<HashSet<Pubkey, RandomState>>,
338 rpc_id: &Pubkey,
339 blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
340) {
341 warn!("{err}");
342 if let Some(known_validators) = known_validators {
343 if known_validators.contains(rpc_id) {
344 return;
345 }
346 }
347
348 info!("Excluding {rpc_id} as a future RPC candidate");
349 blacklisted_rpc_nodes.insert(*rpc_id);
350}
351
352fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
353 let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
354 cluster_info.save_contact_info();
355 gossip_exit_flag.store(true, Ordering::Relaxed);
356 gossip_service.join().unwrap();
357}
358
359#[allow(clippy::too_many_arguments)]
360pub fn attempt_download_genesis_and_snapshot(
361 rpc_contact_info: &ContactInfo,
362 ledger_path: &Path,
363 validator_config: &mut ValidatorConfig,
364 bootstrap_config: &RpcBootstrapConfig,
365 use_progress_bar: bool,
366 gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
367 rpc_client: &RpcClient,
368 maximum_local_snapshot_age: Slot,
369 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
370 minimal_snapshot_download_speed: f32,
371 maximum_snapshot_download_abort: u64,
372 download_abort_count: &mut u64,
373 snapshot_hash: Option<SnapshotHash>,
374 identity_keypair: &Arc<Keypair>,
375 vote_account: &Pubkey,
376 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
377) -> Result<(), String> {
378 download_then_check_genesis_hash(
379 &rpc_contact_info
380 .rpc()
381 .ok_or_else(|| String::from("Invalid RPC address"))?,
382 ledger_path,
383 &mut validator_config.expected_genesis_hash,
384 bootstrap_config.max_genesis_archive_unpacked_size,
385 bootstrap_config.no_genesis_fetch,
386 use_progress_bar,
387 rpc_client,
388 )?;
389
390 if let Some(gossip) = gossip.take() {
391 shutdown_gossip_service(gossip);
392 }
393
394 let rpc_client_slot = rpc_client
395 .get_slot_with_commitment(CommitmentConfig::finalized())
396 .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
397 info!("RPC node root slot: {rpc_client_slot}");
398
399 download_snapshots(
400 validator_config,
401 bootstrap_config,
402 use_progress_bar,
403 maximum_local_snapshot_age,
404 start_progress,
405 minimal_snapshot_download_speed,
406 maximum_snapshot_download_abort,
407 download_abort_count,
408 snapshot_hash,
409 rpc_contact_info,
410 )?;
411
412 if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
413 let rpc_client = RpcClient::new(url);
414 check_vote_account(
415 &rpc_client,
416 &identity_keypair.pubkey(),
417 vote_account,
418 &authorized_voter_keypairs
419 .read()
420 .unwrap()
421 .iter()
422 .map(|k| k.pubkey())
423 .collect::<Vec<_>>(),
424 )
425 .unwrap_or_else(|err| {
426 error!("{err}");
433 exit(1);
434 });
435 }
436 Ok(())
437}
438
439fn ping(addr: &SocketAddr) -> Option<Duration> {
441 let start = Instant::now();
442 match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
443 Ok(_) => Some(start.elapsed()),
444 Err(_) => None,
445 }
446}
447
448fn get_vetted_rpc_nodes(
452 vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
453 cluster_info: &Arc<ClusterInfo>,
454 validator_config: &ValidatorConfig,
455 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
456 bootstrap_config: &RpcBootstrapConfig,
457) {
458 while vetted_rpc_nodes.is_empty() {
459 let rpc_node_details = match get_rpc_nodes(
460 cluster_info,
461 validator_config,
462 blacklisted_rpc_nodes,
463 bootstrap_config,
464 ) {
465 Ok(rpc_node_details) => rpc_node_details,
466 Err(err) => {
467 error!(
468 "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
469 `--no-port-check`, or adjusting `--known-validator ...` arguments as \
470 applicable"
471 );
472 exit(1);
473 }
474 };
475
476 let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
477 vetted_rpc_nodes.extend(
478 rpc_node_details
479 .into_par_iter()
480 .filter_map(|rpc_node_details| {
481 let GetRpcNodeResult {
482 rpc_contact_info,
483 snapshot_hash,
484 } = rpc_node_details;
485
486 info!(
487 "Using RPC service from node {}: {:?}",
488 rpc_contact_info.pubkey(),
489 rpc_contact_info.rpc()
490 );
491
492 let rpc_addr = rpc_contact_info.rpc()?;
493 let ping_time = ping(&rpc_addr);
494
495 let rpc_client =
496 RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
497
498 Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
499 })
500 .filter(
501 |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
502 .get_version()
503 {
504 Ok(rpc_version) => {
505 if let Some(ping_time) = ping_time {
506 info!(
507 "RPC node version: {} Ping: {}ms",
508 rpc_version.solana_core,
509 ping_time.as_millis()
510 );
511 true
512 } else {
513 fail_rpc_node(
514 "Failed to ping RPC".to_string(),
515 &validator_config.known_validators,
516 rpc_contact_info.pubkey(),
517 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
518 );
519 false
520 }
521 }
522 Err(err) => {
523 fail_rpc_node(
524 format!("Failed to get RPC node version: {err}"),
525 &validator_config.known_validators,
526 rpc_contact_info.pubkey(),
527 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
528 );
529 false
530 }
531 },
532 )
533 .collect::<Vec<(
534 ContactInfo,
535 Option<SnapshotHash>,
536 RpcClient,
537 Option<Duration>,
538 )>>()
539 .into_iter()
540 .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
541 .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
542 (rpc_contact_info, snapshot_hash, rpc_client)
543 })
544 .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
545 );
546 blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
547 }
548}
549
550#[allow(clippy::too_many_arguments)]
551pub fn rpc_bootstrap(
552 node: &Node,
553 identity_keypair: &Arc<Keypair>,
554 ledger_path: &Path,
555 vote_account: &Pubkey,
556 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
557 cluster_entrypoints: &[ContactInfo],
558 validator_config: &mut ValidatorConfig,
559 bootstrap_config: RpcBootstrapConfig,
560 do_port_check: bool,
561 use_progress_bar: bool,
562 maximum_local_snapshot_age: Slot,
563 should_check_duplicate_instance: bool,
564 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
565 minimal_snapshot_download_speed: f32,
566 maximum_snapshot_download_abort: u64,
567 socket_addr_space: SocketAddrSpace,
568) {
569 if do_port_check {
570 let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
571 order.shuffle(&mut rng());
572 if order.into_iter().all(|i| {
573 !verify_reachable_ports(
574 node,
575 &cluster_entrypoints[i],
576 validator_config,
577 &socket_addr_space,
578 )
579 }) {
580 exit(1);
581 }
582 }
583
584 if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
585 return;
586 }
587
588 let total_snapshot_download_time = Instant::now();
589 let mut get_rpc_nodes_time = Duration::new(0, 0);
590 let mut snapshot_download_time = Duration::new(0, 0);
591 let mut blacklisted_rpc_nodes = HashSet::new();
592 let mut gossip = None;
593 let mut vetted_rpc_nodes = vec![];
594 let mut download_abort_count = 0;
595 loop {
596 if gossip.is_none() {
597 *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
598
599 gossip = Some(start_gossip_node(
600 identity_keypair.clone(),
601 cluster_entrypoints,
602 validator_config.known_validators.clone(),
603 ledger_path,
604 &node
605 .info
606 .gossip()
607 .expect("Operator must spin up node with valid gossip address"),
608 node.sockets.gossip.clone(),
609 validator_config
610 .expected_shred_version
611 .expect("expected_shred_version should not be None"),
612 validator_config.gossip_validators.clone(),
613 should_check_duplicate_instance,
614 socket_addr_space,
615 ));
616 }
617
618 let get_rpc_nodes_start = Instant::now();
619 get_vetted_rpc_nodes(
620 &mut vetted_rpc_nodes,
621 &gossip.as_ref().unwrap().0,
622 validator_config,
623 &mut blacklisted_rpc_nodes,
624 &bootstrap_config,
625 );
626 let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
627 get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
628
629 let snapshot_download_start = Instant::now();
630 let download_result = attempt_download_genesis_and_snapshot(
631 &rpc_contact_info,
632 ledger_path,
633 validator_config,
634 &bootstrap_config,
635 use_progress_bar,
636 &mut gossip,
637 &rpc_client,
638 maximum_local_snapshot_age,
639 start_progress,
640 minimal_snapshot_download_speed,
641 maximum_snapshot_download_abort,
642 &mut download_abort_count,
643 snapshot_hash,
644 identity_keypair,
645 vote_account,
646 authorized_voter_keypairs.clone(),
647 );
648 snapshot_download_time += snapshot_download_start.elapsed();
649 match download_result {
650 Ok(()) => break,
651 Err(err) => {
652 fail_rpc_node(
653 err,
654 &validator_config.known_validators,
655 rpc_contact_info.pubkey(),
656 &mut blacklisted_rpc_nodes,
657 );
658 }
659 }
660 }
661
662 if let Some(gossip) = gossip.take() {
663 shutdown_gossip_service(gossip);
664 }
665
666 datapoint_info!(
667 "bootstrap-snapshot-download",
668 (
669 "total_time_secs",
670 total_snapshot_download_time.elapsed().as_secs(),
671 i64
672 ),
673 ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
674 (
675 "snapshot_download_time_secs",
676 snapshot_download_time.as_secs(),
677 i64
678 ),
679 ("download_abort_count", download_abort_count, i64),
680 ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
681 );
682}
683
684fn get_rpc_nodes(
688 cluster_info: &ClusterInfo,
689 validator_config: &ValidatorConfig,
690 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
691 bootstrap_config: &RpcBootstrapConfig,
692) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
693 let mut blacklist_timeout = Instant::now();
694 let mut get_rpc_peers_timout = Instant::now();
695 let mut newer_cluster_snapshot_timeout = None;
696 let mut retry_reason = None;
697 loop {
698 std::thread::sleep(Duration::from_secs(1));
700 info!("\n{}", cluster_info.rpc_info_trace());
701
702 let rpc_peers = get_rpc_peers(
703 cluster_info,
704 validator_config,
705 blacklisted_rpc_nodes,
706 &blacklist_timeout,
707 &mut retry_reason,
708 bootstrap_config,
709 );
710 if rpc_peers.is_empty() {
711 if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
712 return Err(GetRpcNodeError::NoRpcPeersFound);
713 }
714 continue;
715 }
716
717 blacklist_timeout = Instant::now();
719 get_rpc_peers_timout = Instant::now();
720 if bootstrap_config.no_snapshot_fetch {
721 let random_peer = &rpc_peers[rng().random_range(0..rpc_peers.len())];
722 return Ok(vec![GetRpcNodeResult {
723 rpc_contact_info: random_peer.clone(),
724 snapshot_hash: None,
725 }]);
726 }
727
728 let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
729 .as_ref()
730 .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
731 .unwrap_or(true)
732 {
733 KnownValidatorsToWaitFor::All
734 } else {
735 KnownValidatorsToWaitFor::Any
736 };
737 let peer_snapshot_hashes = get_peer_snapshot_hashes(
738 cluster_info,
739 &rpc_peers,
740 validator_config.known_validators.as_ref(),
741 known_validators_to_wait_for,
742 bootstrap_config.incremental_snapshot_fetch,
743 );
744 if peer_snapshot_hashes.is_empty() {
745 match newer_cluster_snapshot_timeout {
746 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
747 Some(newer_cluster_snapshot_timeout) => {
748 if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
749 return Err(GetRpcNodeError::NoNewerSnapshots);
750 }
751 }
752 }
753 retry_reason = Some("No snapshots available".to_owned());
754 continue;
755 } else {
756 let rpc_peers = peer_snapshot_hashes
757 .iter()
758 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
759 .collect::<Vec<_>>();
760 let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
761 info!(
762 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
763 final_snapshot_hash
764 .incr
765 .map(|(slot, _hash)| slot)
766 .unwrap_or(final_snapshot_hash.full.0),
767 rpc_peers.len(),
768 if rpc_peers.len() > 1 { "s" } else { "" },
769 rpc_peers,
770 );
771 let rpc_node_results = peer_snapshot_hashes
772 .iter()
773 .map(|peer_snapshot_hash| GetRpcNodeResult {
774 rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
775 snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
776 })
777 .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
778 .collect();
779 return Ok(rpc_node_results);
780 }
781 }
782}
783
784fn get_highest_local_snapshot_hash(
787 full_snapshot_archives_dir: impl AsRef<Path>,
788 incremental_snapshot_archives_dir: impl AsRef<Path>,
789 incremental_snapshot_fetch: bool,
790) -> Option<(Slot, Hash)> {
791 snapshot_paths::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
792 .and_then(|full_snapshot_info| {
793 if incremental_snapshot_fetch {
794 snapshot_paths::get_highest_incremental_snapshot_archive_info(
795 incremental_snapshot_archives_dir,
796 full_snapshot_info.slot(),
797 )
798 .map(|incremental_snapshot_info| {
799 (
800 incremental_snapshot_info.slot(),
801 *incremental_snapshot_info.hash(),
802 )
803 })
804 } else {
805 None
806 }
807 .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
808 })
809 .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
810}
811
812fn get_peer_snapshot_hashes(
819 cluster_info: &ClusterInfo,
820 rpc_peers: &[ContactInfo],
821 known_validators: Option<&HashSet<Pubkey>>,
822 known_validators_to_wait_for: KnownValidatorsToWaitFor,
823 incremental_snapshot_fetch: bool,
824) -> Vec<PeerSnapshotHash> {
825 let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
826 if let Some(known_validators) = known_validators {
827 let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
828 cluster_info,
829 known_validators,
830 known_validators_to_wait_for,
831 );
832 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
833 &known_snapshot_hashes,
834 &mut peer_snapshot_hashes,
835 );
836 }
837 if incremental_snapshot_fetch {
838 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
846 &mut peer_snapshot_hashes,
847 );
848 }
849 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
850
851 peer_snapshot_hashes
852}
853
854type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
857
858fn get_snapshot_hashes_from_known_validators(
869 cluster_info: &ClusterInfo,
870 known_validators: &HashSet<Pubkey>,
871 known_validators_to_wait_for: KnownValidatorsToWaitFor,
872) -> KnownSnapshotHashes {
873 let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
875
876 if !do_known_validators_have_all_snapshot_hashes(
877 known_validators,
878 known_validators_to_wait_for,
879 get_snapshot_hashes_for_node,
880 ) {
881 debug!(
882 "Snapshot hashes have not been discovered from known validators. This likely means \
883 the gossip tables are not fully populated. We will sleep and retry..."
884 );
885 return KnownSnapshotHashes::default();
886 }
887
888 build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
889}
890
891fn do_known_validators_have_all_snapshot_hashes<'a>(
900 known_validators: impl IntoIterator<Item = &'a Pubkey>,
901 known_validators_to_wait_for: KnownValidatorsToWaitFor,
902 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
903) -> bool {
904 let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
905
906 match known_validators_to_wait_for {
907 KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
908 KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
909 }
910}
911
/// How many of the known validators must have published snapshot hashes before their hashes
/// are used.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum KnownValidatorsToWaitFor {
    /// Every known validator must have snapshot hashes.
    All,
    /// At least one known validator must have snapshot hashes.
    Any,
}
919
920fn build_known_snapshot_hashes<'a>(
926 nodes: impl IntoIterator<Item = &'a Pubkey>,
927 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
928) -> KnownSnapshotHashes {
929 let mut known_snapshot_hashes = KnownSnapshotHashes::new();
930
931 fn is_any_same_slot_and_different_hash<'a>(
934 needle: &(Slot, Hash),
935 haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
936 ) -> bool {
937 haystack
938 .into_iter()
939 .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
940 }
941
942 'to_next_node: for node in nodes {
943 let Some(SnapshotHash {
944 full: full_snapshot_hash,
945 incr: incremental_snapshot_hash,
946 }) = get_snapshot_hashes_for_node(node)
947 else {
948 continue 'to_next_node;
949 };
950
951 if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
956 warn!(
957 "Ignoring all snapshot hashes from node {node} since we've seen a different full \
958 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
959 );
960 debug!(
961 "known full snapshot hashes: {:#?}",
962 known_snapshot_hashes.keys(),
963 );
964 continue 'to_next_node;
965 }
966
967 let known_incremental_snapshot_hashes =
971 known_snapshot_hashes.entry(full_snapshot_hash).or_default();
972
973 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
974 if is_any_same_slot_and_different_hash(
979 &incremental_snapshot_hash,
980 known_incremental_snapshot_hashes.iter(),
981 ) {
982 warn!(
983 "Ignoring incremental snapshot hash from node {node} since we've seen a \
984 different incremental snapshot hash with this slot. full snapshot hash: \
985 {full_snapshot_hash:?}, incremental snapshot hash: \
986 {incremental_snapshot_hash:?}"
987 );
988 debug!(
989 "known incremental snapshot hashes based on this slot: {:#?}",
990 known_incremental_snapshot_hashes.iter(),
991 );
992 continue 'to_next_node;
993 }
994
995 known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
996 };
997 }
998
999 trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1000 known_snapshot_hashes
1001}
1002
1003fn get_eligible_peer_snapshot_hashes(
1009 cluster_info: &ClusterInfo,
1010 rpc_peers: &[ContactInfo],
1011) -> Vec<PeerSnapshotHash> {
1012 let peer_snapshot_hashes = rpc_peers
1013 .iter()
1014 .flat_map(|rpc_peer| {
1015 get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1016 PeerSnapshotHash {
1017 rpc_contact_info: rpc_peer.clone(),
1018 snapshot_hash,
1019 }
1020 })
1021 })
1022 .collect();
1023
1024 trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1025 peer_snapshot_hashes
1026}
1027
1028fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1030 known_snapshot_hashes: &KnownSnapshotHashes,
1031 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1032) {
1033 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1034 known_snapshot_hashes
1035 .get(&peer_snapshot_hash.snapshot_hash.full)
1036 .map(|known_incremental_hashes| {
1037 if let Some(incr) = peer_snapshot_hash.snapshot_hash.incr.as_ref() {
1038 known_incremental_hashes.contains(incr)
1039 } else {
1040 true
1043 }
1044 })
1045 .unwrap_or(false)
1046 });
1047
1048 trace!(
1049 "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1050 );
1051}
1052
1053fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1055 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1056) {
1057 let highest_full_snapshot_hash = peer_snapshot_hashes
1058 .iter()
1059 .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1060 .max_by_key(|(slot, _hash)| *slot);
1061 let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1062 return;
1066 };
1067
1068 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1069 peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1070 });
1071
1072 trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1073}
1074
1075fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1077 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1078) {
1079 let highest_incremental_snapshot_hash = peer_snapshot_hashes
1080 .iter()
1081 .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1082 .max_by_key(|(slot, _hash)| *slot);
1083
1084 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1085 peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1086 });
1087
1088 trace!(
1089 "retain peer snapshot hashes with highest incremental snapshot slot: \
1090 {peer_snapshot_hashes:?}"
1091 );
1092}
1093
1094#[allow(clippy::too_many_arguments)]
1096fn download_snapshots(
1097 validator_config: &ValidatorConfig,
1098 bootstrap_config: &RpcBootstrapConfig,
1099 use_progress_bar: bool,
1100 maximum_local_snapshot_age: Slot,
1101 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1102 minimal_snapshot_download_speed: f32,
1103 maximum_snapshot_download_abort: u64,
1104 download_abort_count: &mut u64,
1105 snapshot_hash: Option<SnapshotHash>,
1106 rpc_contact_info: &ContactInfo,
1107) -> Result<(), String> {
1108 if snapshot_hash.is_none() {
1109 return Ok(());
1110 }
1111 let SnapshotHash {
1112 full: full_snapshot_hash,
1113 incr: incremental_snapshot_hash,
1114 } = snapshot_hash.unwrap();
1115 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1116 let incremental_snapshot_archives_dir = &validator_config
1117 .snapshot_config
1118 .incremental_snapshot_archives_dir;
1119
1120 if should_use_local_snapshot(
1122 full_snapshot_archives_dir,
1123 incremental_snapshot_archives_dir,
1124 maximum_local_snapshot_age,
1125 full_snapshot_hash,
1126 incremental_snapshot_hash,
1127 bootstrap_config.incremental_snapshot_fetch,
1128 ) {
1129 return Ok(());
1130 }
1131
1132 if snapshot_paths::get_full_snapshot_archives(full_snapshot_archives_dir)
1134 .into_iter()
1135 .any(|snapshot_archive| {
1136 snapshot_archive.slot() == full_snapshot_hash.0
1137 && snapshot_archive.hash().0 == full_snapshot_hash.1
1138 })
1139 {
1140 info!(
1141 "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1142 full_snapshot_hash.0, full_snapshot_hash.1
1143 );
1144 } else {
1145 download_snapshot(
1146 validator_config,
1147 bootstrap_config,
1148 use_progress_bar,
1149 start_progress,
1150 minimal_snapshot_download_speed,
1151 maximum_snapshot_download_abort,
1152 download_abort_count,
1153 rpc_contact_info,
1154 full_snapshot_hash,
1155 SnapshotArchiveKind::Full,
1156 )?;
1157 }
1158
1159 if bootstrap_config.incremental_snapshot_fetch {
1160 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1162 if snapshot_paths::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1163 .into_iter()
1164 .any(|snapshot_archive| {
1165 snapshot_archive.slot() == incremental_snapshot_hash.0
1166 && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1167 && snapshot_archive.base_slot() == full_snapshot_hash.0
1168 })
1169 {
1170 info!(
1171 "Incremental snapshot archive already exists locally. Skipping download. \
1172 slot: {}, hash: {}",
1173 incremental_snapshot_hash.0, incremental_snapshot_hash.1
1174 );
1175 } else {
1176 download_snapshot(
1177 validator_config,
1178 bootstrap_config,
1179 use_progress_bar,
1180 start_progress,
1181 minimal_snapshot_download_speed,
1182 maximum_snapshot_download_abort,
1183 download_abort_count,
1184 rpc_contact_info,
1185 incremental_snapshot_hash,
1186 SnapshotArchiveKind::Incremental(full_snapshot_hash.0),
1187 )?;
1188 }
1189 }
1190 }
1191
1192 Ok(())
1193}
1194
1195#[allow(clippy::too_many_arguments)]
1197fn download_snapshot(
1198 validator_config: &ValidatorConfig,
1199 bootstrap_config: &RpcBootstrapConfig,
1200 use_progress_bar: bool,
1201 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1202 minimal_snapshot_download_speed: f32,
1203 maximum_snapshot_download_abort: u64,
1204 download_abort_count: &mut u64,
1205 rpc_contact_info: &ContactInfo,
1206 desired_snapshot_hash: (Slot, Hash),
1207 snapshot_kind: SnapshotArchiveKind,
1208) -> Result<(), String> {
1209 let maximum_full_snapshot_archives_to_retain = validator_config
1210 .snapshot_config
1211 .maximum_full_snapshot_archives_to_retain;
1212 let maximum_incremental_snapshot_archives_to_retain = validator_config
1213 .snapshot_config
1214 .maximum_incremental_snapshot_archives_to_retain;
1215 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1216 let incremental_snapshot_archives_dir = &validator_config
1217 .snapshot_config
1218 .incremental_snapshot_archives_dir;
1219
1220 *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1221 slot: desired_snapshot_hash.0,
1222 rpc_addr: rpc_contact_info
1223 .rpc()
1224 .ok_or_else(|| String::from("Invalid RPC address"))?,
1225 };
1226 let desired_snapshot_hash = (
1227 desired_snapshot_hash.0,
1228 agave_snapshots::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1229 );
1230 download_snapshot_archive(
1231 &rpc_contact_info
1232 .rpc()
1233 .ok_or_else(|| String::from("Invalid RPC address"))?,
1234 full_snapshot_archives_dir,
1235 incremental_snapshot_archives_dir,
1236 desired_snapshot_hash,
1237 snapshot_kind,
1238 maximum_full_snapshot_archives_to_retain,
1239 maximum_incremental_snapshot_archives_to_retain,
1240 use_progress_bar,
1241 &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1242 debug!("Download progress: {download_progress:?}");
1243 if download_progress.last_throughput < minimal_snapshot_download_speed
1244 && download_progress.notification_count <= 1
1245 && download_progress.percentage_done <= 2_f32
1246 && download_progress.estimated_remaining_time > 60_f32
1247 && *download_abort_count < maximum_snapshot_download_abort
1248 {
1249 if let Some(ref known_validators) = validator_config.known_validators {
1250 if known_validators.contains(rpc_contact_info.pubkey())
1251 && known_validators.len() == 1
1252 && bootstrap_config.only_known_rpc
1253 {
1254 warn!(
1255 "The snapshot download is too slow, throughput: {} < min speed {} \
1256 bytes/sec, but will NOT abort and try a different node as it is the \
1257 only known validator and the --only-known-rpc flag is set. Abort \
1258 count: {}, Progress detail: {:?}",
1259 download_progress.last_throughput,
1260 minimal_snapshot_download_speed,
1261 download_abort_count,
1262 download_progress,
1263 );
1264 return true; }
1266 }
1267 warn!(
1268 "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1269 will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1270 download_progress.last_throughput,
1271 minimal_snapshot_download_speed,
1272 download_abort_count,
1273 download_progress,
1274 );
1275 *download_abort_count += 1;
1276 false
1277 } else {
1278 true
1279 }
1280 })),
1281 )
1282}
1283
1284fn should_use_local_snapshot(
1287 full_snapshot_archives_dir: &Path,
1288 incremental_snapshot_archives_dir: &Path,
1289 maximum_local_snapshot_age: Slot,
1290 full_snapshot_hash: (Slot, Hash),
1291 incremental_snapshot_hash: Option<(Slot, Hash)>,
1292 incremental_snapshot_fetch: bool,
1293) -> bool {
1294 let cluster_snapshot_slot = incremental_snapshot_hash
1295 .map(|(slot, _)| slot)
1296 .unwrap_or(full_snapshot_hash.0);
1297
1298 match get_highest_local_snapshot_hash(
1299 full_snapshot_archives_dir,
1300 incremental_snapshot_archives_dir,
1301 incremental_snapshot_fetch,
1302 ) {
1303 None => {
1304 info!(
1305 "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1306 local snapshot."
1307 );
1308 false
1309 }
1310 Some((local_snapshot_slot, _)) => {
1311 if local_snapshot_slot
1312 >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1313 {
1314 info!(
1315 "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1316 a snapshot for slot {cluster_snapshot_slot}."
1317 );
1318 true
1319 } else {
1320 info!(
1321 "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1322 newer snapshot for slot {cluster_snapshot_slot}."
1323 );
1324 false
1325 }
1326 }
1327 }
1328}
1329
1330fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1332 cluster_info.get_snapshot_hashes_for_node(node).map(
1333 |crds_data::SnapshotHashes {
1334 full, incremental, ..
1335 }| {
1336 let highest_incremental_snapshot_hash = incremental.into_iter().max();
1337 SnapshotHash {
1338 full,
1339 incr: highest_incremental_snapshot_hash,
1340 }
1341 },
1342 )
1343}
1344
#[cfg(test)]
mod tests {
    use super::*;

    /// A `(slot, hash)` pair, used for both full and incremental snapshots.
    type SlotHash = (Slot, Hash);

    impl PeerSnapshotHash {
        /// Test-only constructor that assembles the `SnapshotHash` from its two parts.
        fn new(
            rpc_contact_info: ContactInfo,
            full_snapshot_hash: (Slot, Hash),
            incremental_snapshot_hash: Option<(Slot, Hash)>,
        ) -> Self {
            let snapshot_hash = SnapshotHash {
                full: full_snapshot_hash,
                incr: incremental_snapshot_hash,
            };
            Self {
                rpc_contact_info,
                snapshot_hash,
            }
        }
    }

    /// Contact info shared by every peer in these tests.
    fn default_contact_info_for_tests() -> ContactInfo {
        let wallclock = 1_681_834_947_321;
        ContactInfo::new_localhost(&Pubkey::default(), wallclock)
    }

    /// Pairs `slot` with the default hash.
    fn at(slot: Slot) -> SlotHash {
        (slot, Hash::default())
    }

    /// Builds one peer per `(full, incremental)` entry, all sharing `contact_info`.
    fn build_peers(
        contact_info: &ContactInfo,
        snapshot_hashes: &[(SlotHash, Option<SlotHash>)],
    ) -> Vec<PeerSnapshotHash> {
        snapshot_hashes
            .iter()
            .map(|&(full, incr)| PeerSnapshotHash::new(contact_info.clone(), full, incr))
            .collect()
    }

    #[test]
    fn test_build_known_snapshot_hashes() {
        agave_logger::setup();
        let full_snapshot_hash1 = (400_000, Hash::new_unique());
        let full_snapshot_hash2 = (400_000, Hash::new_unique());

        let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
        let incremental_snapshot_hash2 = (400_800, Hash::new_unique());

        // Maps each node to the snapshot hashes it reports; three nodes per combination, plus
        // three nodes that report nothing.
        let mut oracle = HashMap::new();
        for full in [full_snapshot_hash1, full_snapshot_hash2] {
            for incr in [
                None,
                Some(incremental_snapshot_hash1),
                Some(incremental_snapshot_hash2),
            ] {
                for _ in 0..3 {
                    oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
                }
            }
        }
        for _ in 0..3 {
            oracle.insert(Pubkey::new_unique(), None);
        }

        let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();

        let known_snapshot_hashes =
            build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);

        // Exactly one full snapshot hash, with exactly one incremental snapshot hash, survives.
        assert_eq!(known_snapshot_hashes.len(), 1);
        let (known_full, known_incrementals) = known_snapshot_hashes.iter().next().unwrap();
        assert_eq!(known_incrementals.len(), 1);
        let known_incremental = known_incrementals.iter().next().unwrap();

        // Which of the two candidates survives is not fixed, so accept either.
        assert!([full_snapshot_hash1, full_snapshot_hash2].contains(known_full));
        assert!(
            [incremental_snapshot_hash1, incremental_snapshot_hash2].contains(known_incremental)
        );
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
        let known_snapshot_hashes: KnownSnapshotHashes = vec![
            (200_000, vec![200_200, 200_400, 200_600, 200_800]),
            (300_000, vec![300_200, 300_400, 300_600]),
        ]
        .into_iter()
        .map(|(full_slot, incremental_slots)| {
            let full = (full_slot, Hash::new_unique());
            let incrementals = incremental_slots
                .into_iter()
                .map(|slot| (slot, Hash::new_unique()))
                .collect();
            (full, incrementals)
        })
        .collect();

        let (known_full, known_incrementals) = known_snapshot_hashes.iter().next().unwrap();
        let known_full = *known_full;
        let known_incr = *known_incrementals.iter().next().unwrap();
        let unknown_full = at(111_000);
        let unknown_incr = at(111_111);

        let contact_info = default_contact_info_for_tests();
        let mut actual = build_peers(
            &contact_info,
            &[
                (unknown_full, None),
                (unknown_full, Some(unknown_incr)),
                (known_full, None),
                (unknown_full, Some(known_incr)),
                (known_full, Some(unknown_incr)),
                (known_full, Some(known_incr)),
            ],
        );
        let expected = build_peers(
            &contact_info,
            &[(known_full, None), (known_full, Some(known_incr))],
        );

        retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
            &known_snapshot_hashes,
            &mut actual,
        );
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
        let contact_info = default_contact_info_for_tests();
        let lower_full = at(100_000);
        let higher_full = at(200_000);

        let mut actual = build_peers(
            &contact_info,
            &[
                (lower_full, None),
                (lower_full, Some(at(100_100))),
                (lower_full, Some(at(100_200))),
                (lower_full, Some(at(100_300))),
                (higher_full, None),
                (higher_full, Some(at(200_100))),
                (higher_full, Some(at(200_200))),
                (higher_full, Some(at(200_300))),
            ],
        );
        let expected = build_peers(
            &contact_info,
            &[
                (higher_full, None),
                (higher_full, Some(at(200_100))),
                (higher_full, Some(at(200_200))),
                (higher_full, Some(at(200_300))),
            ],
        );

        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
        let contact_info = default_contact_info_for_tests();
        let full = at(200_000);

        let mut actual = build_peers(
            &contact_info,
            &[
                (full, None),
                (full, Some(at(200_100))),
                (full, Some(at(200_200))),
                (full, Some(at(200_300))),
                (full, Some(at(200_010))),
                (full, Some(at(200_020))),
                (full, Some(at(200_030))),
            ],
        );
        let expected = build_peers(&contact_info, &[(full, Some(at(200_300)))]);

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
        let contact_info = default_contact_info_for_tests();
        // Same full slot, distinct hashes, and no incremental snapshots at all.
        let mut actual = build_peers(
            &contact_info,
            &[
                ((200_000, Hash::new_unique()), None),
                ((200_000, Hash::new_unique()), None),
                ((200_000, Hash::new_unique()), None),
            ],
        );
        let expected = actual.clone();

        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
        // Both filters must leave an empty list empty.
        let expected: Vec<PeerSnapshotHash> = Vec::new();

        let mut actual = expected.clone();
        retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);

        let mut actual = expected.clone();
        retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
        assert_eq!(expected, actual);
    }
}