Skip to main content

agave_validator/commands/run/
execute.rs

1use {
2    crate::{
3        admin_rpc_service::{self, StakedNodesOverrides, load_staked_nodes_overrides},
4        bootstrap,
5        cli::{self},
6        commands::{FromClapArgMatches, run::args::RunArgs},
7        ledger_lockfile, lock_ledger,
8    },
9    agave_snapshots::{
10        ArchiveFormat, SnapshotInterval, SnapshotVersion,
11        paths::BANK_SNAPSHOTS_DIR,
12        snapshot_config::{SnapshotConfig, SnapshotUsage},
13    },
14    agave_votor::vote_history_storage,
15    agave_xdp::{set_cpu_affinity, xdp_retransmitter::XdpConfig},
16    clap::{ArgMatches, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit},
17    crossbeam_channel::unbounded,
18    log::*,
19    rand::{rng, seq::SliceRandom},
20    solana_accounts_db::{
21        accounts_db::{AccountShrinkThreshold, AccountsDbConfig, MarkObsoleteAccounts},
22        accounts_file::StorageAccess,
23        accounts_index::{
24            AccountSecondaryIndexes, AccountsIndexConfig, DEFAULT_NUM_ENTRIES_OVERHEAD,
25            DEFAULT_NUM_ENTRIES_TO_EVICT, IndexLimit, IndexLimitThreshold, ScanFilter,
26        },
27        partitioned_rewards::PartitionedEpochRewardsConfig,
28        utils::{
29            create_all_accounts_run_and_snapshot_dirs, create_and_canonicalize_directories,
30            create_and_canonicalize_directory,
31        },
32    },
33    solana_clap_utils::input_parsers::{
34        keypair_of, keypairs_of, parse_cpu_ranges, pubkey_of, value_of, values_of,
35    },
36    solana_clock::{DEFAULT_SLOTS_PER_EPOCH, Slot},
37    solana_core::{
38        banking_stage::transaction_scheduler::scheduler_controller::SchedulerConfig,
39        banking_trace::DISABLED_BAKING_TRACE_DIR,
40        consensus::tower_storage,
41        repair::repair_handler::RepairHandlerType,
42        resource_limits,
43        snapshot_packager_service::SnapshotPackagerService,
44        system_monitor_service::SystemMonitorService,
45        tpu::MAX_VOTES_PER_SECOND,
46        validator::{
47            BlockProductionMethod, BlockVerificationMethod, SchedulerPacing, Validator,
48            ValidatorConfig, ValidatorStartProgress, ValidatorTpuConfig, is_snapshot_config_valid,
49        },
50    },
51    solana_genesis_utils::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
52    solana_gossip::{
53        cluster_info::{DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS, NodeConfig},
54        contact_info::ContactInfo,
55        node::Node,
56    },
57    solana_hash::Hash,
58    solana_keypair::Keypair,
59    solana_ledger::{
60        blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
61        use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
62    },
63    solana_net_utils::multihomed_sockets::BindIpAddrs,
64    solana_poh::poh_service,
65    solana_pubkey::Pubkey,
66    solana_runtime::{runtime_config::RuntimeConfig, snapshot_utils},
67    solana_signer::Signer,
68    solana_streamer::{
69        nonblocking::{simple_qos::SimpleQosConfig, swqos::SwQosConfig},
70        quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig, SwQosQuicStreamerConfig},
71    },
72    solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
73    solana_turbine::broadcast_stage::BroadcastStageType,
74    solana_validator_exit::Exit,
75    std::{
76        collections::HashSet,
77        env,
78        fs::{self, File},
79        net::{IpAddr, Ipv4Addr, SocketAddr},
80        num::{NonZeroU64, NonZeroUsize},
81        path::{Path, PathBuf},
82        str::{self, FromStr},
83        sync::{Arc, RwLock, atomic::AtomicBool},
84    },
85};
86
87#[derive(Debug, PartialEq, Eq)]
88pub enum Operation {
89    Initialize,
90    Run,
91}
92
93pub fn execute(
94    matches: &ArgMatches,
95    solana_version: &str,
96    operation: Operation,
97) -> Result<(), Box<dyn std::error::Error>> {
98    // Debugging panics is easier with a backtrace
99    if env::var_os("RUST_BACKTRACE").is_none() {
100        // Safety: env update is made before any spawned threads might access the environment
101        unsafe { env::set_var("RUST_BACKTRACE", "1") }
102    }
103
104    let run_args = RunArgs::from_clap_arg_match(matches)?;
105
106    let cli::thread_args::NumThreadConfig {
107        accounts_db_background_threads,
108        accounts_db_foreground_threads,
109        accounts_index_flush_threads,
110        block_production_num_workers,
111        ip_echo_server_threads,
112        rayon_global_threads,
113        replay_forks_threads,
114        replay_transactions_threads,
115        tpu_sigverify_threads,
116        tpu_transaction_forward_receive_threads,
117        tpu_transaction_receive_threads,
118        tpu_vote_transaction_receive_threads,
119        tvu_receive_threads,
120        tvu_retransmit_threads,
121        tvu_sigverify_threads,
122    } = cli::thread_args::parse_num_threads_args(matches);
123
124    let identity_keypair = Arc::new(run_args.identity_keypair);
125
126    let logfile = run_args.logfile;
127    if let Some(logfile) = logfile.as_ref() {
128        println!("log file: {}", logfile.display());
129    }
130    let use_progress_bar = logfile.is_none();
131    agave_logger::initialize_logging(logfile.clone());
132    cli::warn_for_deprecated_arguments(matches);
133
134    info!("{} {}", crate_name!(), solana_version);
135    info!("Starting validator with: {:#?}", std::env::args_os());
136
137    solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
138    solana_metrics::set_panic_hook("validator", Some(String::from(solana_version)));
139    solana_entry::entry::init_poh();
140
141    let bind_addresses = {
142        let parsed = matches
143            .values_of("bind_address")
144            .expect("bind_address should always be present due to default")
145            .map(solana_net_utils::parse_host)
146            .collect::<Result<Vec<_>, _>>()?;
147        BindIpAddrs::new(parsed).map_err(|err| format!("invalid bind_addresses: {err}"))?
148    };
149
150    let entrypoint_addrs = run_args.entrypoints;
151    for addr in &entrypoint_addrs {
152        if !run_args.socket_addr_space.check(addr) {
153            Err(format!("invalid entrypoint address: {addr}"))?;
154        }
155    }
156
157    let xdp_interface = matches.value_of("retransmit_xdp_interface");
158    let xdp_zero_copy = matches.is_present("retransmit_xdp_zero_copy");
159    let retransmit_xdp = matches.value_of("retransmit_xdp_cpu_cores").map(|cpus| {
160        XdpConfig::new(
161            xdp_interface,
162            parse_cpu_ranges(cpus).unwrap(),
163            xdp_zero_copy,
164        )
165    });
166
167    let dynamic_port_range =
168        solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
169            .expect("invalid dynamic_port_range");
170
171    let advertised_ip = matches
172        .value_of("advertised_ip")
173        .map(|advertised_ip| {
174            solana_net_utils::parse_host(advertised_ip)
175                .map_err(|err| format!("failed to parse --advertised-ip: {err}"))
176        })
177        .transpose()?;
178
179    let advertised_ip = if let Some(cli_ip) = advertised_ip {
180        cli_ip
181    } else if !bind_addresses.active().is_unspecified() && !bind_addresses.active().is_loopback() {
182        bind_addresses.active()
183    } else if !entrypoint_addrs.is_empty() {
184        let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect();
185        order.shuffle(&mut rng());
186
187        order
188            .into_iter()
189            .find_map(|i| {
190                let entrypoint_addr = &entrypoint_addrs[i];
191                info!(
192                    "Contacting {entrypoint_addr} to determine the validator's public IP address"
193                );
194                solana_net_utils::get_public_ip_addr_with_binding(
195                    entrypoint_addr,
196                    bind_addresses.active(),
197                )
198                .map_or_else(
199                    |err| {
200                        warn!("Failed to contact cluster entrypoint {entrypoint_addr}: {err}");
201                        None
202                    },
203                    Some,
204                )
205            })
206            .ok_or_else(|| "unable to determine the validator's public IP address".to_string())?
207    } else {
208        IpAddr::V4(Ipv4Addr::LOCALHOST)
209    };
210    let gossip_port = value_t!(matches, "gossip_port", u16).or_else(|_| {
211        solana_net_utils::find_available_port_in_range(bind_addresses.active(), (0, 1))
212            .map_err(|err| format!("unable to find an available gossip port: {err}"))
213    })?;
214
215    let public_tpu_addr = matches
216        .value_of("public_tpu_addr")
217        .map(|public_tpu_addr| {
218            solana_net_utils::parse_host_port(public_tpu_addr)
219                .map_err(|err| format!("failed to parse --public-tpu-address: {err}"))
220        })
221        .transpose()?;
222
223    let public_tpu_forwards_addr = matches
224        .value_of("public_tpu_forwards_addr")
225        .map(|public_tpu_forwards_addr| {
226            solana_net_utils::parse_host_port(public_tpu_forwards_addr)
227                .map_err(|err| format!("failed to parse --public-tpu-forwards-address: {err}"))
228        })
229        .transpose()?;
230
231    let public_tvu_addr = matches
232        .value_of("public_tvu_addr")
233        .map(|public_tvu_addr| {
234            solana_net_utils::parse_host_port(public_tvu_addr)
235                .map_err(|err| format!("failed to parse --public-tvu-address: {err}"))
236        })
237        .transpose()?;
238
239    if bind_addresses.len() > 1 && public_tvu_addr.is_some() {
240        Err(String::from(
241            "--public-tvu-address can not be used in a multihoming context",
242        ))?;
243    }
244
245    let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
246
247    let node_config = NodeConfig {
248        advertised_ip,
249        gossip_port,
250        port_range: dynamic_port_range,
251        bind_ip_addrs: bind_addresses.clone(),
252        public_tpu_addr,
253        public_tpu_forwards_addr,
254        public_tvu_addr,
255        num_tvu_receive_sockets: tvu_receive_threads,
256        num_tvu_retransmit_sockets: tvu_retransmit_threads,
257        num_quic_endpoints,
258    };
259
260    let mut node = Node::new_with_external_ip(&identity_keypair.pubkey(), node_config);
261
262    let exit = Arc::new(AtomicBool::new(false));
263
264    #[cfg(target_os = "linux")]
265    let maybe_xdp_retransmit_builder = {
266        use {
267            agave_xdp::xdp_retransmitter::{XdpRetransmitBuilder, master_ip_if_bonded},
268            caps::{
269                CapSet,
270                Capability::{CAP_BPF, CAP_NET_ADMIN, CAP_NET_RAW, CAP_PERFMON, CAP_SYS_NICE},
271            },
272        };
273
274        let mut required_caps = HashSet::new();
275        let mut retained_caps = HashSet::new();
276        let supported_caps = HashSet::from_iter([
277            CAP_BPF,
278            CAP_NET_ADMIN,
279            CAP_NET_RAW,
280            CAP_PERFMON,
281            CAP_SYS_NICE,
282        ]);
283
284        if let Some(xdp_config) = retransmit_xdp.as_ref() {
285            required_caps.insert(CAP_NET_ADMIN);
286            required_caps.insert(CAP_NET_RAW);
287            if xdp_config.zero_copy {
288                required_caps.insert(CAP_BPF);
289                required_caps.insert(CAP_PERFMON);
290            }
291        }
292
293        let snapshot_packager_niceness_adj =
294            value_t_or_exit!(matches, "snapshot_packager_niceness_adj", i8);
295
296        if snapshot_packager_niceness_adj != 0 || run_args.json_rpc_config.rpc_niceness_adj != 0 {
297            required_caps.insert(CAP_SYS_NICE);
298            retained_caps.insert(CAP_SYS_NICE);
299        }
300
301        // lazy dev check
302        assert!(
303            required_caps.is_subset(&supported_caps),
304            "required_caps contains a cap not in supported_caps",
305        );
306
307        // validate and minimize the permitted set
308        let current_permitted =
309            caps::read(None, CapSet::Permitted).expect("permitted capset to be readable");
310        let missing_caps = required_caps
311            .difference(&current_permitted)
312            .collect::<Vec<_>>();
313        if !missing_caps.is_empty() {
314            error!(
315                "the current configuration requires the following capabilities, which have not \
316                 been permitted to the current process: {missing_caps:?}",
317            );
318            std::process::exit(1);
319        }
320        // warn about extraneous caps that no configuration requires
321        let extra_caps = current_permitted
322            .difference(&supported_caps)
323            .collect::<Vec<_>>();
324        if !extra_caps.is_empty() {
325            warn!(
326                "dropping extraneous capabilities ({extra_caps:?}) from the current process. \
327                 consider removing them from your operational configuration.",
328            );
329        }
330        // drop all caps that the current configuration does not require
331        caps::set(None, CapSet::Permitted, &required_caps)
332            .expect("permitted capset to be writable");
333
334        // XDP _MUST_ be setup _BEFORE_ the app spawns any threads to ensure linux
335        // capabilities do not leak, leaving the process in a state where it could
336        // potentially be used as a privilege escalation gadget
337        let maybe_xdp_retransmit_builder = retransmit_xdp.clone().map(|xdp_config| {
338            let src_port = node.sockets.retransmit_sockets[0]
339                .local_addr()
340                .expect("failed to get local address")
341                .port();
342            let src_ip = match node.bind_ip_addrs.active() {
343                IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
344                IpAddr::V4(_unspecified) => xdp_config
345                    .interface
346                    .as_ref()
347                    .and_then(|iface| master_ip_if_bonded(iface)),
348                _ => panic!("IPv6 not supported"),
349            };
350            XdpRetransmitBuilder::new(xdp_config, src_port, src_ip, exit.clone())
351                .expect("failed to create xdp retransmitter")
352        });
353
354        // we're done with caps needed to init xdp now. remove them from our process
355        caps::set(None, CapSet::Permitted, &retained_caps)
356            .expect("linux allows permitted capset to be set");
357
358        maybe_xdp_retransmit_builder
359    };
360
361    #[cfg(not(target_os = "linux"))]
362    let maybe_xdp_retransmit_builder = None;
363
364    let reserved = retransmit_xdp
365        .map(|xdp| xdp.cpus.clone())
366        .unwrap_or_default()
367        .iter()
368        .cloned()
369        .collect::<HashSet<_>>();
370    if !reserved.is_empty() {
371        let available = core_affinity::get_core_ids()
372            .unwrap_or_default()
373            .into_iter()
374            .map(|core_id| core_id.id)
375            .collect::<HashSet<_>>();
376        let available = available.difference(&reserved);
377        set_cpu_affinity(available.into_iter().copied()).unwrap();
378    }
379
380    solana_core::validator::report_target_features();
381
382    let authorized_voter_keypairs = keypairs_of(matches, "authorized_voter_keypairs")
383        .map(|keypairs| keypairs.into_iter().map(Arc::new).collect())
384        .unwrap_or_else(|| vec![Arc::new(keypair_of(matches, "identity").expect("identity"))]);
385    let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));
386
387    let staked_nodes_overrides_path = matches
388        .value_of("staked_nodes_overrides")
389        .map(str::to_string);
390    let staked_nodes_overrides = Arc::new(RwLock::new(
391        match &staked_nodes_overrides_path {
392            None => StakedNodesOverrides::default(),
393            Some(p) => load_staked_nodes_overrides(p).unwrap_or_else(|err| {
394                error!("Failed to load stake-nodes-overrides from {p}: {err}");
395                clap::Error::with_description(
396                    "Failed to load configuration of stake-nodes-overrides argument",
397                    clap::ErrorKind::InvalidValue,
398                )
399                .exit()
400            }),
401        }
402        .staked_map_id,
403    ));
404
405    let init_complete_file = matches.value_of("init_complete_file");
406
407    let private_rpc = matches.is_present("private_rpc");
408    let do_port_check = !matches.is_present("no_port_check");
409
410    let ledger_path = run_args.ledger_path;
411
412    let max_ledger_shreds = if matches.is_present("limit_ledger_size") {
413        let limit_ledger_size = match matches.value_of("limit_ledger_size") {
414            Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
415            None => DEFAULT_MAX_LEDGER_SHREDS,
416        };
417        if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
418            Err(format!(
419                "The provided --limit-ledger-size value was too small, the minimum value is \
420                 {DEFAULT_MIN_MAX_LEDGER_SHREDS}"
421            ))?;
422        }
423        Some(limit_ledger_size)
424    } else {
425        None
426    };
427
428    let debug_keys: Option<Arc<HashSet<_>>> = if matches.is_present("debug_key") {
429        Some(Arc::new(
430            values_t_or_exit!(matches, "debug_key", Pubkey)
431                .into_iter()
432                .collect(),
433        ))
434    } else {
435        None
436    };
437
438    let repair_validators = validators_set(
439        &identity_keypair.pubkey(),
440        matches,
441        "repair_validators",
442        "--repair-validator",
443    )?;
444    let repair_whitelist = validators_set(
445        &identity_keypair.pubkey(),
446        matches,
447        "repair_whitelist",
448        "--repair-whitelist",
449    )?;
450    let repair_whitelist = Arc::new(RwLock::new(repair_whitelist.unwrap_or_default()));
451    let gossip_validators = validators_set(
452        &identity_keypair.pubkey(),
453        matches,
454        "gossip_validators",
455        "--gossip-validator",
456    )?;
457
458    if bind_addresses.len() > 1 {
459        for (flag, msg) in [
460            (
461                "advertised_ip",
462                "--advertised-ip cannot be used in a multihoming context. In multihoming, the \
463                 validator will advertise the first --bind-address as this node's public IP \
464                 address.",
465            ),
466            (
467                "public_tpu_addr",
468                "--public-tpu-address can not be used in a multihoming context",
469            ),
470        ] {
471            if matches.is_present(flag) {
472                Err(String::from(msg))?;
473            }
474        }
475    }
476
477    let rpc_bind_address = if matches.is_present("rpc_bind_address") {
478        solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap())
479            .expect("invalid rpc_bind_address")
480    } else if private_rpc {
481        solana_net_utils::parse_host("127.0.0.1").unwrap()
482    } else {
483        bind_addresses.active()
484    };
485
486    let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64);
487
488    let account_indexes = AccountSecondaryIndexes::from_clap_arg_match(matches)?;
489
490    let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
491    let accounts_shrink_optimize_total_space =
492        value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
493    let vote_use_quic = value_t_or_exit!(matches, "vote_use_quic", bool);
494
495    let tpu_connection_pool_size = matches
496        .value_of("tpu_connection_pool_size")
497        .unwrap_or("")
498        .parse()
499        .unwrap_or(DEFAULT_TPU_CONNECTION_POOL_SIZE);
500
501    let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
502    if !(0.0..=1.0).contains(&shrink_ratio) {
503        Err(format!(
504            "the specified account-shrink-ratio is invalid, it must be between 0. and 1.0 \
505             inclusive: {shrink_ratio}"
506        ))?;
507    }
508
509    let shrink_ratio = if accounts_shrink_optimize_total_space {
510        AccountShrinkThreshold::TotalSpace { shrink_ratio }
511    } else {
512        AccountShrinkThreshold::IndividualStore { shrink_ratio }
513    };
514    // TODO: Once entrypoints are updated to return shred-version, this should
515    // abort if it fails to obtain a shred-version, so that nodes always join
516    // gossip with a valid shred-version. The code to adopt entrypoint shred
517    // version can then be deleted from gossip and get_rpc_node above.
518    let expected_shred_version = value_t!(matches, "expected_shred_version", u16)
519        .ok()
520        .or_else(|| get_cluster_shred_version(&entrypoint_addrs, bind_addresses.active()));
521
522    let tower_path = value_t!(matches, "tower", PathBuf)
523        .ok()
524        .unwrap_or_else(|| ledger_path.clone());
525    let tower_storage: Arc<dyn tower_storage::TowerStorage> =
526        Arc::new(tower_storage::FileTowerStorage::new(tower_path));
527
528    let vote_history_storage: Arc<dyn vote_history_storage::VoteHistoryStorage> = Arc::new(
529        vote_history_storage::FileVoteHistoryStorage::new(ledger_path.clone()),
530    );
531
532    let accounts_index_limit =
533        value_t!(matches, "accounts_index_limit", String).unwrap_or_else(|err| err.exit());
534    let index_limit = {
535        enum CliIndexLimit {
536            Minimal,
537            Unlimited,
538            Threshold(u64),
539        }
540        let cli_index_limit = match accounts_index_limit.as_str() {
541            "minimal" => CliIndexLimit::Minimal,
542            "unlimited" => CliIndexLimit::Unlimited,
543            "25GB" => CliIndexLimit::Threshold(25_000_000_000),
544            "50GB" => CliIndexLimit::Threshold(50_000_000_000),
545            "100GB" => CliIndexLimit::Threshold(100_000_000_000),
546            "200GB" => CliIndexLimit::Threshold(200_000_000_000),
547            "400GB" => CliIndexLimit::Threshold(400_000_000_000),
548            "800GB" => CliIndexLimit::Threshold(800_000_000_000),
549            x => {
550                // clap will enforce only the above values are possible
551                unreachable!("invalid value given to `--accounts-index-limit`: '{x}'")
552            }
553        };
554        match cli_index_limit {
555            CliIndexLimit::Minimal => IndexLimit::Minimal,
556            CliIndexLimit::Unlimited => IndexLimit::InMemOnly,
557            CliIndexLimit::Threshold(num_bytes) => IndexLimit::Threshold(IndexLimitThreshold {
558                num_bytes,
559                num_entries_overhead: DEFAULT_NUM_ENTRIES_OVERHEAD,
560                num_entries_to_evict: DEFAULT_NUM_ENTRIES_TO_EVICT,
561            }),
562        }
563    };
564    // Note: need to still handle --enable-accounts-disk-index until it is removed
565    let index_limit = if matches.is_present("enable_accounts_disk_index") {
566        IndexLimit::Minimal
567    } else {
568        index_limit
569    };
570
571    let mut accounts_index_config = AccountsIndexConfig {
572        num_flush_threads: Some(accounts_index_flush_threads),
573        index_limit,
574        ..AccountsIndexConfig::default()
575    };
576    if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
577        accounts_index_config.bins = Some(bins);
578    }
579    if let Ok(num_initial_accounts) =
580        value_t!(matches, "accounts_index_initial_accounts_count", usize)
581    {
582        accounts_index_config.num_initial_accounts = Some(num_initial_accounts);
583    }
584
585    {
586        let mut accounts_index_paths: Vec<PathBuf> = if matches.is_present("accounts_index_path") {
587            values_t_or_exit!(matches, "accounts_index_path", String)
588                .into_iter()
589                .map(PathBuf::from)
590                .collect()
591        } else {
592            vec![]
593        };
594        if accounts_index_paths.is_empty() {
595            accounts_index_paths = vec![ledger_path.join("accounts_index")];
596        }
597        accounts_index_config.drives = Some(accounts_index_paths);
598    }
599
600    const MB: usize = 1_024 * 1_024;
601    accounts_index_config.scan_results_limit_bytes =
602        value_t!(matches, "accounts_index_scan_results_limit_mb", usize)
603            .ok()
604            .map(|mb| mb * MB);
605
606    let account_shrink_paths: Option<Vec<PathBuf>> =
607        values_t!(matches, "account_shrink_path", String)
608            .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
609            .ok();
610    let account_shrink_paths = account_shrink_paths
611        .as_ref()
612        .map(|paths| {
613            create_and_canonicalize_directories(paths)
614                .map_err(|err| format!("unable to access account shrink path: {err}"))
615        })
616        .transpose()?;
617
618    let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
619        .map(|paths| {
620            create_all_accounts_run_and_snapshot_dirs(&paths)
621                .map_err(|err| format!("unable to create account subdirectories: {err}"))
622        })
623        .transpose()?
624        .unzip();
625
626    let read_cache_limit_bytes =
627        values_of::<usize>(matches, "accounts_db_read_cache_limit").map(|limits| {
628            match limits.len() {
629                2 => (limits[0], limits[1]),
630                _ => {
631                    // clap will enforce two values are given
632                    unreachable!("invalid number of values given to accounts-db-read-cache-limit")
633                }
634            }
635        });
636
637    let storage_access = matches
638        .value_of("accounts_db_access_storages_method")
639        .map(|method| match method {
640            "mmap" => {
641                warn!("Using `mmap` for `--accounts-db-access-storages-method` is now deprecated.");
642                #[allow(deprecated)]
643                StorageAccess::Mmap
644            }
645            "file" => StorageAccess::File,
646            _ => {
647                // clap will enforce one of the above values is given
648                unreachable!("invalid value given to accounts-db-access-storages-method")
649            }
650        })
651        .unwrap_or_default();
652
653    let scan_filter_for_shrinking = matches
654        .value_of("accounts_db_scan_filter_for_shrinking")
655        .map(|filter| match filter {
656            "all" => ScanFilter::All,
657            "only-abnormal" => ScanFilter::OnlyAbnormal,
658            "only-abnormal-with-verify" => ScanFilter::OnlyAbnormalWithVerify,
659            _ => {
660                // clap will enforce one of the above values is given
661                unreachable!("invalid value given to accounts_db_scan_filter_for_shrinking")
662            }
663        })
664        .unwrap_or_default();
665
666    let mark_obsolete_accounts = matches
667        .value_of("accounts_db_mark_obsolete_accounts")
668        .map(|mark_obsolete_accounts| {
669            match mark_obsolete_accounts {
670                "enabled" => MarkObsoleteAccounts::Enabled,
671                "disabled" => MarkObsoleteAccounts::Disabled,
672                _ => {
673                    // clap will enforce one of the above values is given
674                    unreachable!("invalid value given to accounts_db_mark_obsolete_accounts")
675                }
676            }
677        })
678        .unwrap_or_default();
679
680    let accounts_db_config = AccountsDbConfig {
681        index: Some(accounts_index_config),
682        account_indexes: Some(account_indexes.clone()),
683        bank_hash_details_dir: ledger_path.clone(),
684        shrink_paths: account_shrink_run_paths,
685        shrink_ratio,
686        read_cache_limit_bytes,
687        read_cache_evict_sample_size: None,
688        write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
689            .ok()
690            .map(|mb| mb * MB as u64),
691        ancient_append_vec_offset: value_t!(matches, "accounts_db_ancient_append_vecs", i64).ok(),
692        ancient_storage_ideal_size: value_t!(
693            matches,
694            "accounts_db_ancient_storage_ideal_size",
695            u64
696        )
697        .ok(),
698        max_ancient_storages: value_t!(matches, "accounts_db_max_ancient_storages", usize).ok(),
699        skip_initial_hash_calc: false,
700        exhaustively_verify_refcounts: matches.is_present("accounts_db_verify_refcounts"),
701        partitioned_epoch_rewards_config: PartitionedEpochRewardsConfig::default(),
702        storage_access,
703        scan_filter_for_shrinking,
704        num_background_threads: Some(accounts_db_background_threads),
705        num_foreground_threads: Some(accounts_db_foreground_threads),
706        mark_obsolete_accounts,
707        use_registered_io_uring_buffers: resource_limits::check_memlock_limit_for_disk_io(
708            solana_accounts_db::accounts_db::TOTAL_IO_URING_BUFFERS_SIZE_LIMIT,
709        ),
710        snapshots_use_direct_io: !matches.is_present("no_accounts_db_snapshots_direct_io"),
711    };
712
713    let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
714        Some(
715            values_t_or_exit!(matches, "geyser_plugin_config", String)
716                .into_iter()
717                .map(PathBuf::from)
718                .collect(),
719        )
720    } else {
721        None
722    };
723    let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some()
724        || matches.is_present("geyser_plugin_always_enabled");
725
726    let account_paths: Vec<PathBuf> =
727        if let Ok(account_paths) = values_t!(matches, "account_paths", String) {
728            account_paths
729                .join(",")
730                .split(',')
731                .map(PathBuf::from)
732                .collect()
733        } else {
734            vec![ledger_path.join("accounts")]
735        };
736    let account_paths = create_and_canonicalize_directories(account_paths)
737        .map_err(|err| format!("unable to access account path: {err}"))?;
738
739    // From now on, use run/ paths in the same way as the previous account_paths.
740    let (account_run_paths, account_snapshot_paths) =
741        create_all_accounts_run_and_snapshot_dirs(&account_paths)
742            .map_err(|err| format!("unable to create account directories: {err}"))?;
743
744    // These snapshot paths are only used for initial clean up, add in shrink paths if they exist.
745    let account_snapshot_paths =
746        if let Some(account_shrink_snapshot_paths) = account_shrink_snapshot_paths {
747            account_snapshot_paths
748                .into_iter()
749                .chain(account_shrink_snapshot_paths)
750                .collect()
751        } else {
752            account_snapshot_paths
753        };
754
755    let snapshot_config = new_snapshot_config(
756        matches,
757        &ledger_path,
758        &account_paths,
759        run_args.rpc_bootstrap_config.incremental_snapshot_fetch,
760    )?;
761
762    let use_snapshot_archives_at_startup = value_t_or_exit!(
763        matches,
764        use_snapshot_archives_at_startup::cli::NAME,
765        UseSnapshotArchivesAtStartup
766    );
767
768    let mut validator_config = ValidatorConfig {
769        logfile,
770        require_tower: matches.is_present("require_tower"),
771        tower_storage,
772        vote_history_storage,
773        max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
774        expected_genesis_hash: matches
775            .value_of("expected_genesis_hash")
776            .map(|s| Hash::from_str(s).unwrap()),
777        fixed_leader_schedule: None,
778        expected_bank_hash: matches
779            .value_of("expected_bank_hash")
780            .map(|s| Hash::from_str(s).unwrap()),
781        expected_shred_version,
782        new_hard_forks: hardforks_of(matches, "hard_forks"),
783        rpc_config: run_args.json_rpc_config,
784        on_start_geyser_plugin_config_files,
785        geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
786        rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
787            (
788                SocketAddr::new(rpc_bind_address, rpc_port),
789                SocketAddr::new(rpc_bind_address, rpc_port + 1),
790                // If additional ports are added, +2 needs to be skipped to avoid a conflict with
791                // the websocket port (which is +2) in web3.js This odd port shifting is tracked at
792                // https://github.com/solana-labs/solana/issues/12250
793            )
794        }),
795        pubsub_config: run_args.pub_sub_config,
796        voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
797        wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
798        known_validators: run_args.known_validators,
799        repair_validators,
800        repair_whitelist,
801        repair_handler_type: RepairHandlerType::default(),
802        gossip_validators,
803        max_ledger_shreds,
804        blockstore_options: run_args.blockstore_options,
805        run_verification: !matches.is_present("skip_startup_ledger_verification"),
806        debug_keys,
807        warp_slot: None,
808        generator_config: None,
809        contact_debug_interval,
810        contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
811        send_transaction_service_config: run_args.send_transaction_service_config,
812        no_poh_speed_test: matches.is_present("no_poh_speed_test"),
813        no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
814        no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
815        no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
816        no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
817        // The validator needs to open many files, check that the process has
818        // permission to do so in order to fail quickly and give a direct error
819        enforce_ulimit_nofile: true,
820        poh_pinned_cpu_core: value_of(matches, "poh_pinned_cpu_core")
821            .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
822        poh_hashes_per_batch: value_of(matches, "poh_hashes_per_batch")
823            .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH),
824        process_ledger_before_services: matches.is_present("process_ledger_before_services"),
825        account_paths: account_run_paths,
826        account_snapshot_paths,
827        accounts_db_config,
828        accounts_db_skip_shrink: true,
829        accounts_db_force_initial_clean: matches.is_present("no_skip_initial_accounts_db_clean"),
830        snapshot_config,
831        no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"),
832        wait_to_vote_slot: None,
833        runtime_config: RuntimeConfig {
834            log_messages_bytes_limit: value_of(matches, "log_messages_bytes_limit"),
835            ..RuntimeConfig::default()
836        },
837        staked_nodes_overrides: staked_nodes_overrides.clone(),
838        use_snapshot_archives_at_startup,
839        ip_echo_server_threads,
840        rayon_global_threads,
841        replay_forks_threads,
842        replay_transactions_threads,
843        tvu_shred_sigverify_threads: tvu_sigverify_threads,
844        delay_leader_block_for_pending_fork: matches
845            .is_present("delay_leader_block_for_pending_fork"),
846        turbine_disabled: Arc::<AtomicBool>::default(),
847        broadcast_stage_type: BroadcastStageType::Standard,
848        block_verification_method: value_t_or_exit!(
849            matches,
850            "block_verification_method",
851            BlockVerificationMethod
852        ),
853        unified_scheduler_handler_threads: value_t!(
854            matches,
855            "unified_scheduler_handler_threads",
856            usize
857        )
858        .ok(),
859        block_production_method: value_t_or_exit!(
860            matches,
861            "block_production_method",
862            BlockProductionMethod
863        ),
864        block_production_num_workers,
865        block_production_scheduler_config: SchedulerConfig {
866            scheduler_pacing: value_t_or_exit!(
867                matches,
868                "block_production_pacing_fill_time_millis",
869                SchedulerPacing
870            ),
871        },
872        enable_block_production_forwarding: staked_nodes_overrides_path.is_some(),
873        enable_scheduler_bindings: matches.is_present("enable_scheduler_bindings"),
874        banking_trace_dir_byte_limit: parse_banking_trace_dir_byte_limit(matches),
875        validator_exit: Arc::new(RwLock::new(Exit::default())),
876        validator_exit_backpressure: [(
877            SnapshotPackagerService::NAME.to_string(),
878            Arc::new(AtomicBool::new(false)),
879        )]
880        .into(),
881        voting_service_test_override: None,
882        snapshot_packager_niceness_adj: value_t_or_exit!(
883            matches,
884            "snapshot_packager_niceness_adj",
885            i8
886        ),
887    };
888
889    let vote_account = pubkey_of(matches, "vote_account").unwrap_or_else(|| {
890        if !validator_config.voting_disabled {
891            warn!("--vote-account not specified, validator will not vote");
892            validator_config.voting_disabled = true;
893        }
894        Keypair::new().pubkey()
895    });
896
897    let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64);
898    let minimal_snapshot_download_speed =
899        value_t_or_exit!(matches, "minimal_snapshot_download_speed", f32);
900    let maximum_snapshot_download_abort =
901        value_t_or_exit!(matches, "maximum_snapshot_download_abort", u64);
902
903    let public_rpc_addr = matches
904        .value_of("public_rpc_addr")
905        .map(|addr| {
906            solana_net_utils::parse_host_port(addr)
907                .map_err(|err| format!("failed to parse public rpc address: {err}"))
908        })
909        .transpose()?;
910
911    if !matches.is_present("no_os_network_limits_test") {
912        if SystemMonitorService::check_os_network_limits() {
913            info!("OS network limits test passed.");
914        } else {
915            Err("OS network limit test failed. See \
916                https://docs.anza.xyz/operations/guides/validator-start#system-tuning"
917                .to_string())?;
918        }
919    }
920
921    let mut ledger_lock = ledger_lockfile(&ledger_path);
922    let _ledger_write_guard = lock_ledger(&ledger_path, &mut ledger_lock);
923
924    let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
925    let admin_service_post_init = Arc::new(RwLock::new(None));
926    let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
927        if starting_with_geyser_plugins {
928            let (sender, receiver) = unbounded();
929            (Some(sender), Some(receiver))
930        } else {
931            (None, None)
932        };
933    admin_rpc_service::run(
934        &ledger_path,
935        admin_rpc_service::AdminRpcRequestMetadata {
936            rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
937            start_time: std::time::SystemTime::now(),
938            validator_exit: validator_config.validator_exit.clone(),
939            validator_exit_backpressure: validator_config.validator_exit_backpressure.clone(),
940            start_progress: start_progress.clone(),
941            authorized_voter_keypairs: authorized_voter_keypairs.clone(),
942            post_init: admin_service_post_init.clone(),
943            tower_storage: validator_config.tower_storage.clone(),
944            staked_nodes_overrides,
945            rpc_to_plugin_manager_sender,
946        },
947    );
948
949    let tpu_max_connections_per_peer: Option<u64> = matches
950        .value_of("tpu_max_connections_per_peer")
951        .and_then(|v| v.parse().ok());
952    let tpu_max_connections_per_unstaked_peer = tpu_max_connections_per_peer
953        .unwrap_or_else(|| value_t_or_exit!(matches, "tpu_max_connections_per_unstaked_peer", u64));
954    let tpu_max_connections_per_staked_peer = tpu_max_connections_per_peer
955        .unwrap_or_else(|| value_t_or_exit!(matches, "tpu_max_connections_per_staked_peer", u64));
956    let tpu_max_staked_connections = value_t_or_exit!(matches, "tpu_max_staked_connections", u64);
957    let tpu_max_unstaked_connections =
958        value_t_or_exit!(matches, "tpu_max_unstaked_connections", u64);
959
960    let tpu_max_fwd_staked_connections =
961        value_t_or_exit!(matches, "tpu_max_fwd_staked_connections", u64);
962    let tpu_max_fwd_unstaked_connections =
963        value_t_or_exit!(matches, "tpu_max_fwd_unstaked_connections", u64);
964
965    let tpu_max_connections_per_ipaddr_per_minute: u64 =
966        value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64);
967    let max_streams_per_ms = value_t_or_exit!(matches, "tpu_max_streams_per_ms", u64);
968
969    let cluster_entrypoints = entrypoint_addrs
970        .iter()
971        .map(ContactInfo::new_gossip_entry_point)
972        .collect::<Vec<_>>();
973
974    if restricted_repair_only_mode {
975        // When in --restricted_repair_only_mode is enabled only the gossip and repair ports
976        // need to be reachable by the entrypoint to respond to gossip pull requests and repair
977        // requests initiated by the node.  All other ports are unused.
978        node.info.remove_tpu();
979        node.info.remove_tpu_forwards();
980        node.info.remove_tvu();
981        node.info.remove_serve_repair();
982        node.info.remove_alpenglow();
983
984        // A node in this configuration shouldn't be an entrypoint to other nodes
985        node.sockets.ip_echo = None;
986    }
987
988    if !private_rpc {
989        macro_rules! set_socket {
990            ($method:ident, $addr:expr, $name:literal) => {
991                node.info.$method($addr).expect(&format!(
992                    "Operator must spin up node with valid {} address",
993                    $name
994                ))
995            };
996        }
997        if let Some(public_rpc_addr) = public_rpc_addr {
998            set_socket!(set_rpc, public_rpc_addr, "RPC");
999            set_socket!(set_rpc_pubsub, public_rpc_addr, "RPC-pubsub");
1000        } else if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
1001            let addr = node
1002                .info
1003                .gossip()
1004                .expect("Operator must spin up node with valid gossip address")
1005                .ip();
1006            set_socket!(set_rpc, (addr, rpc_addr.port()), "RPC");
1007            set_socket!(set_rpc_pubsub, (addr, rpc_pubsub_addr.port()), "RPC-pubsub");
1008        }
1009    }
1010
1011    snapshot_utils::remove_tmp_snapshot_archives(
1012        &validator_config.snapshot_config.full_snapshot_archives_dir,
1013    );
1014    snapshot_utils::remove_tmp_snapshot_archives(
1015        &validator_config
1016            .snapshot_config
1017            .incremental_snapshot_archives_dir,
1018    );
1019
1020    let should_check_duplicate_instance = true;
1021    if !cluster_entrypoints.is_empty() {
1022        bootstrap::rpc_bootstrap(
1023            &node,
1024            &identity_keypair,
1025            &ledger_path,
1026            &vote_account,
1027            authorized_voter_keypairs.clone(),
1028            &cluster_entrypoints,
1029            &mut validator_config,
1030            run_args.rpc_bootstrap_config,
1031            do_port_check,
1032            use_progress_bar,
1033            maximum_local_snapshot_age,
1034            should_check_duplicate_instance,
1035            &start_progress,
1036            minimal_snapshot_download_speed,
1037            maximum_snapshot_download_abort,
1038            run_args.socket_addr_space,
1039        );
1040        *start_progress.write().unwrap() = ValidatorStartProgress::Initializing;
1041    }
1042
1043    if operation == Operation::Initialize {
1044        info!("Validator ledger initialization complete");
1045        return Ok(());
1046    }
1047
1048    // Bootstrap code above pushes a contact-info with more recent timestamp to
1049    // gossip. If the node is staked the contact-info lingers in gossip causing
1050    // false duplicate nodes error.
1051    // Below line refreshes the timestamp on contact-info so that it overrides
1052    // the one pushed by bootstrap.
1053    node.info.hot_swap_pubkey(identity_keypair.pubkey());
1054
1055    let tpu_quic_server_config = SwQosQuicStreamerConfig {
1056        quic_streamer_config: QuicStreamerConfig {
1057            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1058            num_threads: tpu_transaction_receive_threads,
1059            ..Default::default()
1060        },
1061        qos_config: SwQosConfig {
1062            max_connections_per_unstaked_peer: tpu_max_connections_per_unstaked_peer
1063                .try_into()
1064                .unwrap(),
1065            max_connections_per_staked_peer: tpu_max_connections_per_staked_peer
1066                .try_into()
1067                .unwrap(),
1068            max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
1069            max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
1070            max_streams_per_ms,
1071        },
1072    };
1073
1074    let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
1075        quic_streamer_config: QuicStreamerConfig {
1076            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1077            num_threads: tpu_transaction_forward_receive_threads,
1078            ..Default::default()
1079        },
1080        qos_config: SwQosConfig {
1081            max_connections_per_staked_peer: tpu_max_connections_per_staked_peer
1082                .try_into()
1083                .unwrap(),
1084            max_connections_per_unstaked_peer: tpu_max_connections_per_unstaked_peer
1085                .try_into()
1086                .unwrap(),
1087            max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
1088            max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
1089            max_streams_per_ms,
1090        },
1091    };
1092
1093    let vote_quic_server_config = SimpleQosQuicStreamerConfig {
1094        quic_streamer_config: QuicStreamerConfig {
1095            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1096            num_threads: tpu_vote_transaction_receive_threads,
1097            ..Default::default()
1098        },
1099        qos_config: SimpleQosConfig {
1100            max_streams_per_second: MAX_VOTES_PER_SECOND,
1101            ..Default::default()
1102        },
1103    };
1104
1105    let validator = Validator::new_with_exit(
1106        node,
1107        identity_keypair,
1108        &ledger_path,
1109        &vote_account,
1110        authorized_voter_keypairs,
1111        cluster_entrypoints,
1112        &validator_config,
1113        should_check_duplicate_instance,
1114        rpc_to_plugin_manager_receiver,
1115        start_progress,
1116        run_args.socket_addr_space,
1117        ValidatorTpuConfig {
1118            vote_use_quic,
1119            tpu_connection_pool_size,
1120            tpu_quic_server_config,
1121            tpu_fwd_quic_server_config,
1122            vote_quic_server_config,
1123            sigverify_threads: tpu_sigverify_threads,
1124        },
1125        admin_service_post_init,
1126        maybe_xdp_retransmit_builder,
1127        exit,
1128    )
1129    .map_err(|err| format!("{err:?}"))?;
1130
1131    if let Some(filename) = init_complete_file {
1132        File::create(filename).map_err(|err| format!("unable to create {filename}: {err}"))?;
1133    }
1134    info!("Validator initialized");
1135    validator.listen_for_signals()?;
1136    validator.join();
1137    info!("Validator exiting..");
1138
1139    Ok(())
1140}
1141
1142// This function is duplicated in ledger-tool/src/main.rs...
1143fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
1144    if matches.is_present(name) {
1145        Some(values_t_or_exit!(matches, name, Slot))
1146    } else {
1147        None
1148    }
1149}
1150
1151fn validators_set(
1152    identity_pubkey: &Pubkey,
1153    matches: &ArgMatches<'_>,
1154    matches_name: &str,
1155    arg_name: &str,
1156) -> Result<Option<HashSet<Pubkey>>, String> {
1157    if matches.is_present(matches_name) {
1158        let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
1159            .into_iter()
1160            .collect();
1161        if validators_set.contains(identity_pubkey) {
1162            Err(format!(
1163                "the validator's identity pubkey cannot be a {arg_name}: {identity_pubkey}"
1164            ))?;
1165        }
1166        Ok(Some(validators_set))
1167    } else {
1168        Ok(None)
1169    }
1170}
1171
1172fn get_cluster_shred_version(entrypoints: &[SocketAddr], bind_address: IpAddr) -> Option<u16> {
1173    let entrypoints = {
1174        let mut index: Vec<_> = (0..entrypoints.len()).collect();
1175        index.shuffle(&mut rand::rng());
1176        index.into_iter().map(|i| &entrypoints[i])
1177    };
1178    for entrypoint in entrypoints {
1179        match solana_net_utils::get_cluster_shred_version_with_binding(entrypoint, bind_address) {
1180            Err(err) => eprintln!("get_cluster_shred_version failed: {entrypoint}, {err}"),
1181            Ok(0) => eprintln!("entrypoint {entrypoint} returned shred-version zero"),
1182            Ok(shred_version) => {
1183                info!("obtained shred-version {shred_version} from {entrypoint}");
1184                return Some(shred_version);
1185            }
1186        }
1187    }
1188    None
1189}
1190
1191fn parse_banking_trace_dir_byte_limit(matches: &ArgMatches) -> u64 {
1192    if matches.is_present("disable_banking_trace") {
1193        // disable with an explicit flag; This effectively becomes `opt-out` by resetting to
1194        // DISABLED_BAKING_TRACE_DIR, while allowing us to specify a default sensible limit in clap
1195        // configuration for cli help.
1196        DISABLED_BAKING_TRACE_DIR
1197    } else {
1198        // a default value in clap configuration (BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT) or
1199        // explicit user-supplied override value
1200        value_t_or_exit!(matches, "banking_trace_dir_byte_limit", u64)
1201    }
1202}
1203
1204fn new_snapshot_config(
1205    matches: &ArgMatches,
1206    ledger_path: &Path,
1207    account_paths: &[PathBuf],
1208    incremental_snapshot_fetch: bool,
1209) -> Result<SnapshotConfig, Box<dyn std::error::Error>> {
1210    let (full_snapshot_archive_interval, incremental_snapshot_archive_interval) =
1211        if matches.is_present("no_snapshots") {
1212            // snapshots are disabled
1213            (SnapshotInterval::Disabled, SnapshotInterval::Disabled)
1214        } else {
1215            match (
1216                incremental_snapshot_fetch,
1217                value_t_or_exit!(matches, "snapshot_interval_slots", NonZeroU64),
1218            ) {
1219                (true, incremental_snapshot_interval_slots) => {
1220                    // incremental snapshots are enabled
1221                    // use --snapshot-interval-slots for the incremental snapshot interval
1222                    let full_snapshot_interval_slots =
1223                        value_t_or_exit!(matches, "full_snapshot_interval_slots", NonZeroU64);
1224                    (
1225                        SnapshotInterval::Slots(full_snapshot_interval_slots),
1226                        SnapshotInterval::Slots(incremental_snapshot_interval_slots),
1227                    )
1228                }
1229                (false, full_snapshot_interval_slots) => {
1230                    // incremental snapshots are *disabled*
1231                    // use --snapshot-interval-slots for the *full* snapshot interval
1232                    // also warn if --full-snapshot-interval-slots was specified
1233                    if matches.occurrences_of("full_snapshot_interval_slots") > 0 {
1234                        warn!(
1235                            "Incremental snapshots are disabled, yet \
1236                             --full-snapshot-interval-slots was specified! Note that \
1237                             --full-snapshot-interval-slots is *ignored* when incremental \
1238                             snapshots are disabled. Use --snapshot-interval-slots instead.",
1239                        );
1240                    }
1241                    (
1242                        SnapshotInterval::Slots(full_snapshot_interval_slots),
1243                        SnapshotInterval::Disabled,
1244                    )
1245                }
1246            }
1247        };
1248
1249    info!(
1250        "Snapshot configuration: full snapshot interval: {}, incremental snapshot interval: {}",
1251        match full_snapshot_archive_interval {
1252            SnapshotInterval::Disabled => "disabled".to_string(),
1253            SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1254        },
1255        match incremental_snapshot_archive_interval {
1256            SnapshotInterval::Disabled => "disabled".to_string(),
1257            SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1258        },
1259    );
1260    // It is unlikely that a full snapshot interval greater than an epoch is a good idea.
1261    // Minimally we should warn the user in case this was a mistake.
1262    if let SnapshotInterval::Slots(full_snapshot_interval_slots) = full_snapshot_archive_interval {
1263        let full_snapshot_interval_slots = full_snapshot_interval_slots.get();
1264        if full_snapshot_interval_slots > DEFAULT_SLOTS_PER_EPOCH {
1265            warn!(
1266                "The full snapshot interval is excessively large: {full_snapshot_interval_slots}! \
1267                 This will negatively impact the background cleanup tasks in accounts-db. \
1268                 Consider a smaller value.",
1269            );
1270        }
1271    }
1272
1273    let snapshots_dir = matches
1274        .value_of("snapshots")
1275        .map(Path::new)
1276        .unwrap_or(ledger_path);
1277    let snapshots_dir = create_and_canonicalize_directory(snapshots_dir).map_err(|err| {
1278        format!(
1279            "failed to create snapshots directory '{}': {err}",
1280            snapshots_dir.display(),
1281        )
1282    })?;
1283    if account_paths
1284        .iter()
1285        .any(|account_path| account_path == &snapshots_dir)
1286    {
1287        Err(
1288            "the --accounts and --snapshots paths must be unique since they both create \
1289             'snapshots' subdirectories, otherwise there may be collisions"
1290                .to_string(),
1291        )?;
1292    }
1293
1294    let bank_snapshots_dir = snapshots_dir.join(BANK_SNAPSHOTS_DIR);
1295    fs::create_dir_all(&bank_snapshots_dir).map_err(|err| {
1296        format!(
1297            "failed to create bank snapshots directory '{}': {err}",
1298            bank_snapshots_dir.display(),
1299        )
1300    })?;
1301
1302    let full_snapshot_archives_dir = matches
1303        .value_of("full_snapshot_archive_path")
1304        .map(PathBuf::from)
1305        .unwrap_or_else(|| snapshots_dir.clone());
1306    fs::create_dir_all(&full_snapshot_archives_dir).map_err(|err| {
1307        format!(
1308            "failed to create full snapshot archives directory '{}': {err}",
1309            full_snapshot_archives_dir.display(),
1310        )
1311    })?;
1312
1313    let incremental_snapshot_archives_dir = matches
1314        .value_of("incremental_snapshot_archive_path")
1315        .map(PathBuf::from)
1316        .unwrap_or_else(|| snapshots_dir.clone());
1317    fs::create_dir_all(&incremental_snapshot_archives_dir).map_err(|err| {
1318        format!(
1319            "failed to create incremental snapshot archives directory '{}': {err}",
1320            incremental_snapshot_archives_dir.display(),
1321        )
1322    })?;
1323
1324    let archive_format = {
1325        let archive_format_str = value_t_or_exit!(matches, "snapshot_archive_format", String);
1326        let mut archive_format = ArchiveFormat::from_cli_arg(&archive_format_str)
1327            .unwrap_or_else(|| panic!("Archive format not recognized: {archive_format_str}"));
1328        if let ArchiveFormat::TarZstd { config } = &mut archive_format {
1329            config.compression_level =
1330                value_t_or_exit!(matches, "snapshot_zstd_compression_level", i32);
1331        }
1332        archive_format
1333    };
1334
1335    let snapshot_version = matches
1336        .value_of("snapshot_version")
1337        .map(|value| {
1338            value
1339                .parse::<SnapshotVersion>()
1340                .map_err(|err| format!("unable to parse snapshot version: {err}"))
1341        })
1342        .transpose()?
1343        .unwrap_or(SnapshotVersion::default());
1344
1345    let maximum_full_snapshot_archives_to_retain =
1346        value_t_or_exit!(matches, "maximum_full_snapshots_to_retain", NonZeroUsize);
1347    let maximum_incremental_snapshot_archives_to_retain = value_t_or_exit!(
1348        matches,
1349        "maximum_incremental_snapshots_to_retain",
1350        NonZeroUsize
1351    );
1352
1353    let snapshot_config = SnapshotConfig {
1354        usage: if full_snapshot_archive_interval == SnapshotInterval::Disabled {
1355            SnapshotUsage::LoadOnly
1356        } else {
1357            SnapshotUsage::LoadAndGenerate
1358        },
1359        full_snapshot_archive_interval,
1360        incremental_snapshot_archive_interval,
1361        bank_snapshots_dir,
1362        full_snapshot_archives_dir,
1363        incremental_snapshot_archives_dir,
1364        archive_format,
1365        snapshot_version,
1366        maximum_full_snapshot_archives_to_retain,
1367        maximum_incremental_snapshot_archives_to_retain,
1368    };
1369
1370    if !is_snapshot_config_valid(&snapshot_config) {
1371        Err(
1372            "invalid snapshot configuration provided: snapshot intervals are incompatible. full \
1373             snapshot interval MUST be larger than incremental snapshot interval (if enabled)"
1374                .to_string(),
1375        )?;
1376    }
1377
1378    Ok(snapshot_config)
1379}