1use {
2 itertools::Itertools,
3 log::*,
4 miraland_core::validator::{ValidatorConfig, ValidatorStartProgress},
5 miraland_download_utils::{download_snapshot_archive, DownloadProgressRecord},
6 miraland_genesis_utils::download_then_check_genesis_hash,
7 miraland_gossip::{
8 cluster_info::{ClusterInfo, Node},
9 contact_info::Protocol,
10 crds_value,
11 gossip_service::GossipService,
12 legacy_contact_info::LegacyContactInfo as ContactInfo,
13 },
14 miraland_metrics::datapoint_info,
15 miraland_rpc_client::rpc_client::RpcClient,
16 miraland_streamer::socket::SocketAddrSpace,
17 rand::{seq::SliceRandom, thread_rng, Rng},
18 rayon::prelude::*,
19 miraland_runtime::{
20 snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotKind,
21 snapshot_utils,
22 },
23 miraland_sdk::{
24 clock::Slot,
25 commitment_config::CommitmentConfig,
26 hash::Hash,
27 pubkey::Pubkey,
28 signature::{Keypair, Signer},
29 },
30 std::{
31 collections::{hash_map::RandomState, HashMap, HashSet},
32 net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
33 path::Path,
34 process::exit,
35 sync::{
36 atomic::{AtomicBool, Ordering},
37 Arc, RwLock,
38 },
39 time::{Duration, Instant},
40 },
41 thiserror::Error,
42};
43
44const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
48const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
51const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
54const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);
56
57pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;
58
59pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
60
61#[derive(Debug)]
62pub struct RpcBootstrapConfig {
63 pub no_genesis_fetch: bool,
64 pub no_snapshot_fetch: bool,
65 pub only_known_rpc: bool,
66 pub max_genesis_archive_unpacked_size: u64,
67 pub check_vote_account: Option<String>,
68 pub incremental_snapshot_fetch: bool,
69}
70
71fn verify_reachable_ports(
72 node: &Node,
73 cluster_entrypoint: &ContactInfo,
74 validator_config: &ValidatorConfig,
75 socket_addr_space: &SocketAddrSpace,
76) -> bool {
77 let verify_address = |addr: &Option<SocketAddr>| -> bool {
78 addr.as_ref()
79 .map(|addr| socket_addr_space.check(addr))
80 .unwrap_or_default()
81 };
82 let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
83
84 if verify_address(&node.info.serve_repair(Protocol::UDP).ok()) {
85 udp_sockets.push(&node.sockets.serve_repair);
86 }
87 if verify_address(&node.info.tpu(Protocol::UDP).ok()) {
88 udp_sockets.extend(node.sockets.tpu.iter());
89 udp_sockets.push(&node.sockets.tpu_quic);
90 }
91 if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) {
92 udp_sockets.extend(node.sockets.tpu_forwards.iter());
93 udp_sockets.push(&node.sockets.tpu_forwards_quic);
94 }
95 if verify_address(&node.info.tpu_vote().ok()) {
96 udp_sockets.extend(node.sockets.tpu_vote.iter());
97 }
98 if verify_address(&node.info.tvu(Protocol::UDP).ok()) {
99 udp_sockets.extend(node.sockets.tvu.iter());
100 udp_sockets.extend(node.sockets.broadcast.iter());
101 udp_sockets.extend(node.sockets.retransmit_sockets.iter());
102 }
103
104 let mut tcp_listeners = vec![];
105 if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
106 for (purpose, bind_addr, public_addr) in &[
107 ("RPC", rpc_addr, node.info.rpc()),
108 ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
109 ] {
110 if verify_address(&public_addr.as_ref().ok().copied()) {
111 tcp_listeners.push((
112 bind_addr.port(),
113 TcpListener::bind(bind_addr).unwrap_or_else(|err| {
114 error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
115 exit(1);
116 }),
117 ));
118 }
119 }
120 }
121
122 if let Some(ip_echo) = &node.sockets.ip_echo {
123 let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
124 tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo));
125 }
126
127 miraland_net_utils::verify_reachable_ports(
128 &cluster_entrypoint.gossip().unwrap(),
129 tcp_listeners,
130 &udp_sockets,
131 )
132}
133
134fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
135 if let Some(known_validators) = known_validators {
136 known_validators.contains(id)
137 } else {
138 false
139 }
140}
141
142fn start_gossip_node(
143 identity_keypair: Arc<Keypair>,
144 cluster_entrypoints: &[ContactInfo],
145 ledger_path: &Path,
146 gossip_addr: &SocketAddr,
147 gossip_socket: UdpSocket,
148 expected_shred_version: Option<u16>,
149 gossip_validators: Option<HashSet<Pubkey>>,
150 should_check_duplicate_instance: bool,
151 socket_addr_space: SocketAddrSpace,
152) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
153 let contact_info = ClusterInfo::gossip_contact_info(
154 identity_keypair.pubkey(),
155 *gossip_addr,
156 expected_shred_version.unwrap_or(0),
157 );
158 let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
159 cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
160 cluster_info.restore_contact_info(ledger_path, 0);
161 let cluster_info = Arc::new(cluster_info);
162
163 let gossip_exit_flag = Arc::new(AtomicBool::new(false));
164 let gossip_service = GossipService::new(
165 &cluster_info,
166 None,
167 gossip_socket,
168 gossip_validators,
169 should_check_duplicate_instance,
170 None,
171 gossip_exit_flag.clone(),
172 );
173 (cluster_info, gossip_exit_flag, gossip_service)
174}
175
176fn get_rpc_peers(
177 cluster_info: &ClusterInfo,
178 cluster_entrypoints: &[ContactInfo],
179 validator_config: &ValidatorConfig,
180 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
181 blacklist_timeout: &Instant,
182 retry_reason: &mut Option<String>,
183 bootstrap_config: &RpcBootstrapConfig,
184) -> Vec<ContactInfo> {
185 let shred_version = validator_config
186 .expected_shred_version
187 .unwrap_or_else(|| cluster_info.my_shred_version());
188 if shred_version == 0 {
189 let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| {
190 cluster_entrypoint
191 .gossip()
192 .ok()
193 .and_then(|addr| cluster_info.lookup_contact_info_by_gossip_addr(&addr))
194 .map_or(false, |entrypoint| entrypoint.shred_version() == 0)
195 });
196
197 if all_zero_shred_versions {
198 eprintln!("Entrypoint shred version is zero. Restart with --expected-shred-version");
199 exit(1);
200 }
201 info!("Waiting to adopt entrypoint shred version...");
202 return vec![];
203 }
204
205 info!(
206 "Searching for an RPC service with shred version {shred_version}{}...",
207 retry_reason
208 .as_ref()
209 .map(|s| format!(" (Retrying: {s})"))
210 .unwrap_or_default()
211 );
212
213 let mut rpc_peers = cluster_info
214 .all_rpc_peers()
215 .into_iter()
216 .filter(|contact_info| contact_info.shred_version() == shred_version)
217 .collect::<Vec<_>>();
218
219 if bootstrap_config.only_known_rpc {
220 rpc_peers.retain(|rpc_peer| {
221 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
222 });
223 }
224
225 let rpc_peers_total = rpc_peers.len();
226
227 let rpc_peers: Vec<_> = rpc_peers
229 .into_iter()
230 .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
231 .collect();
232 let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
233 let rpc_known_peers = rpc_peers
234 .iter()
235 .filter(|rpc_peer| {
236 is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
237 })
238 .count();
239
240 info!(
241 "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
242 {rpc_peers_blacklisted} blacklisted"
243 );
244
245 if rpc_peers_blacklisted == rpc_peers_total {
246 *retry_reason = if !blacklisted_rpc_nodes.is_empty()
247 && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
248 {
249 blacklisted_rpc_nodes.clear();
252 Some("Blacklist timeout expired".to_owned())
253 } else {
254 Some("Wait for known rpc peers".to_owned())
255 };
256 return vec![];
257 }
258 rpc_peers
259}
260
261fn check_vote_account(
262 rpc_client: &RpcClient,
263 identity_pubkey: &Pubkey,
264 vote_account_address: &Pubkey,
265 authorized_voter_pubkeys: &[Pubkey],
266) -> Result<(), String> {
267 let vote_account = rpc_client
268 .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
269 .map_err(|err| format!("failed to fetch vote account: {err}"))?
270 .value
271 .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
272
273 if vote_account.owner != miraland_vote_program::id() {
274 return Err(format!(
275 "not a vote account (owned by {}): {}",
276 vote_account.owner, vote_account_address
277 ));
278 }
279
280 let identity_account = rpc_client
281 .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
282 .map_err(|err| format!("failed to fetch identity account: {err}"))?
283 .value
284 .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
285
286 let vote_state = miraland_vote_program::vote_state::from(&vote_account);
287 if let Some(vote_state) = vote_state {
288 if vote_state.authorized_voters().is_empty() {
289 return Err("Vote account not yet initialized".to_string());
290 }
291
292 if vote_state.node_pubkey != *identity_pubkey {
293 return Err(format!(
294 "vote account's identity ({}) does not match the validator's identity {}).",
295 vote_state.node_pubkey, identity_pubkey
296 ));
297 }
298
299 for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters().iter() {
300 if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
301 return Err(format!(
302 "authorized voter {vote_account_authorized_voter_pubkey} not available"
303 ));
304 }
305 }
306 } else {
307 return Err(format!(
308 "invalid vote account data for {vote_account_address}"
309 ));
310 }
311
312 if identity_account.lamports <= 1 {
314 return Err(format!(
315 "underfunded identity account ({}): only {} lamports available",
316 identity_pubkey, identity_account.lamports
317 ));
318 }
319
320 Ok(())
321}
322
323#[derive(Error, Debug)]
324pub enum GetRpcNodeError {
325 #[error("Unable to find any RPC peers")]
326 NoRpcPeersFound,
327
328 #[error("Giving up, did not get newer snapshots from the cluster")]
329 NoNewerSnapshots,
330}
331
332#[derive(Debug)]
336struct GetRpcNodeResult {
337 rpc_contact_info: ContactInfo,
338 snapshot_hash: Option<SnapshotHash>,
339}
340
341#[derive(Debug, PartialEq, Eq, Clone)]
343struct PeerSnapshotHash {
344 rpc_contact_info: ContactInfo,
345 snapshot_hash: SnapshotHash,
346}
347
348#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
351pub struct SnapshotHash {
352 full: (Slot, Hash),
353 incr: Option<(Slot, Hash)>,
354}
355
356pub fn fail_rpc_node(
357 err: String,
358 known_validators: &Option<HashSet<Pubkey, RandomState>>,
359 rpc_id: &Pubkey,
360 blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
361) {
362 warn!("{err}");
363 if let Some(ref known_validators) = known_validators {
364 if known_validators.contains(rpc_id) {
365 return;
366 }
367 }
368
369 info!("Excluding {rpc_id} as a future RPC candidate");
370 blacklisted_rpc_nodes.insert(*rpc_id);
371}
372
373fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
374 let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
375 cluster_info.save_contact_info();
376 gossip_exit_flag.store(true, Ordering::Relaxed);
377 gossip_service.join().unwrap();
378}
379
380#[allow(clippy::too_many_arguments)]
381pub fn attempt_download_genesis_and_snapshot(
382 rpc_contact_info: &ContactInfo,
383 ledger_path: &Path,
384 validator_config: &mut ValidatorConfig,
385 bootstrap_config: &RpcBootstrapConfig,
386 use_progress_bar: bool,
387 gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
388 rpc_client: &RpcClient,
389 full_snapshot_archives_dir: &Path,
390 incremental_snapshot_archives_dir: &Path,
391 maximum_local_snapshot_age: Slot,
392 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
393 minimal_snapshot_download_speed: f32,
394 maximum_snapshot_download_abort: u64,
395 download_abort_count: &mut u64,
396 snapshot_hash: Option<SnapshotHash>,
397 identity_keypair: &Arc<Keypair>,
398 vote_account: &Pubkey,
399 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
400) -> Result<(), String> {
401 download_then_check_genesis_hash(
402 &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
403 ledger_path,
404 &mut validator_config.expected_genesis_hash,
405 bootstrap_config.max_genesis_archive_unpacked_size,
406 bootstrap_config.no_genesis_fetch,
407 use_progress_bar,
408 rpc_client,
409 )?;
410
411 if let Some(gossip) = gossip.take() {
412 shutdown_gossip_service(gossip);
413 }
414
415 let rpc_client_slot = rpc_client
416 .get_slot_with_commitment(CommitmentConfig::finalized())
417 .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
418 info!("RPC node root slot: {rpc_client_slot}");
419
420 download_snapshots(
421 full_snapshot_archives_dir,
422 incremental_snapshot_archives_dir,
423 validator_config,
424 bootstrap_config,
425 use_progress_bar,
426 maximum_local_snapshot_age,
427 start_progress,
428 minimal_snapshot_download_speed,
429 maximum_snapshot_download_abort,
430 download_abort_count,
431 snapshot_hash,
432 rpc_contact_info,
433 )?;
434
435 if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
436 let rpc_client = RpcClient::new(url);
437 check_vote_account(
438 &rpc_client,
439 &identity_keypair.pubkey(),
440 vote_account,
441 &authorized_voter_keypairs
442 .read()
443 .unwrap()
444 .iter()
445 .map(|k| k.pubkey())
446 .collect::<Vec<_>>(),
447 )
448 .unwrap_or_else(|err| {
449 error!("{err}");
456 exit(1);
457 });
458 }
459 Ok(())
460}
461
462fn ping(addr: &SocketAddr) -> Option<Duration> {
464 let start = Instant::now();
465 match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
466 Ok(_) => Some(start.elapsed()),
467 Err(_) => None,
468 }
469}
470
471fn get_vetted_rpc_nodes(
475 vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
476 cluster_info: &Arc<ClusterInfo>,
477 cluster_entrypoints: &[ContactInfo],
478 validator_config: &ValidatorConfig,
479 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
480 bootstrap_config: &RpcBootstrapConfig,
481) {
482 while vetted_rpc_nodes.is_empty() {
483 let rpc_node_details = match get_rpc_nodes(
484 cluster_info,
485 cluster_entrypoints,
486 validator_config,
487 blacklisted_rpc_nodes,
488 bootstrap_config,
489 ) {
490 Ok(rpc_node_details) => rpc_node_details,
491 Err(err) => {
492 error!(
493 "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
494 `--no-port-check`, or adjusting `--known-validator ...` arguments as \
495 applicable"
496 );
497 exit(1);
498 }
499 };
500
501 let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
502 vetted_rpc_nodes.extend(
503 rpc_node_details
504 .into_par_iter()
505 .filter_map(|rpc_node_details| {
506 let GetRpcNodeResult {
507 rpc_contact_info,
508 snapshot_hash,
509 } = rpc_node_details;
510
511 info!(
512 "Using RPC service from node {}: {:?}",
513 rpc_contact_info.pubkey(),
514 rpc_contact_info.rpc()
515 );
516
517 let rpc_addr = rpc_contact_info.rpc().ok()?;
518 let ping_time = ping(&rpc_addr);
519
520 let rpc_client =
521 RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
522
523 Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
524 })
525 .filter(
526 |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
527 .get_version()
528 {
529 Ok(rpc_version) => {
530 if let Some(ping_time) = ping_time {
531 info!(
532 "RPC node version: {} Ping: {}ms",
533 rpc_version.miraland_core,
534 ping_time.as_millis()
535 );
536 true
537 } else {
538 fail_rpc_node(
539 "Failed to ping RPC".to_string(),
540 &validator_config.known_validators,
541 rpc_contact_info.pubkey(),
542 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
543 );
544 false
545 }
546 }
547 Err(err) => {
548 fail_rpc_node(
549 format!("Failed to get RPC node version: {err}"),
550 &validator_config.known_validators,
551 rpc_contact_info.pubkey(),
552 &mut newly_blacklisted_rpc_nodes.write().unwrap(),
553 );
554 false
555 }
556 },
557 )
558 .collect::<Vec<(
559 ContactInfo,
560 Option<SnapshotHash>,
561 RpcClient,
562 Option<Duration>,
563 )>>()
564 .into_iter()
565 .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
566 .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
567 (rpc_contact_info, snapshot_hash, rpc_client)
568 })
569 .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
570 );
571 blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
572 }
573}
574
575#[allow(clippy::too_many_arguments)]
576pub fn rpc_bootstrap(
577 node: &Node,
578 identity_keypair: &Arc<Keypair>,
579 ledger_path: &Path,
580 full_snapshot_archives_dir: &Path,
581 incremental_snapshot_archives_dir: &Path,
582 vote_account: &Pubkey,
583 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
584 cluster_entrypoints: &[ContactInfo],
585 validator_config: &mut ValidatorConfig,
586 bootstrap_config: RpcBootstrapConfig,
587 do_port_check: bool,
588 use_progress_bar: bool,
589 maximum_local_snapshot_age: Slot,
590 should_check_duplicate_instance: bool,
591 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
592 minimal_snapshot_download_speed: f32,
593 maximum_snapshot_download_abort: u64,
594 socket_addr_space: SocketAddrSpace,
595) {
596 if do_port_check {
597 let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
598 order.shuffle(&mut thread_rng());
599 if order.into_iter().all(|i| {
600 !verify_reachable_ports(
601 node,
602 &cluster_entrypoints[i],
603 validator_config,
604 &socket_addr_space,
605 )
606 }) {
607 exit(1);
608 }
609 }
610
611 if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
612 return;
613 }
614
615 let total_snapshot_download_time = Instant::now();
616 let mut get_rpc_nodes_time = Duration::new(0, 0);
617 let mut snapshot_download_time = Duration::new(0, 0);
618 let mut blacklisted_rpc_nodes = HashSet::new();
619 let mut gossip = None;
620 let mut vetted_rpc_nodes = vec![];
621 let mut download_abort_count = 0;
622 loop {
623 if gossip.is_none() {
624 *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
625
626 gossip = Some(start_gossip_node(
627 identity_keypair.clone(),
628 cluster_entrypoints,
629 ledger_path,
630 &node
631 .info
632 .gossip()
633 .expect("Operator must spin up node with valid gossip address"),
634 node.sockets.gossip.try_clone().unwrap(),
635 validator_config.expected_shred_version,
636 validator_config.gossip_validators.clone(),
637 should_check_duplicate_instance,
638 socket_addr_space,
639 ));
640 }
641
642 let get_rpc_nodes_start = Instant::now();
643 get_vetted_rpc_nodes(
644 &mut vetted_rpc_nodes,
645 &gossip.as_ref().unwrap().0,
646 cluster_entrypoints,
647 validator_config,
648 &mut blacklisted_rpc_nodes,
649 &bootstrap_config,
650 );
651 let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
652 get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
653
654 let snapshot_download_start = Instant::now();
655 let download_result = attempt_download_genesis_and_snapshot(
656 &rpc_contact_info,
657 ledger_path,
658 validator_config,
659 &bootstrap_config,
660 use_progress_bar,
661 &mut gossip,
662 &rpc_client,
663 full_snapshot_archives_dir,
664 incremental_snapshot_archives_dir,
665 maximum_local_snapshot_age,
666 start_progress,
667 minimal_snapshot_download_speed,
668 maximum_snapshot_download_abort,
669 &mut download_abort_count,
670 snapshot_hash,
671 identity_keypair,
672 vote_account,
673 authorized_voter_keypairs.clone(),
674 );
675 snapshot_download_time += snapshot_download_start.elapsed();
676 match download_result {
677 Ok(()) => break,
678 Err(err) => {
679 fail_rpc_node(
680 err,
681 &validator_config.known_validators,
682 rpc_contact_info.pubkey(),
683 &mut blacklisted_rpc_nodes,
684 );
685 }
686 }
687 }
688
689 if let Some(gossip) = gossip.take() {
690 shutdown_gossip_service(gossip);
691 }
692
693 datapoint_info!(
694 "bootstrap-snapshot-download",
695 (
696 "total_time_secs",
697 total_snapshot_download_time.elapsed().as_secs(),
698 i64
699 ),
700 ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
701 (
702 "snapshot_download_time_secs",
703 snapshot_download_time.as_secs(),
704 i64
705 ),
706 ("download_abort_count", download_abort_count, i64),
707 ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
708 );
709}
710
711fn get_rpc_nodes(
715 cluster_info: &ClusterInfo,
716 cluster_entrypoints: &[ContactInfo],
717 validator_config: &ValidatorConfig,
718 blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
719 bootstrap_config: &RpcBootstrapConfig,
720) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
721 let mut blacklist_timeout = Instant::now();
722 let mut get_rpc_peers_timout = Instant::now();
723 let mut newer_cluster_snapshot_timeout = None;
724 let mut retry_reason = None;
725 loop {
726 std::thread::sleep(Duration::from_secs(1));
728 info!("\n{}", cluster_info.rpc_info_trace());
729
730 let rpc_peers = get_rpc_peers(
731 cluster_info,
732 cluster_entrypoints,
733 validator_config,
734 blacklisted_rpc_nodes,
735 &blacklist_timeout,
736 &mut retry_reason,
737 bootstrap_config,
738 );
739 if rpc_peers.is_empty() {
740 if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
741 return Err(GetRpcNodeError::NoRpcPeersFound);
742 }
743 continue;
744 }
745
746 blacklist_timeout = Instant::now();
748 get_rpc_peers_timout = Instant::now();
749 if bootstrap_config.no_snapshot_fetch {
750 let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
751 return Ok(vec![GetRpcNodeResult {
752 rpc_contact_info: random_peer.clone(),
753 snapshot_hash: None,
754 }]);
755 }
756
757 let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
758 .as_ref()
759 .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
760 .unwrap_or(true)
761 {
762 KnownValidatorsToWaitFor::All
763 } else {
764 KnownValidatorsToWaitFor::Any
765 };
766 let peer_snapshot_hashes = get_peer_snapshot_hashes(
767 cluster_info,
768 &rpc_peers,
769 validator_config.known_validators.as_ref(),
770 known_validators_to_wait_for,
771 bootstrap_config.incremental_snapshot_fetch,
772 );
773 if peer_snapshot_hashes.is_empty() {
774 match newer_cluster_snapshot_timeout {
775 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
776 Some(newer_cluster_snapshot_timeout) => {
777 if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
778 return Err(GetRpcNodeError::NoNewerSnapshots);
779 }
780 }
781 }
782 retry_reason = Some("No snapshots available".to_owned());
783 continue;
784 } else {
785 let rpc_peers = peer_snapshot_hashes
786 .iter()
787 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
788 .collect::<Vec<_>>();
789 let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
790 info!(
791 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
792 final_snapshot_hash
793 .incr
794 .map(|(slot, _hash)| slot)
795 .unwrap_or(final_snapshot_hash.full.0),
796 rpc_peers.len(),
797 if rpc_peers.len() > 1 { "s" } else { "" },
798 rpc_peers,
799 );
800 let rpc_node_results = peer_snapshot_hashes
801 .iter()
802 .map(|peer_snapshot_hash| GetRpcNodeResult {
803 rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
804 snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
805 })
806 .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
807 .collect();
808 return Ok(rpc_node_results);
809 }
810 }
811}
812
813fn get_highest_local_snapshot_hash(
816 full_snapshot_archives_dir: impl AsRef<Path>,
817 incremental_snapshot_archives_dir: impl AsRef<Path>,
818 incremental_snapshot_fetch: bool,
819) -> Option<(Slot, Hash)> {
820 snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
821 .and_then(|full_snapshot_info| {
822 if incremental_snapshot_fetch {
823 snapshot_utils::get_highest_incremental_snapshot_archive_info(
824 incremental_snapshot_archives_dir,
825 full_snapshot_info.slot(),
826 )
827 .map(|incremental_snapshot_info| {
828 (
829 incremental_snapshot_info.slot(),
830 *incremental_snapshot_info.hash(),
831 )
832 })
833 } else {
834 None
835 }
836 .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
837 })
838 .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
839}
840
841fn get_peer_snapshot_hashes(
848 cluster_info: &ClusterInfo,
849 rpc_peers: &[ContactInfo],
850 known_validators: Option<&HashSet<Pubkey>>,
851 known_validators_to_wait_for: KnownValidatorsToWaitFor,
852 incremental_snapshot_fetch: bool,
853) -> Vec<PeerSnapshotHash> {
854 let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
855 if let Some(known_validators) = known_validators {
856 let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
857 cluster_info,
858 known_validators,
859 known_validators_to_wait_for,
860 );
861 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
862 &known_snapshot_hashes,
863 &mut peer_snapshot_hashes,
864 );
865 }
866 if incremental_snapshot_fetch {
867 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
875 &mut peer_snapshot_hashes,
876 );
877 }
878 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
879
880 peer_snapshot_hashes
881}
882
883type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
886
887fn get_snapshot_hashes_from_known_validators(
898 cluster_info: &ClusterInfo,
899 known_validators: &HashSet<Pubkey>,
900 known_validators_to_wait_for: KnownValidatorsToWaitFor,
901) -> KnownSnapshotHashes {
902 let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
904
905 if !do_known_validators_have_all_snapshot_hashes(
906 known_validators,
907 known_validators_to_wait_for,
908 get_snapshot_hashes_for_node,
909 ) {
910 debug!(
911 "Snapshot hashes have not been discovered from known validators. This likely means \
912 the gossip tables are not fully populated. We will sleep and retry..."
913 );
914 return KnownSnapshotHashes::default();
915 }
916
917 build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
918}
919
920fn do_known_validators_have_all_snapshot_hashes<'a>(
929 known_validators: impl IntoIterator<Item = &'a Pubkey>,
930 known_validators_to_wait_for: KnownValidatorsToWaitFor,
931 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
932) -> bool {
933 let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
934
935 match known_validators_to_wait_for {
936 KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
937 KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
938 }
939}
940
941#[derive(Debug, Copy, Clone, Eq, PartialEq)]
944enum KnownValidatorsToWaitFor {
945 All,
946 Any,
947}
948
949fn build_known_snapshot_hashes<'a>(
955 nodes: impl IntoIterator<Item = &'a Pubkey>,
956 get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
957) -> KnownSnapshotHashes {
958 let mut known_snapshot_hashes = KnownSnapshotHashes::new();
959
960 fn is_any_same_slot_and_different_hash<'a>(
963 needle: &(Slot, Hash),
964 haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
965 ) -> bool {
966 haystack
967 .into_iter()
968 .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
969 }
970
971 'to_next_node: for node in nodes {
972 let Some(SnapshotHash {
973 full: full_snapshot_hash,
974 incr: incremental_snapshot_hash,
975 }) = get_snapshot_hashes_for_node(node)
976 else {
977 continue 'to_next_node;
978 };
979
980 if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
985 warn!(
986 "Ignoring all snapshot hashes from node {node} since we've seen a different full \
987 snapshot hash with this slot.\
988 \nfull snapshot hash: {full_snapshot_hash:?}"
989 );
990 debug!(
991 "known full snapshot hashes: {:#?}",
992 known_snapshot_hashes.keys(),
993 );
994 continue 'to_next_node;
995 }
996
997 let known_incremental_snapshot_hashes =
1001 known_snapshot_hashes.entry(full_snapshot_hash).or_default();
1002
1003 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1004 if is_any_same_slot_and_different_hash(
1009 &incremental_snapshot_hash,
1010 known_incremental_snapshot_hashes.iter(),
1011 ) {
1012 warn!(
1013 "Ignoring incremental snapshot hash from node {node} since we've seen a \
1014 different incremental snapshot hash with this slot.\
1015 \nfull snapshot hash: {full_snapshot_hash:?}\
1016 \nincremental snapshot hash: {incremental_snapshot_hash:?}"
1017 );
1018 debug!(
1019 "known incremental snapshot hashes based on this slot: {:#?}",
1020 known_incremental_snapshot_hashes.iter(),
1021 );
1022 continue 'to_next_node;
1023 }
1024
1025 known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
1026 };
1027 }
1028
1029 trace!("known snapshot hashes: {known_snapshot_hashes:?}");
1030 known_snapshot_hashes
1031}
1032
1033fn get_eligible_peer_snapshot_hashes(
1039 cluster_info: &ClusterInfo,
1040 rpc_peers: &[ContactInfo],
1041) -> Vec<PeerSnapshotHash> {
1042 let peer_snapshot_hashes = rpc_peers
1043 .iter()
1044 .flat_map(|rpc_peer| {
1045 get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
1046 PeerSnapshotHash {
1047 rpc_contact_info: rpc_peer.clone(),
1048 snapshot_hash,
1049 }
1050 })
1051 })
1052 .collect();
1053
1054 trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
1055 peer_snapshot_hashes
1056}
1057
1058fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1060 known_snapshot_hashes: &KnownSnapshotHashes,
1061 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1062) {
1063 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1064 known_snapshot_hashes
1065 .get(&peer_snapshot_hash.snapshot_hash.full)
1066 .map(|known_incremental_hashes| {
1067 if peer_snapshot_hash.snapshot_hash.incr.is_none() {
1068 true
1071 } else {
1072 known_incremental_hashes
1073 .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
1074 }
1075 })
1076 .unwrap_or(false)
1077 });
1078
1079 trace!(
1080 "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
1081 );
1082}
1083
1084fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
1086 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1087) {
1088 let highest_full_snapshot_hash = peer_snapshot_hashes
1089 .iter()
1090 .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
1091 .max_by_key(|(slot, _hash)| *slot);
1092 let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
1093 return;
1097 };
1098
1099 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1100 peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
1101 });
1102
1103 trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
1104}
1105
1106fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
1108 peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
1109) {
1110 let highest_incremental_snapshot_hash = peer_snapshot_hashes
1111 .iter()
1112 .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
1113 .max_by_key(|(slot, _hash)| *slot);
1114
1115 peer_snapshot_hashes.retain(|peer_snapshot_hash| {
1116 peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
1117 });
1118
1119 trace!(
1120 "retain peer snapshot hashes with highest incremental snapshot slot: \
1121 {peer_snapshot_hashes:?}"
1122 );
1123}
1124
1125#[allow(clippy::too_many_arguments)]
1127fn download_snapshots(
1128 full_snapshot_archives_dir: &Path,
1129 incremental_snapshot_archives_dir: &Path,
1130 validator_config: &ValidatorConfig,
1131 bootstrap_config: &RpcBootstrapConfig,
1132 use_progress_bar: bool,
1133 maximum_local_snapshot_age: Slot,
1134 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1135 minimal_snapshot_download_speed: f32,
1136 maximum_snapshot_download_abort: u64,
1137 download_abort_count: &mut u64,
1138 snapshot_hash: Option<SnapshotHash>,
1139 rpc_contact_info: &ContactInfo,
1140) -> Result<(), String> {
1141 if snapshot_hash.is_none() {
1142 return Ok(());
1143 }
1144 let SnapshotHash {
1145 full: full_snapshot_hash,
1146 incr: incremental_snapshot_hash,
1147 } = snapshot_hash.unwrap();
1148
1149 if should_use_local_snapshot(
1151 full_snapshot_archives_dir,
1152 incremental_snapshot_archives_dir,
1153 maximum_local_snapshot_age,
1154 full_snapshot_hash,
1155 incremental_snapshot_hash,
1156 bootstrap_config.incremental_snapshot_fetch,
1157 ) {
1158 return Ok(());
1159 }
1160
1161 if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir)
1163 .into_iter()
1164 .any(|snapshot_archive| {
1165 snapshot_archive.slot() == full_snapshot_hash.0
1166 && snapshot_archive.hash().0 == full_snapshot_hash.1
1167 })
1168 {
1169 info!(
1170 "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
1171 full_snapshot_hash.0, full_snapshot_hash.1
1172 );
1173 } else {
1174 download_snapshot(
1175 full_snapshot_archives_dir,
1176 incremental_snapshot_archives_dir,
1177 validator_config,
1178 bootstrap_config,
1179 use_progress_bar,
1180 start_progress,
1181 minimal_snapshot_download_speed,
1182 maximum_snapshot_download_abort,
1183 download_abort_count,
1184 rpc_contact_info,
1185 full_snapshot_hash,
1186 SnapshotKind::FullSnapshot,
1187 )?;
1188 }
1189
1190 if bootstrap_config.incremental_snapshot_fetch {
1191 if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
1193 if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1194 .into_iter()
1195 .any(|snapshot_archive| {
1196 snapshot_archive.slot() == incremental_snapshot_hash.0
1197 && snapshot_archive.hash().0 == incremental_snapshot_hash.1
1198 && snapshot_archive.base_slot() == full_snapshot_hash.0
1199 })
1200 {
1201 info!(
1202 "Incremental snapshot archive already exists locally. Skipping download. \
1203 slot: {}, hash: {}",
1204 incremental_snapshot_hash.0, incremental_snapshot_hash.1
1205 );
1206 } else {
1207 download_snapshot(
1208 full_snapshot_archives_dir,
1209 incremental_snapshot_archives_dir,
1210 validator_config,
1211 bootstrap_config,
1212 use_progress_bar,
1213 start_progress,
1214 minimal_snapshot_download_speed,
1215 maximum_snapshot_download_abort,
1216 download_abort_count,
1217 rpc_contact_info,
1218 incremental_snapshot_hash,
1219 SnapshotKind::IncrementalSnapshot(full_snapshot_hash.0),
1220 )?;
1221 }
1222 }
1223 }
1224
1225 Ok(())
1226}
1227
1228#[allow(clippy::too_many_arguments)]
1230fn download_snapshot(
1231 full_snapshot_archives_dir: &Path,
1232 incremental_snapshot_archives_dir: &Path,
1233 validator_config: &ValidatorConfig,
1234 bootstrap_config: &RpcBootstrapConfig,
1235 use_progress_bar: bool,
1236 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
1237 minimal_snapshot_download_speed: f32,
1238 maximum_snapshot_download_abort: u64,
1239 download_abort_count: &mut u64,
1240 rpc_contact_info: &ContactInfo,
1241 desired_snapshot_hash: (Slot, Hash),
1242 snapshot_kind: SnapshotKind,
1243) -> Result<(), String> {
1244 let maximum_full_snapshot_archives_to_retain = validator_config
1245 .snapshot_config
1246 .maximum_full_snapshot_archives_to_retain;
1247 let maximum_incremental_snapshot_archives_to_retain = validator_config
1248 .snapshot_config
1249 .maximum_incremental_snapshot_archives_to_retain;
1250
1251 *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
1252 slot: desired_snapshot_hash.0,
1253 rpc_addr: rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
1254 };
1255 let desired_snapshot_hash = (
1256 desired_snapshot_hash.0,
1257 miraland_runtime::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
1258 );
1259 download_snapshot_archive(
1260 &rpc_contact_info.rpc().map_err(|err| format!("{err:?}"))?,
1261 full_snapshot_archives_dir,
1262 incremental_snapshot_archives_dir,
1263 desired_snapshot_hash,
1264 snapshot_kind,
1265 maximum_full_snapshot_archives_to_retain,
1266 maximum_incremental_snapshot_archives_to_retain,
1267 use_progress_bar,
1268 &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
1269 debug!("Download progress: {download_progress:?}");
1270 if download_progress.last_throughput < minimal_snapshot_download_speed
1271 && download_progress.notification_count <= 1
1272 && download_progress.percentage_done <= 2_f32
1273 && download_progress.estimated_remaining_time > 60_f32
1274 && *download_abort_count < maximum_snapshot_download_abort
1275 {
1276 if let Some(ref known_validators) = validator_config.known_validators {
1277 if known_validators.contains(rpc_contact_info.pubkey())
1278 && known_validators.len() == 1
1279 && bootstrap_config.only_known_rpc
1280 {
1281 warn!(
1282 "The snapshot download is too slow, throughput: {} < min speed {} \
1283 bytes/sec, but will NOT abort and try a different node as it is the \
1284 only known validator and the --only-known-rpc flag is set. Abort \
1285 count: {}, Progress detail: {:?}",
1286 download_progress.last_throughput,
1287 minimal_snapshot_download_speed,
1288 download_abort_count,
1289 download_progress,
1290 );
1291 return true; }
1293 }
1294 warn!(
1295 "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
1296 will abort and try a different node. Abort count: {}, Progress detail: {:?}",
1297 download_progress.last_throughput,
1298 minimal_snapshot_download_speed,
1299 download_abort_count,
1300 download_progress,
1301 );
1302 *download_abort_count += 1;
1303 false
1304 } else {
1305 true
1306 }
1307 })),
1308 )
1309}
1310
1311fn should_use_local_snapshot(
1314 full_snapshot_archives_dir: &Path,
1315 incremental_snapshot_archives_dir: &Path,
1316 maximum_local_snapshot_age: Slot,
1317 full_snapshot_hash: (Slot, Hash),
1318 incremental_snapshot_hash: Option<(Slot, Hash)>,
1319 incremental_snapshot_fetch: bool,
1320) -> bool {
1321 let cluster_snapshot_slot = incremental_snapshot_hash
1322 .map(|(slot, _)| slot)
1323 .unwrap_or(full_snapshot_hash.0);
1324
1325 match get_highest_local_snapshot_hash(
1326 full_snapshot_archives_dir,
1327 incremental_snapshot_archives_dir,
1328 incremental_snapshot_fetch,
1329 ) {
1330 None => {
1331 info!(
1332 "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
1333 local snapshot."
1334 );
1335 false
1336 }
1337 Some((local_snapshot_slot, _)) => {
1338 if local_snapshot_slot
1339 >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
1340 {
1341 info!(
1342 "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
1343 a snapshot for slot {cluster_snapshot_slot}."
1344 );
1345 true
1346 } else {
1347 info!(
1348 "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
1349 newer snapshot for slot {cluster_snapshot_slot}."
1350 );
1351 false
1352 }
1353 }
1354 }
1355}
1356
1357fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
1359 cluster_info.get_snapshot_hashes_for_node(node).map(
1360 |crds_value::SnapshotHashes {
1361 full, incremental, ..
1362 }| {
1363 let highest_incremental_snapshot_hash = incremental.into_iter().max();
1364 SnapshotHash {
1365 full,
1366 incr: highest_incremental_snapshot_hash,
1367 }
1368 },
1369 )
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374 use super::*;
1375
1376 impl PeerSnapshotHash {
1377 fn new(
1378 rpc_contact_info: ContactInfo,
1379 full_snapshot_hash: (Slot, Hash),
1380 incremental_snapshot_hash: Option<(Slot, Hash)>,
1381 ) -> Self {
1382 Self {
1383 rpc_contact_info,
1384 snapshot_hash: SnapshotHash {
1385 full: full_snapshot_hash,
1386 incr: incremental_snapshot_hash,
1387 },
1388 }
1389 }
1390 }
1391
1392 fn default_contact_info_for_tests() -> ContactInfo {
1393 ContactInfo::new_localhost(&Pubkey::default(), 1_681_834_947_321)
1394 }
1395
1396 #[test]
1397 fn test_build_known_snapshot_hashes() {
1398 miraland_logger::setup();
1399 let full_snapshot_hash1 = (400_000, Hash::new_unique());
1400 let full_snapshot_hash2 = (400_000, Hash::new_unique());
1401
1402 let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
1403 let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
1404
1405 let oracle = {
1407 let mut oracle = HashMap::new();
1408
1409 for (full, incr) in [
1410 (full_snapshot_hash1, None),
1412 (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
1414 (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
1416 (full_snapshot_hash2, None),
1418 (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
1419 (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
1420 ] {
1421 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1423 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1424 oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
1425 }
1426
1427 oracle.insert(Pubkey::new_unique(), None);
1429 oracle.insert(Pubkey::new_unique(), None);
1430 oracle.insert(Pubkey::new_unique(), None);
1431
1432 oracle
1433 };
1434
1435 let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
1436
1437 let known_snapshot_hashes =
1438 build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
1439
1440 let known_full_snapshot_hashes = known_snapshot_hashes.keys();
1443 assert_eq!(known_full_snapshot_hashes.len(), 1);
1444 let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
1445
1446 let known_incremental_snapshot_hashes =
1448 known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
1449 assert_eq!(known_incremental_snapshot_hashes.len(), 1);
1450 let known_incremental_snapshot_hash =
1451 known_incremental_snapshot_hashes.iter().next().unwrap();
1452
1453 assert!(
1460 known_full_snapshot_hash == &full_snapshot_hash1
1461 || known_full_snapshot_hash == &full_snapshot_hash2
1462 );
1463 assert!(
1464 known_incremental_snapshot_hash == &incremental_snapshot_hash1
1465 || known_incremental_snapshot_hash == &incremental_snapshot_hash2
1466 );
1467 }
1468
1469 #[test]
1470 fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
1471 let known_snapshot_hashes: KnownSnapshotHashes = [
1472 (
1473 (200_000, Hash::new_unique()),
1474 [
1475 (200_200, Hash::new_unique()),
1476 (200_400, Hash::new_unique()),
1477 (200_600, Hash::new_unique()),
1478 (200_800, Hash::new_unique()),
1479 ]
1480 .iter()
1481 .cloned()
1482 .collect(),
1483 ),
1484 (
1485 (300_000, Hash::new_unique()),
1486 [
1487 (300_200, Hash::new_unique()),
1488 (300_400, Hash::new_unique()),
1489 (300_600, Hash::new_unique()),
1490 ]
1491 .iter()
1492 .cloned()
1493 .collect(),
1494 ),
1495 ]
1496 .iter()
1497 .cloned()
1498 .collect();
1499
1500 let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
1501 let known_full_snapshot_hash = known_snapshot_hash.0;
1502 let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
1503
1504 let contact_info = default_contact_info_for_tests();
1505 let peer_snapshot_hashes = vec![
1506 PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
1508 PeerSnapshotHash::new(
1510 contact_info.clone(),
1511 (111_000, Hash::default()),
1512 Some((111_111, Hash::default())),
1513 ),
1514 PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1516 PeerSnapshotHash::new(
1518 contact_info.clone(),
1519 (111_000, Hash::default()),
1520 Some(*known_incremental_snapshot_hash),
1521 ),
1522 PeerSnapshotHash::new(
1524 contact_info.clone(),
1525 *known_full_snapshot_hash,
1526 Some((111_111, Hash::default())),
1527 ),
1528 PeerSnapshotHash::new(
1530 contact_info.clone(),
1531 *known_full_snapshot_hash,
1532 Some(*known_incremental_snapshot_hash),
1533 ),
1534 ];
1535
1536 let expected = vec![
1537 PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
1538 PeerSnapshotHash::new(
1539 contact_info,
1540 *known_full_snapshot_hash,
1541 Some(*known_incremental_snapshot_hash),
1542 ),
1543 ];
1544 let mut actual = peer_snapshot_hashes;
1545 retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
1546 &known_snapshot_hashes,
1547 &mut actual,
1548 );
1549 assert_eq!(expected, actual);
1550 }
1551
1552 #[test]
1553 fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
1554 let contact_info = default_contact_info_for_tests();
1555 let peer_snapshot_hashes = vec![
1556 PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
1558 PeerSnapshotHash::new(
1559 contact_info.clone(),
1560 (100_000, Hash::default()),
1561 Some((100_100, Hash::default())),
1562 ),
1563 PeerSnapshotHash::new(
1564 contact_info.clone(),
1565 (100_000, Hash::default()),
1566 Some((100_200, Hash::default())),
1567 ),
1568 PeerSnapshotHash::new(
1569 contact_info.clone(),
1570 (100_000, Hash::default()),
1571 Some((100_300, Hash::default())),
1572 ),
1573 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1575 PeerSnapshotHash::new(
1576 contact_info.clone(),
1577 (200_000, Hash::default()),
1578 Some((200_100, Hash::default())),
1579 ),
1580 PeerSnapshotHash::new(
1581 contact_info.clone(),
1582 (200_000, Hash::default()),
1583 Some((200_200, Hash::default())),
1584 ),
1585 PeerSnapshotHash::new(
1586 contact_info.clone(),
1587 (200_000, Hash::default()),
1588 Some((200_300, Hash::default())),
1589 ),
1590 ];
1591
1592 let expected = vec![
1593 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1594 PeerSnapshotHash::new(
1595 contact_info.clone(),
1596 (200_000, Hash::default()),
1597 Some((200_100, Hash::default())),
1598 ),
1599 PeerSnapshotHash::new(
1600 contact_info.clone(),
1601 (200_000, Hash::default()),
1602 Some((200_200, Hash::default())),
1603 ),
1604 PeerSnapshotHash::new(
1605 contact_info,
1606 (200_000, Hash::default()),
1607 Some((200_300, Hash::default())),
1608 ),
1609 ];
1610 let mut actual = peer_snapshot_hashes;
1611 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1612 assert_eq!(expected, actual);
1613 }
1614
1615 #[test]
1616 fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
1617 let contact_info = default_contact_info_for_tests();
1618 let peer_snapshot_hashes = vec![
1619 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
1620 PeerSnapshotHash::new(
1621 contact_info.clone(),
1622 (200_000, Hash::default()),
1623 Some((200_100, Hash::default())),
1624 ),
1625 PeerSnapshotHash::new(
1626 contact_info.clone(),
1627 (200_000, Hash::default()),
1628 Some((200_200, Hash::default())),
1629 ),
1630 PeerSnapshotHash::new(
1631 contact_info.clone(),
1632 (200_000, Hash::default()),
1633 Some((200_300, Hash::default())),
1634 ),
1635 PeerSnapshotHash::new(
1636 contact_info.clone(),
1637 (200_000, Hash::default()),
1638 Some((200_010, Hash::default())),
1639 ),
1640 PeerSnapshotHash::new(
1641 contact_info.clone(),
1642 (200_000, Hash::default()),
1643 Some((200_020, Hash::default())),
1644 ),
1645 PeerSnapshotHash::new(
1646 contact_info.clone(),
1647 (200_000, Hash::default()),
1648 Some((200_030, Hash::default())),
1649 ),
1650 ];
1651
1652 let expected = vec![PeerSnapshotHash::new(
1653 contact_info,
1654 (200_000, Hash::default()),
1655 Some((200_300, Hash::default())),
1656 )];
1657 let mut actual = peer_snapshot_hashes;
1658 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1659 assert_eq!(expected, actual);
1660 }
1661
1662 #[test]
1665 fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
1666 let contact_info = default_contact_info_for_tests();
1667 let peer_snapshot_hashes = vec![
1668 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1669 PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
1670 PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
1671 ];
1672
1673 let expected = peer_snapshot_hashes.clone();
1674 let mut actual = peer_snapshot_hashes;
1675 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1676 assert_eq!(expected, actual);
1677 }
1678
1679 #[test]
1682 fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
1683 {
1684 let mut actual = vec![];
1685 let expected = actual.clone();
1686 retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
1687 assert_eq!(expected, actual);
1688 }
1689 {
1690 let mut actual = vec![];
1691 let expected = actual.clone();
1692 retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
1693 assert_eq!(expected, actual);
1694 }
1695 }
1696}