1use {
2 agave_snapshots::{
3 paths as snapshot_paths, snapshot_archive_info::SnapshotArchiveInfoGetter as _,
4 SnapshotKind,
5 },
6 itertools::Itertools,
7 log::*,
8 rand::{seq::SliceRandom, thread_rng, Rng},
9 rayon::prelude::*,
10 solana_account::ReadableAccount,
11 solana_clock::Slot,
12 solana_commitment_config::CommitmentConfig,
13 solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
14 solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
15 solana_genesis_utils::download_then_check_genesis_hash,
16 solana_gossip::{
17 cluster_info::ClusterInfo,
18 contact_info::{ContactInfo, Protocol},
19 crds_data,
20 gossip_service::GossipService,
21 node::Node,
22 },
23 solana_hash::Hash,
24 solana_keypair::Keypair,
25 solana_metrics::datapoint_info,
26 solana_pubkey::Pubkey,
27 solana_rpc_client::rpc_client::RpcClient,
28 solana_signer::Signer,
29 solana_streamer::socket::SocketAddrSpace,
30 solana_vote_program::vote_state::VoteStateV4,
31 std::{
32 collections::{hash_map::RandomState, HashMap, HashSet},
33 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
34 path::Path,
35 process::exit,
36 sync::{
37 atomic::{AtomicBool, Ordering},
38 Arc, RwLock,
39 },
40 time::{Duration, Instant},
41 },
42 thiserror::Error,
43};
44
/// While the "no snapshots found" timer in `get_rpc_nodes` has run for less than this (or has
/// not started yet), snapshot hashes are required from *all* known validators; afterwards
/// hashes from *any* known validator are accepted.
const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
/// When every candidate RPC peer is blacklisted and the blacklist timer passed to
/// `get_rpc_peers` has run longer than this, the blacklist is cleared.
const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
/// `get_rpc_nodes` gives up with `GetRpcNodeError::NoNewerSnapshots` once no peer snapshot
/// hashes have been found for longer than this.
const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(3 * 60);
/// `get_rpc_nodes` gives up with `GetRpcNodeError::NoRpcPeersFound` once no usable RPC peers
/// have been found for longer than this.
const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Upper bound on the number of candidates `get_rpc_nodes` returns from a single call.
pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;

