1use {
2 crate::{
3 admin_rpc_service::{self, StakedNodesOverrides, load_staked_nodes_overrides},
4 bootstrap,
5 cli::{self},
6 commands::{FromClapArgMatches, run::args::RunArgs},
7 ledger_lockfile, lock_ledger,
8 },
9 agave_snapshots::{
10 ArchiveFormat, SnapshotInterval, SnapshotVersion,
11 paths::BANK_SNAPSHOTS_DIR,
12 snapshot_config::{SnapshotConfig, SnapshotUsage},
13 },
14 agave_votor::vote_history_storage,
15 agave_xdp::{set_cpu_affinity, xdp_retransmitter::XdpConfig},
16 clap::{ArgMatches, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit},
17 crossbeam_channel::unbounded,
18 log::*,
19 rand::{rng, seq::SliceRandom},
20 solana_accounts_db::{
21 accounts_db::{AccountShrinkThreshold, AccountsDbConfig, MarkObsoleteAccounts},
22 accounts_file::StorageAccess,
23 accounts_index::{
24 AccountSecondaryIndexes, AccountsIndexConfig, DEFAULT_NUM_ENTRIES_OVERHEAD,
25 DEFAULT_NUM_ENTRIES_TO_EVICT, IndexLimit, IndexLimitThreshold, ScanFilter,
26 },
27 partitioned_rewards::PartitionedEpochRewardsConfig,
28 utils::{
29 create_all_accounts_run_and_snapshot_dirs, create_and_canonicalize_directories,
30 create_and_canonicalize_directory,
31 },
32 },
33 solana_clap_utils::input_parsers::{
34 keypair_of, keypairs_of, parse_cpu_ranges, pubkey_of, value_of, values_of,
35 },
36 solana_clock::{DEFAULT_SLOTS_PER_EPOCH, Slot},
37 solana_core::{
38 banking_stage::transaction_scheduler::scheduler_controller::SchedulerConfig,
39 banking_trace::DISABLED_BAKING_TRACE_DIR,
40 consensus::tower_storage,
41 repair::repair_handler::RepairHandlerType,
42 resource_limits,
43 snapshot_packager_service::SnapshotPackagerService,
44 system_monitor_service::SystemMonitorService,
45 tpu::MAX_VOTES_PER_SECOND,
46 validator::{
47 BlockProductionMethod, BlockVerificationMethod, SchedulerPacing, Validator,
48 ValidatorConfig, ValidatorStartProgress, ValidatorTpuConfig, is_snapshot_config_valid,
49 },
50 },
51 solana_genesis_utils::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
52 solana_gossip::{
53 cluster_info::{DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS, NodeConfig},
54 contact_info::ContactInfo,
55 node::Node,
56 },
57 solana_hash::Hash,
58 solana_keypair::Keypair,
59 solana_ledger::{
60 blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
61 use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
62 },
63 solana_net_utils::multihomed_sockets::BindIpAddrs,
64 solana_poh::poh_service,
65 solana_pubkey::Pubkey,
66 solana_runtime::{runtime_config::RuntimeConfig, snapshot_utils},
67 solana_signer::Signer,
68 solana_streamer::{
69 nonblocking::{simple_qos::SimpleQosConfig, swqos::SwQosConfig},
70 quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig, SwQosQuicStreamerConfig},
71 },
72 solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
73 solana_turbine::broadcast_stage::BroadcastStageType,
74 solana_validator_exit::Exit,
75 std::{
76 collections::HashSet,
77 env,
78 fs::{self, File},
79 net::{IpAddr, Ipv4Addr, SocketAddr},
80 num::{NonZeroU64, NonZeroUsize},
81 path::{Path, PathBuf},
82 str::{self, FromStr},
83 sync::{Arc, RwLock, atomic::AtomicBool},
84 },
85};
86
87#[derive(Debug, PartialEq, Eq)]
88pub enum Operation {
89 Initialize,
90 Run,
91}
92
93pub fn execute(
94 matches: &ArgMatches,
95 solana_version: &str,
96 operation: Operation,
97) -> Result<(), Box<dyn std::error::Error>> {
98 if env::var_os("RUST_BACKTRACE").is_none() {
100 unsafe { env::set_var("RUST_BACKTRACE", "1") }
102 }
103
104 let run_args = RunArgs::from_clap_arg_match(matches)?;
105
106 let cli::thread_args::NumThreadConfig {
107 accounts_db_background_threads,
108 accounts_db_foreground_threads,
109 accounts_index_flush_threads,
110 block_production_num_workers,
111 ip_echo_server_threads,
112 rayon_global_threads,
113 replay_forks_threads,
114 replay_transactions_threads,
115 tpu_sigverify_threads,
116 tpu_transaction_forward_receive_threads,
117 tpu_transaction_receive_threads,
118 tpu_vote_transaction_receive_threads,
119 tvu_receive_threads,
120 tvu_retransmit_threads,
121 tvu_sigverify_threads,
122 } = cli::thread_args::parse_num_threads_args(matches);
123
124 let identity_keypair = Arc::new(run_args.identity_keypair);
125
126 let logfile = run_args.logfile;
127 if let Some(logfile) = logfile.as_ref() {
128 println!("log file: {}", logfile.display());
129 }
130 let use_progress_bar = logfile.is_none();
131 agave_logger::initialize_logging(logfile.clone());
132 cli::warn_for_deprecated_arguments(matches);
133
134 info!("{} {}", crate_name!(), solana_version);
135 info!("Starting validator with: {:#?}", std::env::args_os());
136
137 solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
138 solana_metrics::set_panic_hook("validator", Some(String::from(solana_version)));
139 solana_entry::entry::init_poh();
140
141 let bind_addresses = {
142 let parsed = matches
143 .values_of("bind_address")
144 .expect("bind_address should always be present due to default")
145 .map(solana_net_utils::parse_host)
146 .collect::<Result<Vec<_>, _>>()?;
147 BindIpAddrs::new(parsed).map_err(|err| format!("invalid bind_addresses: {err}"))?
148 };
149
150 let entrypoint_addrs = run_args.entrypoints;
151 for addr in &entrypoint_addrs {
152 if !run_args.socket_addr_space.check(addr) {
153 Err(format!("invalid entrypoint address: {addr}"))?;
154 }
155 }
156
157 let xdp_interface = matches.value_of("retransmit_xdp_interface");
158 let xdp_zero_copy = matches.is_present("retransmit_xdp_zero_copy");
159 let retransmit_xdp = matches.value_of("retransmit_xdp_cpu_cores").map(|cpus| {
160 XdpConfig::new(
161 xdp_interface,
162 parse_cpu_ranges(cpus).unwrap(),
163 xdp_zero_copy,
164 )
165 });
166
167 let dynamic_port_range =
168 solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
169 .expect("invalid dynamic_port_range");
170
171 let advertised_ip = matches
172 .value_of("advertised_ip")
173 .map(|advertised_ip| {
174 solana_net_utils::parse_host(advertised_ip)
175 .map_err(|err| format!("failed to parse --advertised-ip: {err}"))
176 })
177 .transpose()?;
178
179 let advertised_ip = if let Some(cli_ip) = advertised_ip {
180 cli_ip
181 } else if !bind_addresses.active().is_unspecified() && !bind_addresses.active().is_loopback() {
182 bind_addresses.active()
183 } else if !entrypoint_addrs.is_empty() {
184 let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect();
185 order.shuffle(&mut rng());
186
187 order
188 .into_iter()
189 .find_map(|i| {
190 let entrypoint_addr = &entrypoint_addrs[i];
191 info!(
192 "Contacting {entrypoint_addr} to determine the validator's public IP address"
193 );
194 solana_net_utils::get_public_ip_addr_with_binding(
195 entrypoint_addr,
196 bind_addresses.active(),
197 )
198 .map_or_else(
199 |err| {
200 warn!("Failed to contact cluster entrypoint {entrypoint_addr}: {err}");
201 None
202 },
203 Some,
204 )
205 })
206 .ok_or_else(|| "unable to determine the validator's public IP address".to_string())?
207 } else {
208 IpAddr::V4(Ipv4Addr::LOCALHOST)
209 };
210 let gossip_port = value_t!(matches, "gossip_port", u16).or_else(|_| {
211 solana_net_utils::find_available_port_in_range(bind_addresses.active(), (0, 1))
212 .map_err(|err| format!("unable to find an available gossip port: {err}"))
213 })?;
214
215 let public_tpu_addr = matches
216 .value_of("public_tpu_addr")
217 .map(|public_tpu_addr| {
218 solana_net_utils::parse_host_port(public_tpu_addr)
219 .map_err(|err| format!("failed to parse --public-tpu-address: {err}"))
220 })
221 .transpose()?;
222
223 let public_tpu_forwards_addr = matches
224 .value_of("public_tpu_forwards_addr")
225 .map(|public_tpu_forwards_addr| {
226 solana_net_utils::parse_host_port(public_tpu_forwards_addr)
227 .map_err(|err| format!("failed to parse --public-tpu-forwards-address: {err}"))
228 })
229 .transpose()?;
230
231 let public_tvu_addr = matches
232 .value_of("public_tvu_addr")
233 .map(|public_tvu_addr| {
234 solana_net_utils::parse_host_port(public_tvu_addr)
235 .map_err(|err| format!("failed to parse --public-tvu-address: {err}"))
236 })
237 .transpose()?;
238
239 if bind_addresses.len() > 1 && public_tvu_addr.is_some() {
240 Err(String::from(
241 "--public-tvu-address can not be used in a multihoming context",
242 ))?;
243 }
244
245 let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
246
247 let node_config = NodeConfig {
248 advertised_ip,
249 gossip_port,
250 port_range: dynamic_port_range,
251 bind_ip_addrs: bind_addresses.clone(),
252 public_tpu_addr,
253 public_tpu_forwards_addr,
254 public_tvu_addr,
255 num_tvu_receive_sockets: tvu_receive_threads,
256 num_tvu_retransmit_sockets: tvu_retransmit_threads,
257 num_quic_endpoints,
258 };
259
260 let mut node = Node::new_with_external_ip(&identity_keypair.pubkey(), node_config);
261
262 let exit = Arc::new(AtomicBool::new(false));
263
264 #[cfg(target_os = "linux")]
265 let maybe_xdp_retransmit_builder = {
266 use {
267 agave_xdp::xdp_retransmitter::{XdpRetransmitBuilder, master_ip_if_bonded},
268 caps::{
269 CapSet,
270 Capability::{CAP_BPF, CAP_NET_ADMIN, CAP_NET_RAW, CAP_PERFMON, CAP_SYS_NICE},
271 },
272 };
273
274 let mut required_caps = HashSet::new();
275 let mut retained_caps = HashSet::new();
276 let supported_caps = HashSet::from_iter([
277 CAP_BPF,
278 CAP_NET_ADMIN,
279 CAP_NET_RAW,
280 CAP_PERFMON,
281 CAP_SYS_NICE,
282 ]);
283
284 if let Some(xdp_config) = retransmit_xdp.as_ref() {
285 required_caps.insert(CAP_NET_ADMIN);
286 required_caps.insert(CAP_NET_RAW);
287 if xdp_config.zero_copy {
288 required_caps.insert(CAP_BPF);
289 required_caps.insert(CAP_PERFMON);
290 }
291 }
292
293 let snapshot_packager_niceness_adj =
294 value_t_or_exit!(matches, "snapshot_packager_niceness_adj", i8);
295
296 if snapshot_packager_niceness_adj != 0 || run_args.json_rpc_config.rpc_niceness_adj != 0 {
297 required_caps.insert(CAP_SYS_NICE);
298 retained_caps.insert(CAP_SYS_NICE);
299 }
300
301 assert!(
303 required_caps.is_subset(&supported_caps),
304 "required_caps contains a cap not in supported_caps",
305 );
306
307 let current_permitted =
309 caps::read(None, CapSet::Permitted).expect("permitted capset to be readable");
310 let missing_caps = required_caps
311 .difference(¤t_permitted)
312 .collect::<Vec<_>>();
313 if !missing_caps.is_empty() {
314 error!(
315 "the current configuration requires the following capabilities, which have not \
316 been permitted to the current process: {missing_caps:?}",
317 );
318 std::process::exit(1);
319 }
320 let extra_caps = current_permitted
322 .difference(&supported_caps)
323 .collect::<Vec<_>>();
324 if !extra_caps.is_empty() {
325 warn!(
326 "dropping extraneous capabilities ({extra_caps:?}) from the current process. \
327 consider removing them from your operational configuration.",
328 );
329 }
330 caps::set(None, CapSet::Permitted, &required_caps)
332 .expect("permitted capset to be writable");
333
334 let maybe_xdp_retransmit_builder = retransmit_xdp.clone().map(|xdp_config| {
338 let src_port = node.sockets.retransmit_sockets[0]
339 .local_addr()
340 .expect("failed to get local address")
341 .port();
342 let src_ip = match node.bind_ip_addrs.active() {
343 IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
344 IpAddr::V4(_unspecified) => xdp_config
345 .interface
346 .as_ref()
347 .and_then(|iface| master_ip_if_bonded(iface)),
348 _ => panic!("IPv6 not supported"),
349 };
350 XdpRetransmitBuilder::new(xdp_config, src_port, src_ip, exit.clone())
351 .expect("failed to create xdp retransmitter")
352 });
353
354 caps::set(None, CapSet::Permitted, &retained_caps)
356 .expect("linux allows permitted capset to be set");
357
358 maybe_xdp_retransmit_builder
359 };
360
361 #[cfg(not(target_os = "linux"))]
362 let maybe_xdp_retransmit_builder = None;
363
364 let reserved = retransmit_xdp
365 .map(|xdp| xdp.cpus.clone())
366 .unwrap_or_default()
367 .iter()
368 .cloned()
369 .collect::<HashSet<_>>();
370 if !reserved.is_empty() {
371 let available = core_affinity::get_core_ids()
372 .unwrap_or_default()
373 .into_iter()
374 .map(|core_id| core_id.id)
375 .collect::<HashSet<_>>();
376 let available = available.difference(&reserved);
377 set_cpu_affinity(available.into_iter().copied()).unwrap();
378 }
379
380 solana_core::validator::report_target_features();
381
382 let authorized_voter_keypairs = keypairs_of(matches, "authorized_voter_keypairs")
383 .map(|keypairs| keypairs.into_iter().map(Arc::new).collect())
384 .unwrap_or_else(|| vec![Arc::new(keypair_of(matches, "identity").expect("identity"))]);
385 let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));
386
387 let staked_nodes_overrides_path = matches
388 .value_of("staked_nodes_overrides")
389 .map(str::to_string);
390 let staked_nodes_overrides = Arc::new(RwLock::new(
391 match &staked_nodes_overrides_path {
392 None => StakedNodesOverrides::default(),
393 Some(p) => load_staked_nodes_overrides(p).unwrap_or_else(|err| {
394 error!("Failed to load stake-nodes-overrides from {p}: {err}");
395 clap::Error::with_description(
396 "Failed to load configuration of stake-nodes-overrides argument",
397 clap::ErrorKind::InvalidValue,
398 )
399 .exit()
400 }),
401 }
402 .staked_map_id,
403 ));
404
405 let init_complete_file = matches.value_of("init_complete_file");
406
407 let private_rpc = matches.is_present("private_rpc");
408 let do_port_check = !matches.is_present("no_port_check");
409
410 let ledger_path = run_args.ledger_path;
411
412 let max_ledger_shreds = if matches.is_present("limit_ledger_size") {
413 let limit_ledger_size = match matches.value_of("limit_ledger_size") {
414 Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
415 None => DEFAULT_MAX_LEDGER_SHREDS,
416 };
417 if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
418 Err(format!(
419 "The provided --limit-ledger-size value was too small, the minimum value is \
420 {DEFAULT_MIN_MAX_LEDGER_SHREDS}"
421 ))?;
422 }
423 Some(limit_ledger_size)
424 } else {
425 None
426 };
427
428 let debug_keys: Option<Arc<HashSet<_>>> = if matches.is_present("debug_key") {
429 Some(Arc::new(
430 values_t_or_exit!(matches, "debug_key", Pubkey)
431 .into_iter()
432 .collect(),
433 ))
434 } else {
435 None
436 };
437
438 let repair_validators = validators_set(
439 &identity_keypair.pubkey(),
440 matches,
441 "repair_validators",
442 "--repair-validator",
443 )?;
444 let repair_whitelist = validators_set(
445 &identity_keypair.pubkey(),
446 matches,
447 "repair_whitelist",
448 "--repair-whitelist",
449 )?;
450 let repair_whitelist = Arc::new(RwLock::new(repair_whitelist.unwrap_or_default()));
451 let gossip_validators = validators_set(
452 &identity_keypair.pubkey(),
453 matches,
454 "gossip_validators",
455 "--gossip-validator",
456 )?;
457
458 if bind_addresses.len() > 1 {
459 for (flag, msg) in [
460 (
461 "advertised_ip",
462 "--advertised-ip cannot be used in a multihoming context. In multihoming, the \
463 validator will advertise the first --bind-address as this node's public IP \
464 address.",
465 ),
466 (
467 "public_tpu_addr",
468 "--public-tpu-address can not be used in a multihoming context",
469 ),
470 ] {
471 if matches.is_present(flag) {
472 Err(String::from(msg))?;
473 }
474 }
475 }
476
477 let rpc_bind_address = if matches.is_present("rpc_bind_address") {
478 solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap())
479 .expect("invalid rpc_bind_address")
480 } else if private_rpc {
481 solana_net_utils::parse_host("127.0.0.1").unwrap()
482 } else {
483 bind_addresses.active()
484 };
485
486 let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64);
487
488 let account_indexes = AccountSecondaryIndexes::from_clap_arg_match(matches)?;
489
490 let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
491 let accounts_shrink_optimize_total_space =
492 value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
493 let vote_use_quic = value_t_or_exit!(matches, "vote_use_quic", bool);
494
495 let tpu_connection_pool_size = matches
496 .value_of("tpu_connection_pool_size")
497 .unwrap_or("")
498 .parse()
499 .unwrap_or(DEFAULT_TPU_CONNECTION_POOL_SIZE);
500
501 let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
502 if !(0.0..=1.0).contains(&shrink_ratio) {
503 Err(format!(
504 "the specified account-shrink-ratio is invalid, it must be between 0. and 1.0 \
505 inclusive: {shrink_ratio}"
506 ))?;
507 }
508
509 let shrink_ratio = if accounts_shrink_optimize_total_space {
510 AccountShrinkThreshold::TotalSpace { shrink_ratio }
511 } else {
512 AccountShrinkThreshold::IndividualStore { shrink_ratio }
513 };
514 let expected_shred_version = value_t!(matches, "expected_shred_version", u16)
519 .ok()
520 .or_else(|| get_cluster_shred_version(&entrypoint_addrs, bind_addresses.active()));
521
522 let tower_path = value_t!(matches, "tower", PathBuf)
523 .ok()
524 .unwrap_or_else(|| ledger_path.clone());
525 let tower_storage: Arc<dyn tower_storage::TowerStorage> =
526 Arc::new(tower_storage::FileTowerStorage::new(tower_path));
527
528 let vote_history_storage: Arc<dyn vote_history_storage::VoteHistoryStorage> = Arc::new(
529 vote_history_storage::FileVoteHistoryStorage::new(ledger_path.clone()),
530 );
531
532 let accounts_index_limit =
533 value_t!(matches, "accounts_index_limit", String).unwrap_or_else(|err| err.exit());
534 let index_limit = {
535 enum CliIndexLimit {
536 Minimal,
537 Unlimited,
538 Threshold(u64),
539 }
540 let cli_index_limit = match accounts_index_limit.as_str() {
541 "minimal" => CliIndexLimit::Minimal,
542 "unlimited" => CliIndexLimit::Unlimited,
543 "25GB" => CliIndexLimit::Threshold(25_000_000_000),
544 "50GB" => CliIndexLimit::Threshold(50_000_000_000),
545 "100GB" => CliIndexLimit::Threshold(100_000_000_000),
546 "200GB" => CliIndexLimit::Threshold(200_000_000_000),
547 "400GB" => CliIndexLimit::Threshold(400_000_000_000),
548 "800GB" => CliIndexLimit::Threshold(800_000_000_000),
549 x => {
550 unreachable!("invalid value given to `--accounts-index-limit`: '{x}'")
552 }
553 };
554 match cli_index_limit {
555 CliIndexLimit::Minimal => IndexLimit::Minimal,
556 CliIndexLimit::Unlimited => IndexLimit::InMemOnly,
557 CliIndexLimit::Threshold(num_bytes) => IndexLimit::Threshold(IndexLimitThreshold {
558 num_bytes,
559 num_entries_overhead: DEFAULT_NUM_ENTRIES_OVERHEAD,
560 num_entries_to_evict: DEFAULT_NUM_ENTRIES_TO_EVICT,
561 }),
562 }
563 };
564 let index_limit = if matches.is_present("enable_accounts_disk_index") {
566 IndexLimit::Minimal
567 } else {
568 index_limit
569 };
570
571 let mut accounts_index_config = AccountsIndexConfig {
572 num_flush_threads: Some(accounts_index_flush_threads),
573 index_limit,
574 ..AccountsIndexConfig::default()
575 };
576 if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
577 accounts_index_config.bins = Some(bins);
578 }
579 if let Ok(num_initial_accounts) =
580 value_t!(matches, "accounts_index_initial_accounts_count", usize)
581 {
582 accounts_index_config.num_initial_accounts = Some(num_initial_accounts);
583 }
584
585 {
586 let mut accounts_index_paths: Vec<PathBuf> = if matches.is_present("accounts_index_path") {
587 values_t_or_exit!(matches, "accounts_index_path", String)
588 .into_iter()
589 .map(PathBuf::from)
590 .collect()
591 } else {
592 vec![]
593 };
594 if accounts_index_paths.is_empty() {
595 accounts_index_paths = vec![ledger_path.join("accounts_index")];
596 }
597 accounts_index_config.drives = Some(accounts_index_paths);
598 }
599
600 const MB: usize = 1_024 * 1_024;
601 accounts_index_config.scan_results_limit_bytes =
602 value_t!(matches, "accounts_index_scan_results_limit_mb", usize)
603 .ok()
604 .map(|mb| mb * MB);
605
606 let account_shrink_paths: Option<Vec<PathBuf>> =
607 values_t!(matches, "account_shrink_path", String)
608 .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
609 .ok();
610 let account_shrink_paths = account_shrink_paths
611 .as_ref()
612 .map(|paths| {
613 create_and_canonicalize_directories(paths)
614 .map_err(|err| format!("unable to access account shrink path: {err}"))
615 })
616 .transpose()?;
617
618 let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
619 .map(|paths| {
620 create_all_accounts_run_and_snapshot_dirs(&paths)
621 .map_err(|err| format!("unable to create account subdirectories: {err}"))
622 })
623 .transpose()?
624 .unzip();
625
626 let read_cache_limit_bytes =
627 values_of::<usize>(matches, "accounts_db_read_cache_limit").map(|limits| {
628 match limits.len() {
629 2 => (limits[0], limits[1]),
630 _ => {
631 unreachable!("invalid number of values given to accounts-db-read-cache-limit")
633 }
634 }
635 });
636
637 let storage_access = matches
638 .value_of("accounts_db_access_storages_method")
639 .map(|method| match method {
640 "mmap" => {
641 warn!("Using `mmap` for `--accounts-db-access-storages-method` is now deprecated.");
642 #[allow(deprecated)]
643 StorageAccess::Mmap
644 }
645 "file" => StorageAccess::File,
646 _ => {
647 unreachable!("invalid value given to accounts-db-access-storages-method")
649 }
650 })
651 .unwrap_or_default();
652
653 let scan_filter_for_shrinking = matches
654 .value_of("accounts_db_scan_filter_for_shrinking")
655 .map(|filter| match filter {
656 "all" => ScanFilter::All,
657 "only-abnormal" => ScanFilter::OnlyAbnormal,
658 "only-abnormal-with-verify" => ScanFilter::OnlyAbnormalWithVerify,
659 _ => {
660 unreachable!("invalid value given to accounts_db_scan_filter_for_shrinking")
662 }
663 })
664 .unwrap_or_default();
665
666 let mark_obsolete_accounts = matches
667 .value_of("accounts_db_mark_obsolete_accounts")
668 .map(|mark_obsolete_accounts| {
669 match mark_obsolete_accounts {
670 "enabled" => MarkObsoleteAccounts::Enabled,
671 "disabled" => MarkObsoleteAccounts::Disabled,
672 _ => {
673 unreachable!("invalid value given to accounts_db_mark_obsolete_accounts")
675 }
676 }
677 })
678 .unwrap_or_default();
679
680 let accounts_db_config = AccountsDbConfig {
681 index: Some(accounts_index_config),
682 account_indexes: Some(account_indexes.clone()),
683 bank_hash_details_dir: ledger_path.clone(),
684 shrink_paths: account_shrink_run_paths,
685 shrink_ratio,
686 read_cache_limit_bytes,
687 read_cache_evict_sample_size: None,
688 write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
689 .ok()
690 .map(|mb| mb * MB as u64),
691 ancient_append_vec_offset: value_t!(matches, "accounts_db_ancient_append_vecs", i64).ok(),
692 ancient_storage_ideal_size: value_t!(
693 matches,
694 "accounts_db_ancient_storage_ideal_size",
695 u64
696 )
697 .ok(),
698 max_ancient_storages: value_t!(matches, "accounts_db_max_ancient_storages", usize).ok(),
699 skip_initial_hash_calc: false,
700 exhaustively_verify_refcounts: matches.is_present("accounts_db_verify_refcounts"),
701 partitioned_epoch_rewards_config: PartitionedEpochRewardsConfig::default(),
702 storage_access,
703 scan_filter_for_shrinking,
704 num_background_threads: Some(accounts_db_background_threads),
705 num_foreground_threads: Some(accounts_db_foreground_threads),
706 mark_obsolete_accounts,
707 use_registered_io_uring_buffers: resource_limits::check_memlock_limit_for_disk_io(
708 solana_accounts_db::accounts_db::TOTAL_IO_URING_BUFFERS_SIZE_LIMIT,
709 ),
710 snapshots_use_direct_io: !matches.is_present("no_accounts_db_snapshots_direct_io"),
711 };
712
713 let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
714 Some(
715 values_t_or_exit!(matches, "geyser_plugin_config", String)
716 .into_iter()
717 .map(PathBuf::from)
718 .collect(),
719 )
720 } else {
721 None
722 };
723 let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some()
724 || matches.is_present("geyser_plugin_always_enabled");
725
726 let account_paths: Vec<PathBuf> =
727 if let Ok(account_paths) = values_t!(matches, "account_paths", String) {
728 account_paths
729 .join(",")
730 .split(',')
731 .map(PathBuf::from)
732 .collect()
733 } else {
734 vec![ledger_path.join("accounts")]
735 };
736 let account_paths = create_and_canonicalize_directories(account_paths)
737 .map_err(|err| format!("unable to access account path: {err}"))?;
738
739 let (account_run_paths, account_snapshot_paths) =
741 create_all_accounts_run_and_snapshot_dirs(&account_paths)
742 .map_err(|err| format!("unable to create account directories: {err}"))?;
743
744 let account_snapshot_paths =
746 if let Some(account_shrink_snapshot_paths) = account_shrink_snapshot_paths {
747 account_snapshot_paths
748 .into_iter()
749 .chain(account_shrink_snapshot_paths)
750 .collect()
751 } else {
752 account_snapshot_paths
753 };
754
755 let snapshot_config = new_snapshot_config(
756 matches,
757 &ledger_path,
758 &account_paths,
759 run_args.rpc_bootstrap_config.incremental_snapshot_fetch,
760 )?;
761
762 let use_snapshot_archives_at_startup = value_t_or_exit!(
763 matches,
764 use_snapshot_archives_at_startup::cli::NAME,
765 UseSnapshotArchivesAtStartup
766 );
767
768 let mut validator_config = ValidatorConfig {
769 logfile,
770 require_tower: matches.is_present("require_tower"),
771 tower_storage,
772 vote_history_storage,
773 max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
774 expected_genesis_hash: matches
775 .value_of("expected_genesis_hash")
776 .map(|s| Hash::from_str(s).unwrap()),
777 fixed_leader_schedule: None,
778 expected_bank_hash: matches
779 .value_of("expected_bank_hash")
780 .map(|s| Hash::from_str(s).unwrap()),
781 expected_shred_version,
782 new_hard_forks: hardforks_of(matches, "hard_forks"),
783 rpc_config: run_args.json_rpc_config,
784 on_start_geyser_plugin_config_files,
785 geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
786 rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
787 (
788 SocketAddr::new(rpc_bind_address, rpc_port),
789 SocketAddr::new(rpc_bind_address, rpc_port + 1),
790 )
794 }),
795 pubsub_config: run_args.pub_sub_config,
796 voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
797 wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
798 known_validators: run_args.known_validators,
799 repair_validators,
800 repair_whitelist,
801 repair_handler_type: RepairHandlerType::default(),
802 gossip_validators,
803 max_ledger_shreds,
804 blockstore_options: run_args.blockstore_options,
805 run_verification: !matches.is_present("skip_startup_ledger_verification"),
806 debug_keys,
807 warp_slot: None,
808 generator_config: None,
809 contact_debug_interval,
810 contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
811 send_transaction_service_config: run_args.send_transaction_service_config,
812 no_poh_speed_test: matches.is_present("no_poh_speed_test"),
813 no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
814 no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
815 no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
816 no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
817 enforce_ulimit_nofile: true,
820 poh_pinned_cpu_core: value_of(matches, "poh_pinned_cpu_core")
821 .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
822 poh_hashes_per_batch: value_of(matches, "poh_hashes_per_batch")
823 .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH),
824 process_ledger_before_services: matches.is_present("process_ledger_before_services"),
825 account_paths: account_run_paths,
826 account_snapshot_paths,
827 accounts_db_config,
828 accounts_db_skip_shrink: true,
829 accounts_db_force_initial_clean: matches.is_present("no_skip_initial_accounts_db_clean"),
830 snapshot_config,
831 no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"),
832 wait_to_vote_slot: None,
833 runtime_config: RuntimeConfig {
834 log_messages_bytes_limit: value_of(matches, "log_messages_bytes_limit"),
835 ..RuntimeConfig::default()
836 },
837 staked_nodes_overrides: staked_nodes_overrides.clone(),
838 use_snapshot_archives_at_startup,
839 ip_echo_server_threads,
840 rayon_global_threads,
841 replay_forks_threads,
842 replay_transactions_threads,
843 tvu_shred_sigverify_threads: tvu_sigverify_threads,
844 delay_leader_block_for_pending_fork: matches
845 .is_present("delay_leader_block_for_pending_fork"),
846 turbine_disabled: Arc::<AtomicBool>::default(),
847 broadcast_stage_type: BroadcastStageType::Standard,
848 block_verification_method: value_t_or_exit!(
849 matches,
850 "block_verification_method",
851 BlockVerificationMethod
852 ),
853 unified_scheduler_handler_threads: value_t!(
854 matches,
855 "unified_scheduler_handler_threads",
856 usize
857 )
858 .ok(),
859 block_production_method: value_t_or_exit!(
860 matches,
861 "block_production_method",
862 BlockProductionMethod
863 ),
864 block_production_num_workers,
865 block_production_scheduler_config: SchedulerConfig {
866 scheduler_pacing: value_t_or_exit!(
867 matches,
868 "block_production_pacing_fill_time_millis",
869 SchedulerPacing
870 ),
871 },
872 enable_block_production_forwarding: staked_nodes_overrides_path.is_some(),
873 enable_scheduler_bindings: matches.is_present("enable_scheduler_bindings"),
874 banking_trace_dir_byte_limit: parse_banking_trace_dir_byte_limit(matches),
875 validator_exit: Arc::new(RwLock::new(Exit::default())),
876 validator_exit_backpressure: [(
877 SnapshotPackagerService::NAME.to_string(),
878 Arc::new(AtomicBool::new(false)),
879 )]
880 .into(),
881 voting_service_test_override: None,
882 snapshot_packager_niceness_adj: value_t_or_exit!(
883 matches,
884 "snapshot_packager_niceness_adj",
885 i8
886 ),
887 };
888
889 let vote_account = pubkey_of(matches, "vote_account").unwrap_or_else(|| {
890 if !validator_config.voting_disabled {
891 warn!("--vote-account not specified, validator will not vote");
892 validator_config.voting_disabled = true;
893 }
894 Keypair::new().pubkey()
895 });
896
897 let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64);
898 let minimal_snapshot_download_speed =
899 value_t_or_exit!(matches, "minimal_snapshot_download_speed", f32);
900 let maximum_snapshot_download_abort =
901 value_t_or_exit!(matches, "maximum_snapshot_download_abort", u64);
902
903 let public_rpc_addr = matches
904 .value_of("public_rpc_addr")
905 .map(|addr| {
906 solana_net_utils::parse_host_port(addr)
907 .map_err(|err| format!("failed to parse public rpc address: {err}"))
908 })
909 .transpose()?;
910
911 if !matches.is_present("no_os_network_limits_test") {
912 if SystemMonitorService::check_os_network_limits() {
913 info!("OS network limits test passed.");
914 } else {
915 Err("OS network limit test failed. See \
916 https://docs.anza.xyz/operations/guides/validator-start#system-tuning"
917 .to_string())?;
918 }
919 }
920
921 let mut ledger_lock = ledger_lockfile(&ledger_path);
922 let _ledger_write_guard = lock_ledger(&ledger_path, &mut ledger_lock);
923
924 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
925 let admin_service_post_init = Arc::new(RwLock::new(None));
926 let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
927 if starting_with_geyser_plugins {
928 let (sender, receiver) = unbounded();
929 (Some(sender), Some(receiver))
930 } else {
931 (None, None)
932 };
933 admin_rpc_service::run(
934 &ledger_path,
935 admin_rpc_service::AdminRpcRequestMetadata {
936 rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
937 start_time: std::time::SystemTime::now(),
938 validator_exit: validator_config.validator_exit.clone(),
939 validator_exit_backpressure: validator_config.validator_exit_backpressure.clone(),
940 start_progress: start_progress.clone(),
941 authorized_voter_keypairs: authorized_voter_keypairs.clone(),
942 post_init: admin_service_post_init.clone(),
943 tower_storage: validator_config.tower_storage.clone(),
944 staked_nodes_overrides,
945 rpc_to_plugin_manager_sender,
946 },
947 );
948
949 let tpu_max_connections_per_peer: Option<u64> = matches
950 .value_of("tpu_max_connections_per_peer")
951 .and_then(|v| v.parse().ok());
952 let tpu_max_connections_per_unstaked_peer = tpu_max_connections_per_peer
953 .unwrap_or_else(|| value_t_or_exit!(matches, "tpu_max_connections_per_unstaked_peer", u64));
954 let tpu_max_connections_per_staked_peer = tpu_max_connections_per_peer
955 .unwrap_or_else(|| value_t_or_exit!(matches, "tpu_max_connections_per_staked_peer", u64));
956 let tpu_max_staked_connections = value_t_or_exit!(matches, "tpu_max_staked_connections", u64);
957 let tpu_max_unstaked_connections =
958 value_t_or_exit!(matches, "tpu_max_unstaked_connections", u64);
959
960 let tpu_max_fwd_staked_connections =
961 value_t_or_exit!(matches, "tpu_max_fwd_staked_connections", u64);
962 let tpu_max_fwd_unstaked_connections =
963 value_t_or_exit!(matches, "tpu_max_fwd_unstaked_connections", u64);
964
965 let tpu_max_connections_per_ipaddr_per_minute: u64 =
966 value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64);
967 let max_streams_per_ms = value_t_or_exit!(matches, "tpu_max_streams_per_ms", u64);
968
969 let cluster_entrypoints = entrypoint_addrs
970 .iter()
971 .map(ContactInfo::new_gossip_entry_point)
972 .collect::<Vec<_>>();
973
974 if restricted_repair_only_mode {
975 node.info.remove_tpu();
979 node.info.remove_tpu_forwards();
980 node.info.remove_tvu();
981 node.info.remove_serve_repair();
982 node.info.remove_alpenglow();
983
984 node.sockets.ip_echo = None;
986 }
987
988 if !private_rpc {
989 macro_rules! set_socket {
990 ($method:ident, $addr:expr, $name:literal) => {
991 node.info.$method($addr).expect(&format!(
992 "Operator must spin up node with valid {} address",
993 $name
994 ))
995 };
996 }
997 if let Some(public_rpc_addr) = public_rpc_addr {
998 set_socket!(set_rpc, public_rpc_addr, "RPC");
999 set_socket!(set_rpc_pubsub, public_rpc_addr, "RPC-pubsub");
1000 } else if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
1001 let addr = node
1002 .info
1003 .gossip()
1004 .expect("Operator must spin up node with valid gossip address")
1005 .ip();
1006 set_socket!(set_rpc, (addr, rpc_addr.port()), "RPC");
1007 set_socket!(set_rpc_pubsub, (addr, rpc_pubsub_addr.port()), "RPC-pubsub");
1008 }
1009 }
1010
1011 snapshot_utils::remove_tmp_snapshot_archives(
1012 &validator_config.snapshot_config.full_snapshot_archives_dir,
1013 );
1014 snapshot_utils::remove_tmp_snapshot_archives(
1015 &validator_config
1016 .snapshot_config
1017 .incremental_snapshot_archives_dir,
1018 );
1019
1020 let should_check_duplicate_instance = true;
1021 if !cluster_entrypoints.is_empty() {
1022 bootstrap::rpc_bootstrap(
1023 &node,
1024 &identity_keypair,
1025 &ledger_path,
1026 &vote_account,
1027 authorized_voter_keypairs.clone(),
1028 &cluster_entrypoints,
1029 &mut validator_config,
1030 run_args.rpc_bootstrap_config,
1031 do_port_check,
1032 use_progress_bar,
1033 maximum_local_snapshot_age,
1034 should_check_duplicate_instance,
1035 &start_progress,
1036 minimal_snapshot_download_speed,
1037 maximum_snapshot_download_abort,
1038 run_args.socket_addr_space,
1039 );
1040 *start_progress.write().unwrap() = ValidatorStartProgress::Initializing;
1041 }
1042
1043 if operation == Operation::Initialize {
1044 info!("Validator ledger initialization complete");
1045 return Ok(());
1046 }
1047
1048 node.info.hot_swap_pubkey(identity_keypair.pubkey());
1054
1055 let tpu_quic_server_config = SwQosQuicStreamerConfig {
1056 quic_streamer_config: QuicStreamerConfig {
1057 max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1058 num_threads: tpu_transaction_receive_threads,
1059 ..Default::default()
1060 },
1061 qos_config: SwQosConfig {
1062 max_connections_per_unstaked_peer: tpu_max_connections_per_unstaked_peer
1063 .try_into()
1064 .unwrap(),
1065 max_connections_per_staked_peer: tpu_max_connections_per_staked_peer
1066 .try_into()
1067 .unwrap(),
1068 max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
1069 max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
1070 max_streams_per_ms,
1071 },
1072 };
1073
1074 let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
1075 quic_streamer_config: QuicStreamerConfig {
1076 max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1077 num_threads: tpu_transaction_forward_receive_threads,
1078 ..Default::default()
1079 },
1080 qos_config: SwQosConfig {
1081 max_connections_per_staked_peer: tpu_max_connections_per_staked_peer
1082 .try_into()
1083 .unwrap(),
1084 max_connections_per_unstaked_peer: tpu_max_connections_per_unstaked_peer
1085 .try_into()
1086 .unwrap(),
1087 max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
1088 max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
1089 max_streams_per_ms,
1090 },
1091 };
1092
1093 let vote_quic_server_config = SimpleQosQuicStreamerConfig {
1094 quic_streamer_config: QuicStreamerConfig {
1095 max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1096 num_threads: tpu_vote_transaction_receive_threads,
1097 ..Default::default()
1098 },
1099 qos_config: SimpleQosConfig {
1100 max_streams_per_second: MAX_VOTES_PER_SECOND,
1101 ..Default::default()
1102 },
1103 };
1104
1105 let validator = Validator::new_with_exit(
1106 node,
1107 identity_keypair,
1108 &ledger_path,
1109 &vote_account,
1110 authorized_voter_keypairs,
1111 cluster_entrypoints,
1112 &validator_config,
1113 should_check_duplicate_instance,
1114 rpc_to_plugin_manager_receiver,
1115 start_progress,
1116 run_args.socket_addr_space,
1117 ValidatorTpuConfig {
1118 vote_use_quic,
1119 tpu_connection_pool_size,
1120 tpu_quic_server_config,
1121 tpu_fwd_quic_server_config,
1122 vote_quic_server_config,
1123 sigverify_threads: tpu_sigverify_threads,
1124 },
1125 admin_service_post_init,
1126 maybe_xdp_retransmit_builder,
1127 exit,
1128 )
1129 .map_err(|err| format!("{err:?}"))?;
1130
1131 if let Some(filename) = init_complete_file {
1132 File::create(filename).map_err(|err| format!("unable to create {filename}: {err}"))?;
1133 }
1134 info!("Validator initialized");
1135 validator.listen_for_signals()?;
1136 validator.join();
1137 info!("Validator exiting..");
1138
1139 Ok(())
1140}
1141
1142fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
1144 if matches.is_present(name) {
1145 Some(values_t_or_exit!(matches, name, Slot))
1146 } else {
1147 None
1148 }
1149}
1150
1151fn validators_set(
1152 identity_pubkey: &Pubkey,
1153 matches: &ArgMatches<'_>,
1154 matches_name: &str,
1155 arg_name: &str,
1156) -> Result<Option<HashSet<Pubkey>>, String> {
1157 if matches.is_present(matches_name) {
1158 let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
1159 .into_iter()
1160 .collect();
1161 if validators_set.contains(identity_pubkey) {
1162 Err(format!(
1163 "the validator's identity pubkey cannot be a {arg_name}: {identity_pubkey}"
1164 ))?;
1165 }
1166 Ok(Some(validators_set))
1167 } else {
1168 Ok(None)
1169 }
1170}
1171
1172fn get_cluster_shred_version(entrypoints: &[SocketAddr], bind_address: IpAddr) -> Option<u16> {
1173 let entrypoints = {
1174 let mut index: Vec<_> = (0..entrypoints.len()).collect();
1175 index.shuffle(&mut rand::rng());
1176 index.into_iter().map(|i| &entrypoints[i])
1177 };
1178 for entrypoint in entrypoints {
1179 match solana_net_utils::get_cluster_shred_version_with_binding(entrypoint, bind_address) {
1180 Err(err) => eprintln!("get_cluster_shred_version failed: {entrypoint}, {err}"),
1181 Ok(0) => eprintln!("entrypoint {entrypoint} returned shred-version zero"),
1182 Ok(shred_version) => {
1183 info!("obtained shred-version {shred_version} from {entrypoint}");
1184 return Some(shred_version);
1185 }
1186 }
1187 }
1188 None
1189}
1190
1191fn parse_banking_trace_dir_byte_limit(matches: &ArgMatches) -> u64 {
1192 if matches.is_present("disable_banking_trace") {
1193 DISABLED_BAKING_TRACE_DIR
1197 } else {
1198 value_t_or_exit!(matches, "banking_trace_dir_byte_limit", u64)
1201 }
1202}
1203
1204fn new_snapshot_config(
1205 matches: &ArgMatches,
1206 ledger_path: &Path,
1207 account_paths: &[PathBuf],
1208 incremental_snapshot_fetch: bool,
1209) -> Result<SnapshotConfig, Box<dyn std::error::Error>> {
1210 let (full_snapshot_archive_interval, incremental_snapshot_archive_interval) =
1211 if matches.is_present("no_snapshots") {
1212 (SnapshotInterval::Disabled, SnapshotInterval::Disabled)
1214 } else {
1215 match (
1216 incremental_snapshot_fetch,
1217 value_t_or_exit!(matches, "snapshot_interval_slots", NonZeroU64),
1218 ) {
1219 (true, incremental_snapshot_interval_slots) => {
1220 let full_snapshot_interval_slots =
1223 value_t_or_exit!(matches, "full_snapshot_interval_slots", NonZeroU64);
1224 (
1225 SnapshotInterval::Slots(full_snapshot_interval_slots),
1226 SnapshotInterval::Slots(incremental_snapshot_interval_slots),
1227 )
1228 }
1229 (false, full_snapshot_interval_slots) => {
1230 if matches.occurrences_of("full_snapshot_interval_slots") > 0 {
1234 warn!(
1235 "Incremental snapshots are disabled, yet \
1236 --full-snapshot-interval-slots was specified! Note that \
1237 --full-snapshot-interval-slots is *ignored* when incremental \
1238 snapshots are disabled. Use --snapshot-interval-slots instead.",
1239 );
1240 }
1241 (
1242 SnapshotInterval::Slots(full_snapshot_interval_slots),
1243 SnapshotInterval::Disabled,
1244 )
1245 }
1246 }
1247 };
1248
1249 info!(
1250 "Snapshot configuration: full snapshot interval: {}, incremental snapshot interval: {}",
1251 match full_snapshot_archive_interval {
1252 SnapshotInterval::Disabled => "disabled".to_string(),
1253 SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1254 },
1255 match incremental_snapshot_archive_interval {
1256 SnapshotInterval::Disabled => "disabled".to_string(),
1257 SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1258 },
1259 );
1260 if let SnapshotInterval::Slots(full_snapshot_interval_slots) = full_snapshot_archive_interval {
1263 let full_snapshot_interval_slots = full_snapshot_interval_slots.get();
1264 if full_snapshot_interval_slots > DEFAULT_SLOTS_PER_EPOCH {
1265 warn!(
1266 "The full snapshot interval is excessively large: {full_snapshot_interval_slots}! \
1267 This will negatively impact the background cleanup tasks in accounts-db. \
1268 Consider a smaller value.",
1269 );
1270 }
1271 }
1272
1273 let snapshots_dir = matches
1274 .value_of("snapshots")
1275 .map(Path::new)
1276 .unwrap_or(ledger_path);
1277 let snapshots_dir = create_and_canonicalize_directory(snapshots_dir).map_err(|err| {
1278 format!(
1279 "failed to create snapshots directory '{}': {err}",
1280 snapshots_dir.display(),
1281 )
1282 })?;
1283 if account_paths
1284 .iter()
1285 .any(|account_path| account_path == &snapshots_dir)
1286 {
1287 Err(
1288 "the --accounts and --snapshots paths must be unique since they both create \
1289 'snapshots' subdirectories, otherwise there may be collisions"
1290 .to_string(),
1291 )?;
1292 }
1293
1294 let bank_snapshots_dir = snapshots_dir.join(BANK_SNAPSHOTS_DIR);
1295 fs::create_dir_all(&bank_snapshots_dir).map_err(|err| {
1296 format!(
1297 "failed to create bank snapshots directory '{}': {err}",
1298 bank_snapshots_dir.display(),
1299 )
1300 })?;
1301
1302 let full_snapshot_archives_dir = matches
1303 .value_of("full_snapshot_archive_path")
1304 .map(PathBuf::from)
1305 .unwrap_or_else(|| snapshots_dir.clone());
1306 fs::create_dir_all(&full_snapshot_archives_dir).map_err(|err| {
1307 format!(
1308 "failed to create full snapshot archives directory '{}': {err}",
1309 full_snapshot_archives_dir.display(),
1310 )
1311 })?;
1312
1313 let incremental_snapshot_archives_dir = matches
1314 .value_of("incremental_snapshot_archive_path")
1315 .map(PathBuf::from)
1316 .unwrap_or_else(|| snapshots_dir.clone());
1317 fs::create_dir_all(&incremental_snapshot_archives_dir).map_err(|err| {
1318 format!(
1319 "failed to create incremental snapshot archives directory '{}': {err}",
1320 incremental_snapshot_archives_dir.display(),
1321 )
1322 })?;
1323
1324 let archive_format = {
1325 let archive_format_str = value_t_or_exit!(matches, "snapshot_archive_format", String);
1326 let mut archive_format = ArchiveFormat::from_cli_arg(&archive_format_str)
1327 .unwrap_or_else(|| panic!("Archive format not recognized: {archive_format_str}"));
1328 if let ArchiveFormat::TarZstd { config } = &mut archive_format {
1329 config.compression_level =
1330 value_t_or_exit!(matches, "snapshot_zstd_compression_level", i32);
1331 }
1332 archive_format
1333 };
1334
1335 let snapshot_version = matches
1336 .value_of("snapshot_version")
1337 .map(|value| {
1338 value
1339 .parse::<SnapshotVersion>()
1340 .map_err(|err| format!("unable to parse snapshot version: {err}"))
1341 })
1342 .transpose()?
1343 .unwrap_or(SnapshotVersion::default());
1344
1345 let maximum_full_snapshot_archives_to_retain =
1346 value_t_or_exit!(matches, "maximum_full_snapshots_to_retain", NonZeroUsize);
1347 let maximum_incremental_snapshot_archives_to_retain = value_t_or_exit!(
1348 matches,
1349 "maximum_incremental_snapshots_to_retain",
1350 NonZeroUsize
1351 );
1352
1353 let snapshot_config = SnapshotConfig {
1354 usage: if full_snapshot_archive_interval == SnapshotInterval::Disabled {
1355 SnapshotUsage::LoadOnly
1356 } else {
1357 SnapshotUsage::LoadAndGenerate
1358 },
1359 full_snapshot_archive_interval,
1360 incremental_snapshot_archive_interval,
1361 bank_snapshots_dir,
1362 full_snapshot_archives_dir,
1363 incremental_snapshot_archives_dir,
1364 archive_format,
1365 snapshot_version,
1366 maximum_full_snapshot_archives_to_retain,
1367 maximum_incremental_snapshot_archives_to_retain,
1368 };
1369
1370 if !is_snapshot_config_valid(&snapshot_config) {
1371 Err(
1372 "invalid snapshot configuration provided: snapshot intervals are incompatible. full \
1373 snapshot interval MUST be larger than incremental snapshot interval (if enabled)"
1374 .to_string(),
1375 )?;
1376 }
1377
1378 Ok(snapshot_config)
1379}