/// TCP connect timeout used by `ping`.
pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
61
/// Options controlling how the validator bootstraps from an RPC peer.
///
/// `Eq` is derived alongside `PartialEq` because every field has a total equality relation.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RpcBootstrapConfig {
    /// Forwarded to `download_then_check_genesis_hash`. When set together with
    /// `no_snapshot_fetch`, `rpc_bootstrap` returns right after the optional port check.
    pub no_genesis_fetch: bool,
    /// When set, `get_rpc_nodes` returns a single random RPC peer with no snapshot hash, so no
    /// snapshot is downloaded.
    pub no_snapshot_fetch: bool,
    /// When set, `get_rpc_peers` only keeps RPC peers that are known validators.
    pub only_known_rpc: bool,
    /// Forwarded to `download_then_check_genesis_hash`.
    pub max_genesis_archive_unpacked_size: u64,
    /// When `Some`, the URL of the RPC endpoint used to run `check_vote_account` after the
    /// download step.
    pub check_vote_account: Option<String>,
    /// Whether incremental snapshots are taken into account when selecting and downloading
    /// snapshots.
    pub incremental_snapshot_fetch: bool,
}
71
72fn verify_reachable_ports(
73 node: &Node,
74 cluster_entrypoint: &ContactInfo,
75 validator_config: &ValidatorConfig,
76 socket_addr_space: &SocketAddrSpace,
77) -> bool {
78 let verify_address = |addr: &Option<SocketAddr>| -> bool {
79 addr.as_ref()
80 .map(|addr| socket_addr_space.check(addr))
81 .unwrap_or_default()
82 };
83
84 let mut udp_sockets = vec![&node.sockets.repair];
85 udp_sockets.extend(node.sockets.gossip.iter());
86
87 if verify_address(&node.info.serve_repair(Protocol::UDP)) {
88 udp_sockets.push(&node.sockets.serve_repair);
89 }
90 if verify_address(&node.info.tpu(Protocol::UDP)) {
91 udp_sockets.extend(node.sockets.tpu.iter());
92 udp_sockets.extend(&node.sockets.tpu_quic);
93 }
94 if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
95 udp_sockets.extend(node.sockets.tpu_forwards.iter());
96 udp_sockets.extend(&node.sockets.tpu_forwards_quic);
97 }
98 if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
99 udp_sockets.extend(node.sockets.tpu_vote.iter());
100 }
101 if verify_address(&node.info.tvu(Protocol::UDP)) {
102 udp_sockets.extend(node.sockets.tvu.iter());
103 udp_sockets.extend(node.sockets.broadcast.iter());
104 udp_sockets.extend(node.sockets.retransmit_sockets.iter());
105 }
106 if !solana_net_utils::verify_all_reachable_udp(
107 &cluster_entrypoint.gossip().unwrap(),
108 &udp_sockets,
109 ) {
110 return false;
111 }
112
113 let mut tcp_listeners = vec![];
114 if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
115 for (purpose, bind_addr, public_addr) in &[
116 ("RPC", rpc_addr, node.info.rpc()),
117 ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
118 ] {
119 if verify_address(public_addr) {
120 tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
121 error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
122 exit(1);
123 }));
124 }
125 }
126 }
127
128 if let Some(ip_echo) = &node.sockets.ip_echo {
129 let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
130 tcp_listeners.push(ip_echo);
131 }
132
133 solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
134}
135
136fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
137 if let Some(known_validators) = known_validators {
138 known_validators.contains(id)
139 } else {
140 false
141 }
142}
143
144#[allow(clippy::too_many_arguments)]
145fn start_gossip_node(
146 identity_keypair: Arc<Keypair>,
147 cluster_entrypoints: &[ContactInfo],
148 known_validators: Option<HashSet<Pubkey>>,
149 ledger_path: &Path,
150 gossip_addr: &SocketAddr,
151 gossip_sockets: Arc<[UdpSocket]>,
152 expected_shred_version: u16,
153 gossip_validators: Option<HashSet<Pubkey>>,
154 should_check_duplicate_instance: bool,
155 socket_addr_space: SocketAddrSpace,
156) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
157 let contact_info = ClusterInfo::gossip_contact_info(
158 identity_keypair.pubkey(),
159 *gossip_addr,
160 expected_shred_version,
161 );
162 let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
163 if let Some(known_validators) = known_validators {
164 cluster_info
165 .set_trim_keep_pubkeys(known_validators)
166 .expect("set_trim_keep_pubkeys should succeed as ClusterInfo was just created");
167 }
168 cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
169 cluster_info.restore_contact_info(ledger_path, 0);
170 let cluster_info = Arc::new(cluster_info);
171
172 let gossip_exit_flag = Arc::new(AtomicBool::new(false));
173 let gossip_service = GossipService::new(
174 &cluster_info,
175 None,
176 gossip_sockets,
177 gossip_validators,
178 should_check_duplicate_instance,
179 None,
180 gossip_exit_flag.clone(),
181 );
182 (cluster_info, gossip_exit_flag, gossip_service)
183}
184
185fn get_rpc_peers(
186 cluster_info: &ClusterInfo,
187 validator_config: &ValidatorConfig,
188 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
189 blacklist_timeout: &Instant,
190 retry_reason: &mut Option<String>,
191 bootstrap_config: &RpcBootstrapConfig,
192) -> Vec<ContactInfo> {
193 let shred_version = validator_config
194 .expected_shred_version
195 .unwrap_or_else(|| cluster_info.my_shred_version());
196
197 info!(
198 "Searching for an RPC service with shred version {shred_version}{}...",
199 retry_reason
200 .as_ref()
201 .map(|s| format!(" (Retrying: {s})"))
202 .unwrap_or_default()
203 );
204
205 let mut rpc_peers = cluster_info.rpc_peers();
206 if bootstrap_config.only_known_rpc {
207 rpc_peers.retain(|rpc_peer| {
208 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
209 });
210 }
211
212 let rpc_peers_total = rpc_peers.len();
213
214 let rpc_peers: Vec<_> = rpc_peers
216 .into_iter()
217 .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
218 .collect();
219 let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
220 let rpc_known_peers = rpc_peers
221 .iter()
222 .filter(|rpc_peer| {
223 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
224 })
225 .count();
226
227 info!(
228 "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
229 {rpc_peers_blacklisted} blacklisted"
230 );
231
232 if rpc_peers_blacklisted == rpc_peers_total {
233 *retry_reason = if !blacklisted_rpc_nodes.is_empty()
234 && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
235 {
236 blacklisted_rpc_nodes.clear();
239 Some("Blacklist timeout expired".to_owned())
240 } else {
241 Some("Wait for known rpc peers".to_owned())
242 };
243 return vec![];
244 }
245 rpc_peers
246}
247
248fn check_vote_account(
249 rpc_client: &RpcClient,
250 identity_pubkey: &Pubkey,
251 vote_account_address: &Pubkey,
252 authorized_voter_pubkeys: &[Pubkey],
253) -> Result<(), String> {
254 let vote_account = rpc_client
255 .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
256 .map_err(|err| format!("failed to fetch vote account: {err}"))?
257 .value
258 .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
259
260 if vote_account.owner != solana_vote_program::id() {
261 return Err(format!(
262 "not a vote account (owned by {}): {}",
263 vote_account.owner, vote_account_address
264 ));
265 }
266
267 let identity_account = rpc_client
268 .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
269 .map_err(|err| format!("failed to fetch identity account: {err}"))?
270 .value
271 .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
272
273 let vote_state = VoteStateV4::deserialize(vote_account.data(), vote_account_address).ok();
274 if let Some(vote_state) = vote_state {
275 if vote_state.authorized_voters.is_empty() {
276 return Err("Vote account not yet initialized".to_string());
277 }
278
279 if vote_state.node_pubkey != *identity_pubkey {
280 return Err(format!(
281 "vote account's identity ({}) does not match the validator's identity {}).",
282 vote_state.node_pubkey, identity_pubkey
283 ));
284 }
285
286 for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters.iter() {
287 if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
288 return Err(format!(
289 "authorized voter {vote_account_authorized_voter_pubkey} not available"
290 ));
291 }
292 }
293 } else {
294 return Err(format!(
295 "invalid vote account data for {vote_account_address}"
296 ));
297 }
298
299 if identity_account.lamports <= 1 {
301 return Err(format!(
302 "underfunded identity account ({}): only {} lamports available",
303 identity_pubkey, identity_account.lamports
304 ));
305 }
306
307 Ok(())
308}
309
310#[derive(Error, Debug)]
311pub enum GetRpcNodeError {
312 #[error("Unable to find any RPC peers")]
313 NoRpcPeersFound,
314
315 #[error("Giving up, did not get newer snapshots from the cluster")]
316 NoNewerSnapshots,
317}
318
319#[derive(Debug)]
323struct GetRpcNodeResult {
324 rpc_contact_info: ContactInfo,
325 snapshot_hash: Option<SnapshotHash>,
326}
327
328#[derive(Debug, PartialEq, Eq, Clone)]
330struct PeerSnapshotHash {
331 rpc_contact_info: ContactInfo,
332 snapshot_hash: SnapshotHash,
333}
334
335#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
338pub struct SnapshotHash {
339 full: (Slot, Hash),
340 incr: Option<(Slot, Hash)>,
341}
342
343pub fn fail_rpc_node(
344 err: String,
345 known_validators: &Option<HashSet<Pubkey, RandomState>>,
346 rpc_id: &Pubkey,
347 blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
348) {
349 warn!("{err}");
350 if let Some(ref known_validators) = known_validators {
351 if known_validators.contains(rpc_id) {
352 return;
353 }
354 }
355
356 info!("Excluding {rpc_id} as a future RPC candidate");
357 blacklisted_rpc_nodes.insert(*rpc_id);
358}
359
360fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
361 let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
362 cluster_info.save_contact_info();
363 gossip_exit_flag.store(true, Ordering::Relaxed);
364 gossip_service.join().unwrap();
365}
366
367#[allow(clippy::too_many_arguments)]
368pub fn attempt_download_genesis_and_snapshot(
369 rpc_contact_info: &ContactInfo,
370 ledger_path: &Path,
371 validator_config: &mut ValidatorConfig,
372 bootstrap_config: &RpcBootstrapConfig,
373 use_progress_bar: bool,
374 gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
375 rpc_client: &RpcClient,
376 maximum_local_snapshot_age: Slot,
377 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
378 minimal_snapshot_download_speed: f32,
379 maximum_snapshot_download_abort: u64,
380 download_abort_count: &mut u64,
381 snapshot_hash: Option<SnapshotHash>,
382 identity_keypair: &Arc<Keypair>,
383 vote_account: &Pubkey,
384 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
385) -> Result<(), String> {
386 download_then_check_genesis_hash(
387 &rpc_contact_info
388 .rpc()
389 .ok_or_else(|| String::from("Invalid RPC address"))?,
390 ledger_path,
391 &mut validator_config.expected_genesis_hash,
392 bootstrap_config.max_genesis_archive_unpacked_size,
393 bootstrap_config.no_genesis_fetch,
394 use_progress_bar,
395 rpc_client,
396 )?;
397
398 if let Some(gossip) = gossip.take() {
399 shutdown_gossip_service(gossip);
400 }
401
402 let rpc_client_slot = rpc_client
403 .get_slot_with_commitment(CommitmentConfig::finalized())
404 .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
405 info!("RPC node root slot: {rpc_client_slot}");
406
407 download_snapshots(
408 validator_config,
409 bootstrap_config,
410 use_progress_bar,
411 maximum_local_snapshot_age,
412 start_progress,
413 minimal_snapshot_download_speed,
414 maximum_snapshot_download_abort,
415 download_abort_count,
416 snapshot_hash,
417 rpc_contact_info,
418 )?;
419
420 if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
421 let rpc_client = RpcClient::new(url);
422 check_vote_account(
423 &rpc_client,
424 &identity_keypair.pubkey(),
425 vote_account,
426 &authorized_voter_keypairs
427 .read()
428 .unwrap()
429 .iter()
430 .map(|k| k.pubkey())
431 .collect::<Vec<_>>(),
432 )
433 .unwrap_or_else(|err| {
434 error!("{err}");
441 exit(1);
442 });
443 }
444 Ok(())
445}
446
447fn ping(addr: &SocketAddr) -> Option<Duration> {
449 let start = Instant::now();
450 match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
451 Ok(_) => Some(start.elapsed()),
452 Err(_) => None,
453 }
454}
455
456fn get_vetted_rpc_nodes(
460 vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
461 cluster_info: &Arc<ClusterInfo>,
462 validator_config: &ValidatorConfig,
463 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
464 bootstrap_config: &RpcBootstrapConfig,
465) {
466 while vetted_rpc_nodes.is_empty() {
467 let rpc_node_details = match get_rpc_nodes(
468 cluster_info,
469 validator_config,
470 blacklisted_rpc_nodes,
471 bootstrap_config,
472 ) {
473 Ok(rpc_node_details) => rpc_node_details,
474 Err(err) => {
475 error!(
476 "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
477 `--no-port-check`, or adjusting `--known-validator ...` arguments as \
478 applicable"
479 );
480 exit(1);
481 }
482 };
483
484 let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
485 vetted_rpc_nodes.extend(
486 rpc_node_details
487 .into_par_iter()
488 .filter_map(|rpc_node_details| {
489 let GetRpcNodeResult {
490 rpc_contact_info,
491 snapshot_hash,
492 } = rpc_node_details;
493
494 info!(
495 "Using RPC service from node {}: {:?}",
496 rpc_contact_info.pubkey(),
497 rpc_contact_info.rpc()
498 );
499
500 let rpc_addr = rpc_contact_info.rpc()?;
501 let ping_time = ping(&rpc_addr);
502
503 let rpc_client =
504 RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
505
506 Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
507 })
508 .filter(
509 |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
510 .get_version()
511 {
512 Ok(rpc_version) => {
513 if let Some(ping_time) = ping_time {
514 info!(
515 "RPC node version: {} Ping: {}ms",
516 rpc_version.solana_core,
517 ping_time.as_millis()
518 );
519 true
520 } else {
521 fail_rpc_node(
522 "Failed to ping RPC".to_string(),
523 &validator_config.known_validators,
524 rpc_contact_info.pubkey(),
525 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
526 );
527 false
528 }
529 }
530 Err(err) => {
531 fail_rpc_node(
532 format!("Failed to get RPC node version: {err}"),
533 &validator_config.known_validators,
534 rpc_contact_info.pubkey(),
535 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
536 );
537 false
538 }
539 },
540 )
541 .collect::<Vec<(
542 ContactInfo,
543 Option<SnapshotHash>,
544 RpcClient,
545 Option<Duration>,
546 )>>()
547 .into_iter()
548 .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
549 .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
550 (rpc_contact_info, snapshot_hash, rpc_client)
551 })
552 .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
553 );
554 blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
555 }
556}
557
558#[allow(clippy::too_many_arguments)]
559pub fn rpc_bootstrap(
560 node: &Node,
561 identity_keypair: &Arc<Keypair>,
562 ledger_path: &Path,
563 vote_account: &Pubkey,
564 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
565 cluster_entrypoints: &[ContactInfo],
566 validator_config: &mut ValidatorConfig,
567 bootstrap_config: RpcBootstrapConfig,
568 do_port_check: bool,
569 use_progress_bar: bool,
570 maximum_local_snapshot_age: Slot,
571 should_check_duplicate_instance: bool,
572 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
573 minimal_snapshot_download_speed: f32,
574 maximum_snapshot_download_abort: u64,
575 socket_addr_space: SocketAddrSpace,
576) {
577 if do_port_check {
578 let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
579 order.shuffle(&mut thread_rng());
580 if order.into_iter().all(|i| {
581 !verify_reachable_ports(
582 node,
583 &cluster_entrypoints[i],
584 validator_config,
585 &socket_addr_space,
586 )
587 }) {
588 exit(1);
589 }
590 }
591
592 if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
593 return;
594 }
595
596 let total_snapshot_download_time = Instant::now();
597 let mut get_rpc_nodes_time = Duration::new(0, 0);
598 let mut snapshot_download_time = Duration::new(0, 0);
599 let mut blacklisted_rpc_nodes = HashSet::new();
600 let mut gossip = None;
601 let mut vetted_rpc_nodes = vec![];
602 let mut download_abort_count = 0;
603 loop {
604 if gossip.is_none() {
605 *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
606
607 gossip = Some(start_gossip_node(
608 identity_keypair.clone(),
609 cluster_entrypoints,
610 validator_config.known_validators.clone(),
611 ledger_path,
612 &node
613 .info
614 .gossip()
615 .expect("Operator must spin up node with valid gossip address"),
616 node.sockets.gossip.clone(),
617 validator_config
618 .expected_shred_version
619 .expect("expected_shred_version should not be None"),
620 validator_config.gossip_validators.clone(),
621 should_check_duplicate_instance,
622 socket_addr_space,
623 ));
624 }
625
626 let get_rpc_nodes_start = Instant::now();
627 get_vetted_rpc_nodes(
628 &mut vetted_rpc_nodes,
629 &gossip.as_ref().unwrap().0,
630 validator_config,
631 &mut blacklisted_rpc_nodes,
632 &bootstrap_config,
633 );
634 let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
635 get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
636
637 let snapshot_download_start = Instant::now();
638 let download_result = attempt_download_genesis_and_snapshot(
639 &rpc_contact_info,
640 ledger_path,
641 validator_config,
642 &bootstrap_config,
643 use_progress_bar,
644 &mut gossip,
645 &rpc_client,
646 maximum_local_snapshot_age,
647 start_progress,
648 minimal_snapshot_download_speed,
649 maximum_snapshot_download_abort,
650 &mut download_abort_count,
651 snapshot_hash,
652 identity_keypair,
653 vote_account,
654 authorized_voter_keypairs.clone(),
655 );
656 snapshot_download_time += snapshot_download_start.elapsed();
657 match download_result {
658 Ok(()) => break,
659 Err(err) => {
660 fail_rpc_node(
661 err,
662 &validator_config.known_validators,
663 rpc_contact_info.pubkey(),
664 &mut blacklisted_rpc_nodes,
665 );
666 }
667 }
668 }
669
670 if let Some(gossip) = gossip.take() {
671 shutdown_gossip_service(gossip);
672 }
673
674 datapoint_info!(
675 "bootstrap-snapshot-download",
676 (
677 "total_time_secs",
678 total_snapshot_download_time.elapsed().as_secs(),
679 i64
680 ),
681 ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
682 (
683 "snapshot_download_time_secs",
684 snapshot_download_time.as_secs(),
685 i64
686 ),
687 ("download_abort_count", download_abort_count, i64),
688 ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
689 );
690}
691
692fn get_rpc_nodes(
696 cluster_info: &ClusterInfo,
697 validator_config: &ValidatorConfig,
698 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
699 bootstrap_config: &RpcBootstrapConfig,
700) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
701 let mut blacklist_timeout = Instant::now();
702 let mut get_rpc_peers_timout = Instant::now();
703 let mut newer_cluster_snapshot_timeout = None;
704 let mut retry_reason = None;
705 loop {
706 std::thread::sleep(Duration::from_secs(1));
708 info!("\n{}", cluster_info.rpc_info_trace());
709
710 let rpc_peers = get_rpc_peers(
711 cluster_info,
712 validator_config,
713 blacklisted_rpc_nodes,
714 &blacklist_timeout,
715 &mut retry_reason,
716 bootstrap_config,
717 );
718 if rpc_peers.is_empty() {
719 if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
720 return Err(GetRpcNodeError::NoRpcPeersFound);
721 }
722 continue;
723 }
724
725 blacklist_timeout = Instant::now();
727 get_rpc_peers_timout = Instant::now();
728 if bootstrap_config.no_snapshot_fetch {
729 let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
730 return Ok(vec![GetRpcNodeResult {
731 rpc_contact_info: random_peer.clone(),
732 snapshot_hash: None,
733 }]);
734 }
735
736 let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
737 .as_ref()
738 .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
739 .unwrap_or(true)
740 {
741 KnownValidatorsToWaitFor::All
742 } else {
743 KnownValidatorsToWaitFor::Any
744 };
745 let peer_snapshot_hashes = get_peer_snapshot_hashes(
746 cluster_info,
747 &rpc_peers,
748 validator_config.known_validators.as_ref(),
749 known_validators_to_wait_for,
750 bootstrap_config.incremental_snapshot_fetch,
751 );
752 if peer_snapshot_hashes.is_empty() {
753 match newer_cluster_snapshot_timeout {
754 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
755 Some(newer_cluster_snapshot_timeout) => {
756 if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
757 return Err(GetRpcNodeError::NoNewerSnapshots);
758 }
759 }
760 }
761 retry_reason = Some("No snapshots available".to_owned());
762 continue;
763 } else {
764 let rpc_peers = peer_snapshot_hashes
765 .iter()
766 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
767 .collect::<Vec<_>>();
768 let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
769 info!(
770 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
771 final_snapshot_hash
772 .incr
773 .map(|(slot, _hash)| slot)
774 .unwrap_or(final_snapshot_hash.full.0),
775 rpc_peers.len(),
776 if rpc_peers.len() > 1 { "s" } else { "" },
777 rpc_peers,
778 );
779 let rpc_node_results = peer_snapshot_hashes
780 .iter()
781 .map(|peer_snapshot_hash| GetRpcNodeResult {
782 rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
783 snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
784 })
785 .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
786 .collect();
787 return Ok(rpc_node_results);
788 }
789 }
790}
791
792fn get_highest_local_snapshot_hash(
795 full_snapshot_archives_dir: impl AsRef<Path>,
796 incremental_snapshot_archives_dir: impl AsRef<Path>,
797 incremental_snapshot_fetch: bool,
798) -> Option<(Slot, Hash)> {
799 snapshot_paths::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
800 .and_then(|full_snapshot_info| {
801 if incremental_snapshot_fetch {
802 snapshot_paths::get_highest_incremental_snapshot_archive_info(
803 incremental_snapshot_archives_dir,
804 full_snapshot_info.slot(),
805 )
806 .map(|incremental_snapshot_info| {
807 (
808 incremental_snapshot_info.slot(),
809 *incremental_snapshot_info.hash(),
810 )
811 })
812 } else {
813 None
814 }
815 .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
816 })
817 .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
818}
819
820fn get_peer_snapshot_hashes(
827 cluster_info: &ClusterInfo,
828 rpc_peers: &[ContactInfo],
829 known_validators: Option<&HashSet<Pubkey>>,
830 known_validators_to_wait_for: KnownValidatorsToWaitFor,
831 incremental_snapshot_fetch: bool,
832) -> Vec<PeerSnapshotHash> {
833 let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
834 if let Some(known_validators) = known_validators {
835 let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
836 cluster_info,
837 known_validators,
838 known_validators_to_wait_for,
839 );
840 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
841 &known_snapshot_hashes,
842 &mut peer_snapshot_hashes,
843 );
844 }
845 if incremental_snapshot_fetch {
846 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
854 &mut peer_snapshot_hashes,
855 );
856 }
857 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
858
859 peer_snapshot_hashes
860}
861
862type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
865
866fn get_snapshot_hashes_from_known_validators(
877 cluster_info: &ClusterInfo,
878 known_validators: &HashSet<Pubkey>,
879 known_validators_to_wait_for: KnownValidatorsToWaitFor,
880) -> KnownSnapshotHashes {
881 let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
883
884 if !do_known_validators_have_all_snapshot_hashes(
885 known_validators,
886 known_validators_to_wait_for,
887 get_snapshot_hashes_for_node,
888 ) {
889 debug!(
890 "Snapshot hashes have not been discovered from known validators. This likely means \
891 the gossip tables are not fully populated. We will sleep and retry..."
892 );
893 return KnownSnapshotHashes::default();
894 }
895
896 build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
897}
898
899fn do_known_validators_have_all_snapshot_hashes<'a>(
908 known_validators: impl IntoIterator<Item = &'a Pubkey>,
909 known_validators_to_wait_for: KnownValidatorsToWaitFor,
910 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
911) -> bool {
912 let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
913
914 match known_validators_to_wait_for {
915 KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
916 KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
917 }
918}
919
/// How many known validators must have snapshot hashes before those hashes are used.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum KnownValidatorsToWaitFor {
    /// Every known validator must have snapshot hashes.
    All,
    /// A single known validator with snapshot hashes is enough.
    Any,
}
927
928fn build_known_snapshot_hashes<'a>(
934 nodes: impl IntoIterator<Item = &'a Pubkey>,
935 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
936) -> KnownSnapshotHashes {
937 let mut known_snapshot_hashes = KnownSnapshotHashes::new();
938
939 fn is_any_same_slot_and_different_hash<'a>(
942 needle: &(Slot, Hash),
943 haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
944 ) -> bool {
945 haystack
946 .into_iter()
947 .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
948 }
949
950 'to_next_node: for node in nodes {
951 let Some(SnapshotHash {
952 full: full_snapshot_hash,
953 incr: incremental_snapshot_hash,
954 }) = get_snapshot_hashes_for_node(node)
955 else {
956 continue 'to_next_node;
957 };
958
959 if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
964 warn!(
965 "Ignoring all snapshot hashes from node {node} since we've seen a different full \
966 snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
967 );
968 debug!(
969 "known full snapshot hashes: {:#?}",
970 known_snapshot_hashes.keys(),
971 );
972 continue 'to_next_node;
973 }
974
975 let known_incremental_snapshot_hashes =
979 known_snapshot_hashes.entry(full_snapshot_hash).or_default();
980
981 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
982 if is_any_same_slot_and_different_hash(
987 &incremental_snapshot_hash,
988 known_incremental_snapshot_hashes.iter(),
989 ) {
990 warn!(
991 "Ignoring incremental snapshot hash from node {node} since we've seen a \
992 different incremental snapshot hash with this slot. full snapshot hash: \
993 {full_snapshot_hash:?}, incremental snapshot hash: \
994 {incremental_snapshot_hash:?}"
995 );
996 debug!(
997 "known incremental snapshot hashes based on this slot: {:#?}",
998 known_incremental_snapshot_hashes.iter(),
999 );
1000 continue 'to_next_node;
1001 }
1002
1003 known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
1004 };
1005 }
1006
1007 trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1008 known_snapshot_hashes
1009}
1010
1011fn get_eligible_peer_snapshot_hashes(
1017 cluster_info: &ClusterInfo,
1018 rpc_peers: &[ContactInfo],
1019) -> Vec<PeerSnapshotHash> {
1020 let peer_snapshot_hashes = rpc_peers
1021 .iter()
1022 .flat_map(|rpc_peer| {
1023 get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1024 PeerSnapshotHash {
1025 rpc_contact_info: rpc_peer.clone(),
1026 snapshot_hash,
1027 }
1028 })
1029 })
1030 .collect();
1031
1032 trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1033 peer_snapshot_hashes
1034}
1035
1036fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1038 known_snapshot_hashes: &KnownSnapshotHashes,
1039 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1040) {
1041 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1042 known_snapshot_hashes
1043 .get(&peer_snapshot_hash.snapshot_hash.full)
1044 .map(|known_incremental_hashes| {
1045 if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1046 true
1049 } else {
1050 known_incremental_hashes
1051 .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1052 }
1053 })
1054 .unwrap_or(false)
1055 });
1056
1057 trace!(
1058 "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1059 );
1060}
1061
1062fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1064 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1065) {
1066 let highest_full_snapshot_hash = peer_snapshot_hashes
1067 .iter()
1068 .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1069 .max_by_key(|(slot, _hash)| *slot);
1070 let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1071 return;
1075 };
1076
1077 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1078 peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1079 });
1080
1081 trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1082}
1083
1084fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1086 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1087) {
1088 let highest_incremental_snapshot_hash = peer_snapshot_hashes
1089 .iter()
1090 .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1091 .max_by_key(|(slot, _hash)| *slot);
1092
1093 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1094 peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1095 });
1096
1097 trace!(
1098 "retain peer snapshot hashes with highest incremental snapshot slot: \
1099 {peer_snapshot_hashes:?}"
1100 );
1101}
1102
1103#[allow(clippy::too_many_arguments)]
1105fn download_snapshots(
1106 validator_config: &ValidatorConfig,
1107 bootstrap_config: &RpcBootstrapConfig,
1108 use_progress_bar: bool,
1109 maximum_local_snapshot_age: Slot,
1110 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1111 minimal_snapshot_download_speed: f32,
1112 maximum_snapshot_download_abort: u64,
1113 download_abort_count: &mut u64,
1114 snapshot_hash: Option<SnapshotHash>,
1115 rpc_contact_info: &ContactInfo,
1116) -> Result<(), String> {
1117 if snapshot_hash.is_none() {
1118 return Ok(());
1119 }
1120 let SnapshotHash {
1121 full: full_snapshot_hash,
1122 incr: incremental_snapshot_hash,
1123 } = snapshot_hash.unwrap();
1124 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1125 let incremental_snapshot_archives_dir = &validator_config
1126 .snapshot_config
1127 .incremental_snapshot_archives_dir;
1128
1129 if should_use_local_snapshot(
1131 full_snapshot_archives_dir,
1132 incremental_snapshot_archives_dir,
1133 maximum_local_snapshot_age,
1134 full_snapshot_hash,
1135 incremental_snapshot_hash,
1136 bootstrap_config.incremental_snapshot_fetch,
1137 ) {
1138 return Ok(());
1139 }
1140
1141 if snapshot_paths::get_full_snapshot_archives(full_snapshot_archives_dir)
1143 .into_iter()
1144 .any(|snapshot_archive| {
1145 snapshot_archive.slot() == full_snapshot_hash.0
1146 && snapshot_archive.hash().0 == full_snapshot_hash.1
1147 })
1148 {
1149 info!(
1150 "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1151 full_snapshot_hash.0, full_snapshot_hash.1
1152 );
1153 } else {
1154 download_snapshot(
1155 validator_config,
1156 bootstrap_config,
1157 use_progress_bar,
1158 start_progress,
1159 minimal_snapshot_download_speed,
1160 maximum_snapshot_download_abort,
1161 download_abort_count,
1162 rpc_contact_info,
1163 full_snapshot_hash,
1164 SnapshotKind::FullSnapshot,
1165 )?;
1166 }
1167
1168 if bootstrap_config.incremental_snapshot_fetch {
1169 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1171 if snapshot_paths::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1172 .into_iter()
1173 .any(|snapshot_archive| {
1174 snapshot_archive.slot() == incremental_snapshot_hash.0
1175 && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1176 && snapshot_archive.base_slot() == full_snapshot_hash.0
1177 })
1178 {
1179 info!(
1180 "Incremental snapshot archive already exists locally. Skipping download. \
1181 slot: {}, hash: {}",
1182 incremental_snapshot_hash.0, incremental_snapshot_hash.1
1183 );
1184 } else {
1185 download_snapshot(
1186 validator_config,
1187 bootstrap_config,
1188 use_progress_bar,
1189 start_progress,
1190 minimal_snapshot_download_speed,
1191 maximum_snapshot_download_abort,
1192 download_abort_count,
1193 rpc_contact_info,
1194 incremental_snapshot_hash,
1195 SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1196 )?;
1197 }
1198 }
1199 }
1200
1201 Ok(())
1202}
1203
1204#[allow(clippy::too_many_arguments)]
1206fn download_snapshot(
1207 validator_config: &ValidatorConfig,
1208 bootstrap_config: &RpcBootstrapConfig,
1209 use_progress_bar: bool,
1210 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1211 minimal_snapshot_download_speed: f32,
1212 maximum_snapshot_download_abort: u64,
1213 download_abort_count: &mut u64,
1214 rpc_contact_info: &ContactInfo,
1215 desired_snapshot_hash: (Slot, Hash),
1216 snapshot_kind: SnapshotKind,
1217) -> Result<(), String> {
1218 let maximum_full_snapshot_archives_to_retain = validator_config
1219 .snapshot_config
1220 .maximum_full_snapshot_archives_to_retain;
1221 let maximum_incremental_snapshot_archives_to_retain = validator_config
1222 .snapshot_config
1223 .maximum_incremental_snapshot_archives_to_retain;
1224 let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
1225 let incremental_snapshot_archives_dir = &validator_config
1226 .snapshot_config
1227 .incremental_snapshot_archives_dir;
1228
1229 *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1230 slot: desired_snapshot_hash.0,
1231 rpc_addr: rpc_contact_info
1232 .rpc()
1233 .ok_or_else(|| String::from("Invalid RPC address"))?,
1234 };
1235 let desired_snapshot_hash = (
1236 desired_snapshot_hash.0,
1237 agave_snapshots::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1238 );
1239 download_snapshot_archive(
1240 &rpc_contact_info
1241 .rpc()
1242 .ok_or_else(|| String::from("Invalid RPC address"))?,
1243 full_snapshot_archives_dir,
1244 incremental_snapshot_archives_dir,
1245 desired_snapshot_hash,
1246 snapshot_kind,
1247 maximum_full_snapshot_archives_to_retain,
1248 maximum_incremental_snapshot_archives_to_retain,
1249 use_progress_bar,
1250 &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1251 debug!("Download progress: {download_progress:?}");
1252 if download_progress.last_throughput < minimal_snapshot_download_speed
1253 && download_progress.notification_count <= 1
1254 && download_progress.percentage_done <= 2_f32
1255 && download_progress.estimated_remaining_time > 60_f32
1256 && *download_abort_count < maximum_snapshot_download_abort
1257 {
1258 if let Some(ref known_validators) = validator_config.known_validators {
1259 if known_validators.contains(rpc_contact_info.pubkey())
1260 && known_validators.len() == 1
1261 && bootstrap_config.only_known_rpc
1262 {
1263 warn!(
1264 "The snapshot download is too slow, throughput: {} < min speed {} \
1265 bytes/sec, but will NOT abort and try a different node as it is the \
1266 only known validator and the --only-known-rpc flag is set. Abort \
1267 count: {}, Progress detail: {:?}",
1268 download_progress.last_throughput,
1269 minimal_snapshot_download_speed,
1270 download_abort_count,
1271 download_progress,
1272 );
1273 return true; }
1275 }
1276 warn!(
1277 "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1278 will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1279 download_progress.last_throughput,
1280 minimal_snapshot_download_speed,
1281 download_abort_count,
1282 download_progress,
1283 );
1284 *download_abort_count += 1;
1285 false
1286 } else {
1287 true
1288 }
1289 })),
1290 )
1291}
1292
1293fn should_use_local_snapshot(
1296 full_snapshot_archives_dir: &Path,
1297 incremental_snapshot_archives_dir: &Path,
1298 maximum_local_snapshot_age: Slot,
1299 full_snapshot_hash: (Slot, Hash),
1300 incremental_snapshot_hash: Option<(Slot, Hash)>,
1301 incremental_snapshot_fetch: bool,
1302) -> bool {
1303 let cluster_snapshot_slot = incremental_snapshot_hash
1304 .map(|(slot, _)| slot)
1305 .unwrap_or(full_snapshot_hash.0);
1306
1307 match get_highest_local_snapshot_hash(
1308 full_snapshot_archives_dir,
1309 incremental_snapshot_archives_dir,
1310 incremental_snapshot_fetch,
1311 ) {
1312 None => {
1313 info!(
1314 "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1315 local snapshot."
1316 );
1317 false
1318 }
1319 Some((local_snapshot_slot, _)) => {
1320 if local_snapshot_slot
1321 >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1322 {
1323 info!(
1324 "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1325 a snapshot for slot {cluster_snapshot_slot}."
1326 );
1327 true
1328 } else {
1329 info!(
1330 "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1331 newer snapshot for slot {cluster_snapshot_slot}."
1332 );
1333 false
1334 }
1335 }
1336 }
1337}
1338
1339fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1341 cluster_info.get_snapshot_hashes_for_node(node).map(
1342 |crds_data::SnapshotHashes {
1343 full, incremental, ..
1344 }| {
1345 let highest_incremental_snapshot_hash = incremental.into_iter().max();
1346 SnapshotHash {
1347 full,
1348 incr: highest_incremental_snapshot_hash,
1349 }
1350 },
1351 )
1352}
1353
1354#[cfg(test)]
1355mod tests {
1356 use super::*;
1357
1358 impl PeerSnapshotHash {
1359 fn new(
1360 rpc_contact_info: ContactInfo,
1361 full_snapshot_hash: (Slot, Hash),
1362 incremental_snapshot_hash: Option<(Slot, Hash)>,
1363 ) -> Self {
1364 Self {
1365 rpc_contact_info,
1366 snapshot_hash: SnapshotHash {
1367 full: full_snapshot_hash,
1368 incr: incremental_snapshot_hash,
1369 },
1370 }
1371 }
1372 }
1373
1374 fn default_contact_info_for_tests() -> ContactInfo {
1375 ContactInfo::new_localhost(&Pubkey::default(), 1_681_834_947_321)
1376 }
1377
1378 #[test]
1379 fn test_build_known_snapshot_hashes() {
1380 agave_logger::setup();
1381 let full_snapshot_hash1 = (400_000, Hash::new_unique());
1382 let full_snapshot_hash2 = (400_000, Hash::new_unique());
1383
1384 let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1385 let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1386
1387 let oracle = {
1389 let mut oracle = HashMap::new();
1390
1391 for (full, incr) in [
1392 (full_snapshot_hash1, None),
1394 (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1396 (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1398 (full_snapshot_hash2, None),
1400 (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1401 (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1402 ] {
1403 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1405 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1406 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1407 }
1408
1409 oracle.insert(Pubkey::new_unique(), None);
1411 oracle.insert(Pubkey::new_unique(), None);
1412 oracle.insert(Pubkey::new_unique(), None);
1413
1414 oracle
1415 };
1416
1417 let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1418
1419 let known_snapshot_hashes =
1420 build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1421
1422 let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1425 assert_eq!(known_full_snapshot_hashes.len(), 1);
1426 let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1427
1428 let known_incremental_snapshot_hashes =
1430 known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1431 assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1432 let known_incremental_snapshot_hash =
1433 known_incremental_snapshot_hashes.iter().next().unwrap();
1434
1435 assert!(
1442 known_full_snapshot_hash == &full_snapshot_hash1
1443 || known_full_snapshot_hash == &full_snapshot_hash2
1444 );
1445 assert!(
1446 known_incremental_snapshot_hash == &incremental_snapshot_hash1
1447 || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1448 );
1449 }
1450
1451 #[test]
1452 fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1453 let known_snapshot_hashes: KnownSnapshotHashes = [
1454 (
1455 (200_000, Hash::new_unique()),
1456 [
1457 (200_200, Hash::new_unique()),
1458 (200_400, Hash::new_unique()),
1459 (200_600, Hash::new_unique()),
1460 (200_800, Hash::new_unique()),
1461 ]
1462 .iter()
1463 .cloned()
1464 .collect(),
1465 ),
1466 (
1467 (300_000, Hash::new_unique()),
1468 [
1469 (300_200, Hash::new_unique()),
1470 (300_400, Hash::new_unique()),
1471 (300_600, Hash::new_unique()),
1472 ]
1473 .iter()
1474 .cloned()
1475 .collect(),
1476 ),
1477 ]
1478 .iter()
1479 .cloned()
1480 .collect();
1481
1482 let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1483 let known_full_snapshot_hash = known_snapshot_hash.0;
1484 let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1485
1486 let contact_info = default_contact_info_for_tests();
1487 let peer_snapshot_hashes = vec![
1488 PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1490 PeerSnapshotHash::new(
1492 contact_info.clone(),
1493 (111_000, Hash::default()),
1494 Some((111_111, Hash::default())),
1495 ),
1496 PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1498 PeerSnapshotHash::new(
1500 contact_info.clone(),
1501 (111_000, Hash::default()),
1502 Some(*known_incremental_snapshot_hash),
1503 ),
1504 PeerSnapshotHash::new(
1506 contact_info.clone(),
1507 *known_full_snapshot_hash,
1508 Some((111_111, Hash::default())),
1509 ),
1510 PeerSnapshotHash::new(
1512 contact_info.clone(),
1513 *known_full_snapshot_hash,
1514 Some(*known_incremental_snapshot_hash),
1515 ),
1516 ];
1517
1518 let expected = vec![
1519 PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1520 PeerSnapshotHash::new(
1521 contact_info,
1522 *known_full_snapshot_hash,
1523 Some(*known_incremental_snapshot_hash),
1524 ),
1525 ];
1526 let mut actual = peer_snapshot_hashes;
1527 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1528 &known_snapshot_hashes,
1529 &mut actual,
1530 );
1531 assert_eq!(expected, actual);
1532 }
1533
1534 #[test]
1535 fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1536 let contact_info = default_contact_info_for_tests();
1537 let peer_snapshot_hashes = vec![
1538 PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1540 PeerSnapshotHash::new(
1541 contact_info.clone(),
1542 (100_000, Hash::default()),
1543 Some((100_100, Hash::default())),
1544 ),
1545 PeerSnapshotHash::new(
1546 contact_info.clone(),
1547 (100_000, Hash::default()),
1548 Some((100_200, Hash::default())),
1549 ),
1550 PeerSnapshotHash::new(
1551 contact_info.clone(),
1552 (100_000, Hash::default()),
1553 Some((100_300, Hash::default())),
1554 ),
1555 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1557 PeerSnapshotHash::new(
1558 contact_info.clone(),
1559 (200_000, Hash::default()),
1560 Some((200_100, Hash::default())),
1561 ),
1562 PeerSnapshotHash::new(
1563 contact_info.clone(),
1564 (200_000, Hash::default()),
1565 Some((200_200, Hash::default())),
1566 ),
1567 PeerSnapshotHash::new(
1568 contact_info.clone(),
1569 (200_000, Hash::default()),
1570 Some((200_300, Hash::default())),
1571 ),
1572 ];
1573
1574 let expected = vec![
1575 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1576 PeerSnapshotHash::new(
1577 contact_info.clone(),
1578 (200_000, Hash::default()),
1579 Some((200_100, Hash::default())),
1580 ),
1581 PeerSnapshotHash::new(
1582 contact_info.clone(),
1583 (200_000, Hash::default()),
1584 Some((200_200, Hash::default())),
1585 ),
1586 PeerSnapshotHash::new(
1587 contact_info,
1588 (200_000, Hash::default()),
1589 Some((200_300, Hash::default())),
1590 ),
1591 ];
1592 let mut actual = peer_snapshot_hashes;
1593 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1594 assert_eq!(expected, actual);
1595 }
1596
1597 #[test]
1598 fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1599 let contact_info = default_contact_info_for_tests();
1600 let peer_snapshot_hashes = vec![
1601 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1602 PeerSnapshotHash::new(
1603 contact_info.clone(),
1604 (200_000, Hash::default()),
1605 Some((200_100, Hash::default())),
1606 ),
1607 PeerSnapshotHash::new(
1608 contact_info.clone(),
1609 (200_000, Hash::default()),
1610 Some((200_200, Hash::default())),
1611 ),
1612 PeerSnapshotHash::new(
1613 contact_info.clone(),
1614 (200_000, Hash::default()),
1615 Some((200_300, Hash::default())),
1616 ),
1617 PeerSnapshotHash::new(
1618 contact_info.clone(),
1619 (200_000, Hash::default()),
1620 Some((200_010, Hash::default())),
1621 ),
1622 PeerSnapshotHash::new(
1623 contact_info.clone(),
1624 (200_000, Hash::default()),
1625 Some((200_020, Hash::default())),
1626 ),
1627 PeerSnapshotHash::new(
1628 contact_info.clone(),
1629 (200_000, Hash::default()),
1630 Some((200_030, Hash::default())),
1631 ),
1632 ];
1633
1634 let expected = vec![PeerSnapshotHash::new(
1635 contact_info,
1636 (200_000, Hash::default()),
1637 Some((200_300, Hash::default())),
1638 )];
1639 let mut actual = peer_snapshot_hashes;
1640 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1641 assert_eq!(expected, actual);
1642 }
1643
1644 #[test]
1647 fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1648 let contact_info = default_contact_info_for_tests();
1649 let peer_snapshot_hashes = vec![
1650 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1651 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1652 PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1653 ];
1654
1655 let expected = peer_snapshot_hashes.clone();
1656 let mut actual = peer_snapshot_hashes;
1657 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1658 assert_eq!(expected, actual);
1659 }
1660
1661 #[test]
1664 fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1665 {
1666 let mut actual = vec![];
1667 let expected = actual.clone();
1668 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1669 assert_eq!(expected, actual);
1670 }
1671 {
1672 let mut actual = vec![];
1673 let expected = actual.clone();
1674 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1675 assert_eq!(expected, actual);
1676 }
1677 }
1678}