agave_validator/commands/run/
execute.rs

1use {
2    crate::{
3        admin_rpc_service::{self, load_staked_nodes_overrides, StakedNodesOverrides},
4        bootstrap,
5        cli::{self},
6        commands::{run::args::RunArgs, FromClapArgMatches},
7        ledger_lockfile, lock_ledger,
8    },
9    agave_logger::redirect_stderr_to_file,
10    agave_snapshots::{
11        paths::BANK_SNAPSHOTS_DIR,
12        snapshot_config::{SnapshotConfig, SnapshotUsage},
13        ArchiveFormat, SnapshotInterval, SnapshotVersion,
14    },
15    clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
16    crossbeam_channel::unbounded,
17    log::*,
18    rand::{seq::SliceRandom, thread_rng},
19    solana_accounts_db::{
20        accounts_db::{AccountShrinkThreshold, AccountsDbConfig, MarkObsoleteAccounts},
21        accounts_file::StorageAccess,
22        accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig, IndexLimitMb, ScanFilter},
23        utils::{
24            create_all_accounts_run_and_snapshot_dirs, create_and_canonicalize_directories,
25            create_and_canonicalize_directory,
26        },
27    },
28    solana_clap_utils::input_parsers::{
29        keypair_of, keypairs_of, parse_cpu_ranges, pubkey_of, value_of, values_of,
30    },
31    solana_clock::{Slot, DEFAULT_SLOTS_PER_EPOCH},
32    solana_core::{
33        banking_stage::transaction_scheduler::scheduler_controller::SchedulerConfig,
34        banking_trace::DISABLED_BAKING_TRACE_DIR,
35        consensus::tower_storage,
36        repair::repair_handler::RepairHandlerType,
37        snapshot_packager_service::SnapshotPackagerService,
38        system_monitor_service::SystemMonitorService,
39        tpu::MAX_VOTES_PER_SECOND,
40        validator::{
41            is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod,
42            SchedulerPacing, Validator, ValidatorConfig, ValidatorError, ValidatorStartProgress,
43            ValidatorTpuConfig,
44        },
45    },
46    solana_genesis_utils::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
47    solana_gossip::{
48        cluster_info::{NodeConfig, DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS},
49        contact_info::ContactInfo,
50        node::Node,
51    },
52    solana_hash::Hash,
53    solana_keypair::Keypair,
54    solana_ledger::{
55        blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
56        use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
57    },
58    solana_net_utils::multihomed_sockets::BindIpAddrs,
59    solana_perf::recycler::enable_recycler_warming,
60    solana_poh::poh_service,
61    solana_pubkey::Pubkey,
62    solana_runtime::{runtime_config::RuntimeConfig, snapshot_utils},
63    solana_signer::Signer,
64    solana_streamer::{
65        nonblocking::{simple_qos::SimpleQosConfig, swqos::SwQosConfig},
66        quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig, SwQosQuicStreamerConfig},
67    },
68    solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
69    solana_turbine::{
70        broadcast_stage::BroadcastStageType,
71        xdp::{set_cpu_affinity, XdpConfig},
72    },
73    solana_validator_exit::Exit,
74    std::{
75        collections::HashSet,
76        fs::{self, File},
77        net::{IpAddr, Ipv4Addr, SocketAddr},
78        num::{NonZeroU64, NonZeroUsize},
79        path::{Path, PathBuf},
80        process::exit,
81        str::FromStr,
82        sync::{atomic::AtomicBool, Arc, RwLock},
83    },
84};
85
86#[derive(Debug, PartialEq, Eq)]
87pub enum Operation {
88    Initialize,
89    Run,
90}
91
92pub fn execute(
93    matches: &ArgMatches,
94    solana_version: &str,
95    operation: Operation,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let run_args = RunArgs::from_clap_arg_match(matches)?;
98
99    let cli::thread_args::NumThreadConfig {
100        accounts_db_background_threads,
101        accounts_db_foreground_threads,
102        accounts_index_flush_threads,
103        block_production_num_workers,
104        ip_echo_server_threads,
105        rayon_global_threads,
106        replay_forks_threads,
107        replay_transactions_threads,
108        tpu_transaction_forward_receive_threads,
109        tpu_transaction_receive_threads,
110        tpu_vote_transaction_receive_threads,
111        tvu_receive_threads,
112        tvu_retransmit_threads,
113        tvu_sigverify_threads,
114    } = cli::thread_args::parse_num_threads_args(matches);
115
116    let identity_keypair = Arc::new(run_args.identity_keypair);
117
118    let logfile = run_args.logfile;
119    if let Some(logfile) = logfile.as_ref() {
120        println!("log file: {}", logfile.display());
121    }
122    let use_progress_bar = logfile.is_none();
123    let _logger_thread = redirect_stderr_to_file(logfile);
124
125    info!("{} {}", crate_name!(), solana_version);
126    info!("Starting validator with: {:#?}", std::env::args_os());
127
128    let cuda = matches.is_present("cuda");
129    if cuda {
130        solana_perf::perf_libs::init_cuda();
131        enable_recycler_warming();
132    }
133
134    solana_core::validator::report_target_features();
135
136    let authorized_voter_keypairs = keypairs_of(matches, "authorized_voter_keypairs")
137        .map(|keypairs| keypairs.into_iter().map(Arc::new).collect())
138        .unwrap_or_else(|| vec![Arc::new(keypair_of(matches, "identity").expect("identity"))]);
139    let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));
140
141    let staked_nodes_overrides_path = matches
142        .value_of("staked_nodes_overrides")
143        .map(str::to_string);
144    let staked_nodes_overrides = Arc::new(RwLock::new(
145        match &staked_nodes_overrides_path {
146            None => StakedNodesOverrides::default(),
147            Some(p) => load_staked_nodes_overrides(p).unwrap_or_else(|err| {
148                error!("Failed to load stake-nodes-overrides from {p}: {err}");
149                clap::Error::with_description(
150                    "Failed to load configuration of stake-nodes-overrides argument",
151                    clap::ErrorKind::InvalidValue,
152                )
153                .exit()
154            }),
155        }
156        .staked_map_id,
157    ));
158
159    let init_complete_file = matches.value_of("init_complete_file");
160
161    let private_rpc = matches.is_present("private_rpc");
162    let do_port_check = !matches.is_present("no_port_check");
163
164    let ledger_path = run_args.ledger_path;
165
166    let max_ledger_shreds = if matches.is_present("limit_ledger_size") {
167        let limit_ledger_size = match matches.value_of("limit_ledger_size") {
168            Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
169            None => DEFAULT_MAX_LEDGER_SHREDS,
170        };
171        if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
172            Err(format!(
173                "The provided --limit-ledger-size value was too small, the minimum value is \
174                 {DEFAULT_MIN_MAX_LEDGER_SHREDS}"
175            ))?;
176        }
177        Some(limit_ledger_size)
178    } else {
179        None
180    };
181
182    let debug_keys: Option<Arc<HashSet<_>>> = if matches.is_present("debug_key") {
183        Some(Arc::new(
184            values_t_or_exit!(matches, "debug_key", Pubkey)
185                .into_iter()
186                .collect(),
187        ))
188    } else {
189        None
190    };
191
192    let repair_validators = validators_set(
193        &identity_keypair.pubkey(),
194        matches,
195        "repair_validators",
196        "--repair-validator",
197    )?;
198    let repair_whitelist = validators_set(
199        &identity_keypair.pubkey(),
200        matches,
201        "repair_whitelist",
202        "--repair-whitelist",
203    )?;
204    let repair_whitelist = Arc::new(RwLock::new(repair_whitelist.unwrap_or_default()));
205    let gossip_validators = validators_set(
206        &identity_keypair.pubkey(),
207        matches,
208        "gossip_validators",
209        "--gossip-validator",
210    )?;
211
212    let bind_addresses = {
213        let parsed = matches
214            .values_of("bind_address")
215            .expect("bind_address should always be present due to default")
216            .map(solana_net_utils::parse_host)
217            .collect::<Result<Vec<_>, _>>()?;
218        BindIpAddrs::new(parsed).map_err(|err| format!("invalid bind_addresses: {err}"))?
219    };
220
221    if bind_addresses.len() > 1 {
222        for (flag, msg) in [
223            (
224                "use_connection_cache",
225                "Connection cache can not be used in a multihoming context",
226            ),
227            (
228                "advertised_ip",
229                "--advertised-ip cannot be used in a multihoming context. In multihoming, the \
230                 validator will advertise the first --bind-address as this node's public IP \
231                 address.",
232            ),
233            (
234                "tpu_vortexor_receiver_address",
235                "--tpu-vortexor-receiver-address can not be used in a multihoming context",
236            ),
237            (
238                "public_tpu_addr",
239                "--public-tpu-address can not be used in a multihoming context",
240            ),
241        ] {
242            if matches.is_present(flag) {
243                Err(String::from(msg))?;
244            }
245        }
246    }
247
248    let rpc_bind_address = if matches.is_present("rpc_bind_address") {
249        solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap())
250            .expect("invalid rpc_bind_address")
251    } else if private_rpc {
252        solana_net_utils::parse_host("127.0.0.1").unwrap()
253    } else {
254        bind_addresses.active()
255    };
256
257    let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64);
258
259    let account_indexes = AccountSecondaryIndexes::from_clap_arg_match(matches)?;
260
261    let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
262    let accounts_shrink_optimize_total_space =
263        value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
264    let tpu_use_quic = !matches.is_present("tpu_disable_quic");
265    if !tpu_use_quic {
266        warn!(
267            "TPU QUIC was disabled via --tpu_disable_quic, this will prevent validator from \
268             receiving transactions!"
269        );
270    }
271    let vote_use_quic = value_t_or_exit!(matches, "vote_use_quic", bool);
272
273    let tpu_enable_udp = if matches.is_present("tpu_enable_udp") {
274        warn!("Submission of TPU transactions via UDP is deprecated.");
275        true
276    } else {
277        DEFAULT_TPU_ENABLE_UDP
278    };
279
280    let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);
281
282    let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
283    if !(0.0..=1.0).contains(&shrink_ratio) {
284        Err(format!(
285            "the specified account-shrink-ratio is invalid, it must be between 0. and 1.0 \
286             inclusive: {shrink_ratio}"
287        ))?;
288    }
289
290    let shrink_ratio = if accounts_shrink_optimize_total_space {
291        AccountShrinkThreshold::TotalSpace { shrink_ratio }
292    } else {
293        AccountShrinkThreshold::IndividualStore { shrink_ratio }
294    };
295    let entrypoint_addrs = run_args.entrypoints;
296    for addr in &entrypoint_addrs {
297        if !run_args.socket_addr_space.check(addr) {
298            Err(format!("invalid entrypoint address: {addr}"))?;
299        }
300    }
301    // TODO: Once entrypoints are updated to return shred-version, this should
302    // abort if it fails to obtain a shred-version, so that nodes always join
303    // gossip with a valid shred-version. The code to adopt entrypoint shred
304    // version can then be deleted from gossip and get_rpc_node above.
305    let expected_shred_version = value_t!(matches, "expected_shred_version", u16)
306        .ok()
307        .or_else(|| get_cluster_shred_version(&entrypoint_addrs, bind_addresses.active()));
308
309    let tower_path = value_t!(matches, "tower", PathBuf)
310        .ok()
311        .unwrap_or_else(|| ledger_path.clone());
312    let tower_storage: Arc<dyn tower_storage::TowerStorage> =
313        Arc::new(tower_storage::FileTowerStorage::new(tower_path));
314
315    let mut accounts_index_config = AccountsIndexConfig {
316        num_flush_threads: Some(accounts_index_flush_threads),
317        ..AccountsIndexConfig::default()
318    };
319    if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
320        accounts_index_config.bins = Some(bins);
321    }
322    if let Ok(num_initial_accounts) =
323        value_t!(matches, "accounts_index_initial_accounts_count", usize)
324    {
325        accounts_index_config.num_initial_accounts = Some(num_initial_accounts);
326    }
327
328    accounts_index_config.index_limit_mb = if !matches.is_present("enable_accounts_disk_index") {
329        IndexLimitMb::InMemOnly
330    } else {
331        IndexLimitMb::Minimal
332    };
333
334    {
335        let mut accounts_index_paths: Vec<PathBuf> = if matches.is_present("accounts_index_path") {
336            values_t_or_exit!(matches, "accounts_index_path", String)
337                .into_iter()
338                .map(PathBuf::from)
339                .collect()
340        } else {
341            vec![]
342        };
343        if accounts_index_paths.is_empty() {
344            accounts_index_paths = vec![ledger_path.join("accounts_index")];
345        }
346        accounts_index_config.drives = Some(accounts_index_paths);
347    }
348
349    const MB: usize = 1_024 * 1_024;
350    accounts_index_config.scan_results_limit_bytes =
351        value_t!(matches, "accounts_index_scan_results_limit_mb", usize)
352            .ok()
353            .map(|mb| mb * MB);
354
355    let account_shrink_paths: Option<Vec<PathBuf>> =
356        values_t!(matches, "account_shrink_path", String)
357            .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
358            .ok();
359    let account_shrink_paths = account_shrink_paths
360        .as_ref()
361        .map(|paths| {
362            create_and_canonicalize_directories(paths)
363                .map_err(|err| format!("unable to access account shrink path: {err}"))
364        })
365        .transpose()?;
366
367    let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
368        .map(|paths| {
369            create_all_accounts_run_and_snapshot_dirs(&paths)
370                .map_err(|err| format!("unable to create account subdirectories: {err}"))
371        })
372        .transpose()?
373        .unzip();
374
375    let read_cache_limit_bytes =
376        values_of::<usize>(matches, "accounts_db_read_cache_limit").map(|limits| {
377            match limits.len() {
378                2 => (limits[0], limits[1]),
379                _ => {
380                    // clap will enforce two values are given
381                    unreachable!("invalid number of values given to accounts-db-read-cache-limit")
382                }
383            }
384        });
385    // accounts-db-read-cache-limit-mb was deprecated in v3.0.0
386    let read_cache_limit_mb =
387        values_of::<usize>(matches, "accounts_db_read_cache_limit_mb").map(|limits| {
388            match limits.len() {
389                // we were given explicit low and high watermark values, so use them
390                2 => (limits[0] * MB, limits[1] * MB),
391                // we were given a single value, so use it for both low and high watermarks
392                1 => (limits[0] * MB, limits[0] * MB),
393                _ => {
394                    // clap will enforce either one or two values is given
395                    unreachable!(
396                        "invalid number of values given to accounts-db-read-cache-limit-mb"
397                    )
398                }
399            }
400        });
401    // clap will enforce only one cli arg is provided, so pick whichever is Some
402    let read_cache_limit_bytes = read_cache_limit_bytes.or(read_cache_limit_mb);
403
404    let storage_access = matches
405        .value_of("accounts_db_access_storages_method")
406        .map(|method| match method {
407            "mmap" => StorageAccess::Mmap,
408            "file" => StorageAccess::File,
409            _ => {
410                // clap will enforce one of the above values is given
411                unreachable!("invalid value given to accounts-db-access-storages-method")
412            }
413        })
414        .unwrap_or_default();
415
416    let scan_filter_for_shrinking = matches
417        .value_of("accounts_db_scan_filter_for_shrinking")
418        .map(|filter| match filter {
419            "all" => ScanFilter::All,
420            "only-abnormal" => ScanFilter::OnlyAbnormal,
421            "only-abnormal-with-verify" => ScanFilter::OnlyAbnormalWithVerify,
422            _ => {
423                // clap will enforce one of the above values is given
424                unreachable!("invalid value given to accounts_db_scan_filter_for_shrinking")
425            }
426        })
427        .unwrap_or_default();
428
429    let mark_obsolete_accounts = if matches.is_present("accounts_db_mark_obsolete_accounts") {
430        MarkObsoleteAccounts::Enabled
431    } else {
432        MarkObsoleteAccounts::Disabled
433    };
434
435    let accounts_db_config = AccountsDbConfig {
436        index: Some(accounts_index_config),
437        account_indexes: Some(account_indexes.clone()),
438        base_working_path: Some(ledger_path.clone()),
439        shrink_paths: account_shrink_run_paths,
440        shrink_ratio,
441        read_cache_limit_bytes,
442        write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
443            .ok()
444            .map(|mb| mb * MB as u64),
445        ancient_append_vec_offset: value_t!(matches, "accounts_db_ancient_append_vecs", i64).ok(),
446        ancient_storage_ideal_size: value_t!(
447            matches,
448            "accounts_db_ancient_storage_ideal_size",
449            u64
450        )
451        .ok(),
452        max_ancient_storages: value_t!(matches, "accounts_db_max_ancient_storages", usize).ok(),
453        exhaustively_verify_refcounts: matches.is_present("accounts_db_verify_refcounts"),
454        storage_access,
455        scan_filter_for_shrinking,
456        num_background_threads: Some(accounts_db_background_threads),
457        num_foreground_threads: Some(accounts_db_foreground_threads),
458        mark_obsolete_accounts,
459        memlock_budget_size: solana_accounts_db::accounts_db::DEFAULT_MEMLOCK_BUDGET_SIZE,
460        ..AccountsDbConfig::default()
461    };
462
463    let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
464        Some(
465            values_t_or_exit!(matches, "geyser_plugin_config", String)
466                .into_iter()
467                .map(PathBuf::from)
468                .collect(),
469        )
470    } else {
471        None
472    };
473    let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some()
474        || matches.is_present("geyser_plugin_always_enabled");
475
476    let xdp_interface = matches.value_of("retransmit_xdp_interface");
477    let xdp_zero_copy = matches.is_present("retransmit_xdp_zero_copy");
478    let retransmit_xdp = matches.value_of("retransmit_xdp_cpu_cores").map(|cpus| {
479        XdpConfig::new(
480            xdp_interface,
481            parse_cpu_ranges(cpus).unwrap(),
482            xdp_zero_copy,
483        )
484    });
485
486    let account_paths: Vec<PathBuf> =
487        if let Ok(account_paths) = values_t!(matches, "account_paths", String) {
488            account_paths
489                .join(",")
490                .split(',')
491                .map(PathBuf::from)
492                .collect()
493        } else {
494            vec![ledger_path.join("accounts")]
495        };
496    let account_paths = create_and_canonicalize_directories(account_paths)
497        .map_err(|err| format!("unable to access account path: {err}"))?;
498
499    // From now on, use run/ paths in the same way as the previous account_paths.
500    let (account_run_paths, account_snapshot_paths) =
501        create_all_accounts_run_and_snapshot_dirs(&account_paths)
502            .map_err(|err| format!("unable to create account directories: {err}"))?;
503
504    // These snapshot paths are only used for initial clean up, add in shrink paths if they exist.
505    let account_snapshot_paths =
506        if let Some(account_shrink_snapshot_paths) = account_shrink_snapshot_paths {
507            account_snapshot_paths
508                .into_iter()
509                .chain(account_shrink_snapshot_paths)
510                .collect()
511        } else {
512            account_snapshot_paths
513        };
514
515    let snapshot_config = new_snapshot_config(
516        matches,
517        &ledger_path,
518        &account_paths,
519        run_args.rpc_bootstrap_config.incremental_snapshot_fetch,
520    )?;
521
522    let use_snapshot_archives_at_startup = value_t_or_exit!(
523        matches,
524        use_snapshot_archives_at_startup::cli::NAME,
525        UseSnapshotArchivesAtStartup
526    );
527
528    let mut validator_config = ValidatorConfig {
529        require_tower: matches.is_present("require_tower"),
530        tower_storage,
531        halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
532        max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
533        expected_genesis_hash: matches
534            .value_of("expected_genesis_hash")
535            .map(|s| Hash::from_str(s).unwrap()),
536        fixed_leader_schedule: None,
537        expected_bank_hash: matches
538            .value_of("expected_bank_hash")
539            .map(|s| Hash::from_str(s).unwrap()),
540        expected_shred_version,
541        new_hard_forks: hardforks_of(matches, "hard_forks"),
542        rpc_config: run_args.json_rpc_config,
543        on_start_geyser_plugin_config_files,
544        geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
545        rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
546            (
547                SocketAddr::new(rpc_bind_address, rpc_port),
548                SocketAddr::new(rpc_bind_address, rpc_port + 1),
549                // If additional ports are added, +2 needs to be skipped to avoid a conflict with
550                // the websocket port (which is +2) in web3.js This odd port shifting is tracked at
551                // https://github.com/solana-labs/solana/issues/12250
552            )
553        }),
554        pubsub_config: run_args.pub_sub_config,
555        voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
556        wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
557        known_validators: run_args.known_validators,
558        repair_validators,
559        repair_whitelist,
560        repair_handler_type: RepairHandlerType::default(),
561        gossip_validators,
562        max_ledger_shreds,
563        blockstore_options: run_args.blockstore_options,
564        run_verification: !matches.is_present("skip_startup_ledger_verification"),
565        debug_keys,
566        warp_slot: None,
567        generator_config: None,
568        contact_debug_interval,
569        contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
570        send_transaction_service_config: run_args.send_transaction_service_config,
571        no_poh_speed_test: matches.is_present("no_poh_speed_test"),
572        no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
573        no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
574        no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
575        no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
576        // The validator needs to open many files, check that the process has
577        // permission to do so in order to fail quickly and give a direct error
578        enforce_ulimit_nofile: true,
579        poh_pinned_cpu_core: value_of(matches, "poh_pinned_cpu_core")
580            .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
581        poh_hashes_per_batch: value_of(matches, "poh_hashes_per_batch")
582            .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH),
583        process_ledger_before_services: matches.is_present("process_ledger_before_services"),
584        account_paths: account_run_paths,
585        account_snapshot_paths,
586        accounts_db_config,
587        accounts_db_skip_shrink: true,
588        accounts_db_force_initial_clean: matches.is_present("no_skip_initial_accounts_db_clean"),
589        snapshot_config,
590        no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"),
591        wait_to_vote_slot: None,
592        runtime_config: RuntimeConfig {
593            log_messages_bytes_limit: value_of(matches, "log_messages_bytes_limit"),
594            ..RuntimeConfig::default()
595        },
596        staked_nodes_overrides: staked_nodes_overrides.clone(),
597        use_snapshot_archives_at_startup,
598        ip_echo_server_threads,
599        rayon_global_threads,
600        replay_forks_threads,
601        replay_transactions_threads,
602        tvu_shred_sigverify_threads: tvu_sigverify_threads,
603        delay_leader_block_for_pending_fork: matches
604            .is_present("delay_leader_block_for_pending_fork"),
605        wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(),
606        wen_restart_coordinator: value_t!(matches, "wen_restart_coordinator", Pubkey).ok(),
607        turbine_disabled: Arc::<AtomicBool>::default(),
608        retransmit_xdp,
609        broadcast_stage_type: BroadcastStageType::Standard,
610        use_tpu_client_next: !matches.is_present("use_connection_cache"),
611        block_verification_method: value_t_or_exit!(
612            matches,
613            "block_verification_method",
614            BlockVerificationMethod
615        ),
616        unified_scheduler_handler_threads: value_t!(
617            matches,
618            "unified_scheduler_handler_threads",
619            usize
620        )
621        .ok(),
622        block_production_method: value_t_or_exit!(
623            matches,
624            "block_production_method",
625            BlockProductionMethod
626        ),
627        block_production_num_workers,
628        block_production_scheduler_config: SchedulerConfig {
629            scheduler_pacing: value_t_or_exit!(
630                matches,
631                "block_production_pacing_fill_time_millis",
632                SchedulerPacing
633            ),
634        },
635        enable_block_production_forwarding: staked_nodes_overrides_path.is_some(),
636        banking_trace_dir_byte_limit: parse_banking_trace_dir_byte_limit(matches),
637        validator_exit: Arc::new(RwLock::new(Exit::default())),
638        validator_exit_backpressure: [(
639            SnapshotPackagerService::NAME.to_string(),
640            Arc::new(AtomicBool::new(false)),
641        )]
642        .into(),
643    };
644
645    let reserved = validator_config
646        .retransmit_xdp
647        .as_ref()
648        .map(|xdp| xdp.cpus.clone())
649        .unwrap_or_default()
650        .iter()
651        .cloned()
652        .collect::<HashSet<_>>();
653    if !reserved.is_empty() {
654        let available = core_affinity::get_core_ids()
655            .unwrap_or_default()
656            .into_iter()
657            .map(|core_id| core_id.id)
658            .collect::<HashSet<_>>();
659        let available = available.difference(&reserved);
660        set_cpu_affinity(available.into_iter().copied()).unwrap();
661    }
662
663    let vote_account = pubkey_of(matches, "vote_account").unwrap_or_else(|| {
664        if !validator_config.voting_disabled {
665            warn!("--vote-account not specified, validator will not vote");
666            validator_config.voting_disabled = true;
667        }
668        Keypair::new().pubkey()
669    });
670
671    let dynamic_port_range =
672        solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
673            .expect("invalid dynamic_port_range");
674
675    let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64);
676    let minimal_snapshot_download_speed =
677        value_t_or_exit!(matches, "minimal_snapshot_download_speed", f32);
678    let maximum_snapshot_download_abort =
679        value_t_or_exit!(matches, "maximum_snapshot_download_abort", u64);
680
681    match validator_config.block_verification_method {
682        BlockVerificationMethod::BlockstoreProcessor => {
683            warn!(
684                "The value \"blockstore-processor\" for --block-verification-method has been \
685                 deprecated. The value \"blockstore-processor\" is still allowed for now, but is \
686                 planned for removal in the near future. To update, either set the value \
687                 \"unified-scheduler\" or remove the --block-verification-method argument"
688            );
689        }
690        BlockVerificationMethod::UnifiedScheduler => {}
691    }
692
693    let public_rpc_addr = matches
694        .value_of("public_rpc_addr")
695        .map(|addr| {
696            solana_net_utils::parse_host_port(addr)
697                .map_err(|err| format!("failed to parse public rpc address: {err}"))
698        })
699        .transpose()?;
700
701    if !matches.is_present("no_os_network_limits_test") {
702        if SystemMonitorService::check_os_network_limits() {
703            info!("OS network limits test passed.");
704        } else {
705            Err("OS network limit test failed. See \
706                https://docs.anza.xyz/operations/guides/validator-start#system-tuning"
707                .to_string())?;
708        }
709    }
710
711    let mut ledger_lock = ledger_lockfile(&ledger_path);
712    let _ledger_write_guard = lock_ledger(&ledger_path, &mut ledger_lock);
713
714    let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
715    let admin_service_post_init = Arc::new(RwLock::new(None));
716    let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
717        if starting_with_geyser_plugins {
718            let (sender, receiver) = unbounded();
719            (Some(sender), Some(receiver))
720        } else {
721            (None, None)
722        };
723    admin_rpc_service::run(
724        &ledger_path,
725        admin_rpc_service::AdminRpcRequestMetadata {
726            rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
727            start_time: std::time::SystemTime::now(),
728            validator_exit: validator_config.validator_exit.clone(),
729            validator_exit_backpressure: validator_config.validator_exit_backpressure.clone(),
730            start_progress: start_progress.clone(),
731            authorized_voter_keypairs: authorized_voter_keypairs.clone(),
732            post_init: admin_service_post_init.clone(),
733            tower_storage: validator_config.tower_storage.clone(),
734            staked_nodes_overrides,
735            rpc_to_plugin_manager_sender,
736        },
737    );
738
739    let gossip_host = matches
740        .value_of("gossip_host")
741        .map(|gossip_host| {
742            warn!(
743                "--gossip-host is deprecated. Use --bind-address or rely on automatic public IP \
744                 discovery instead."
745            );
746            solana_net_utils::parse_host(gossip_host)
747                .map_err(|err| format!("failed to parse --gossip-host: {err}"))
748        })
749        .transpose()?;
750
751    let advertised_ip = matches
752        .value_of("advertised_ip")
753        .map(|advertised_ip| {
754            solana_net_utils::parse_host(advertised_ip)
755                .map_err(|err| format!("failed to parse --advertised-ip: {err}"))
756        })
757        .transpose()?;
758
759    let advertised_ip = if let Some(ip) = gossip_host {
760        ip
761    } else if let Some(cli_ip) = advertised_ip {
762        cli_ip
763    } else if !bind_addresses.active().is_unspecified() && !bind_addresses.active().is_loopback() {
764        bind_addresses.active()
765    } else if !entrypoint_addrs.is_empty() {
766        let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect();
767        order.shuffle(&mut thread_rng());
768
769        order
770            .into_iter()
771            .find_map(|i| {
772                let entrypoint_addr = &entrypoint_addrs[i];
773                info!(
774                    "Contacting {entrypoint_addr} to determine the validator's public IP address"
775                );
776                solana_net_utils::get_public_ip_addr_with_binding(
777                    entrypoint_addr,
778                    bind_addresses.active(),
779                )
780                .map_or_else(
781                    |err| {
782                        warn!("Failed to contact cluster entrypoint {entrypoint_addr}: {err}");
783                        None
784                    },
785                    Some,
786                )
787            })
788            .ok_or_else(|| "unable to determine the validator's public IP address".to_string())?
789    } else {
790        IpAddr::V4(Ipv4Addr::LOCALHOST)
791    };
792    let gossip_port = value_t!(matches, "gossip_port", u16).or_else(|_| {
793        solana_net_utils::find_available_port_in_range(bind_addresses.active(), (0, 1))
794            .map_err(|err| format!("unable to find an available gossip port: {err}"))
795    })?;
796
797    let public_tpu_addr = matches
798        .value_of("public_tpu_addr")
799        .map(|public_tpu_addr| {
800            solana_net_utils::parse_host_port(public_tpu_addr)
801                .map_err(|err| format!("failed to parse --public-tpu-address: {err}"))
802        })
803        .transpose()?;
804
805    let public_tpu_forwards_addr = matches
806        .value_of("public_tpu_forwards_addr")
807        .map(|public_tpu_forwards_addr| {
808            solana_net_utils::parse_host_port(public_tpu_forwards_addr)
809                .map_err(|err| format!("failed to parse --public-tpu-forwards-address: {err}"))
810        })
811        .transpose()?;
812
813    let tpu_vortexor_receiver_address =
814        matches
815            .value_of("tpu_vortexor_receiver_address")
816            .map(|tpu_vortexor_receiver_address| {
817                solana_net_utils::parse_host_port(tpu_vortexor_receiver_address).unwrap_or_else(
818                    |err| {
819                        eprintln!("Failed to parse --tpu-vortexor-receiver-address: {err}");
820                        exit(1);
821                    },
822                )
823            });
824
825    info!("tpu_vortexor_receiver_address is {tpu_vortexor_receiver_address:?}");
826    let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
827
828    let tpu_max_connections_per_peer =
829        value_t_or_exit!(matches, "tpu_max_connections_per_peer", u64);
830    let tpu_max_staked_connections = value_t_or_exit!(matches, "tpu_max_staked_connections", u64);
831    let tpu_max_unstaked_connections =
832        value_t_or_exit!(matches, "tpu_max_unstaked_connections", u64);
833
834    let tpu_max_fwd_staked_connections =
835        value_t_or_exit!(matches, "tpu_max_fwd_staked_connections", u64);
836    let tpu_max_fwd_unstaked_connections =
837        value_t_or_exit!(matches, "tpu_max_fwd_unstaked_connections", u64);
838
839    let tpu_max_connections_per_ipaddr_per_minute: u64 =
840        value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64);
841    let max_streams_per_ms = value_t_or_exit!(matches, "tpu_max_streams_per_ms", u64);
842
843    let node_config = NodeConfig {
844        advertised_ip,
845        gossip_port,
846        port_range: dynamic_port_range,
847        bind_ip_addrs: bind_addresses,
848        public_tpu_addr,
849        public_tpu_forwards_addr,
850        num_tvu_receive_sockets: tvu_receive_threads,
851        num_tvu_retransmit_sockets: tvu_retransmit_threads,
852        num_quic_endpoints,
853        vortexor_receiver_addr: tpu_vortexor_receiver_address,
854    };
855
856    let cluster_entrypoints = entrypoint_addrs
857        .iter()
858        .map(ContactInfo::new_gossip_entry_point)
859        .collect::<Vec<_>>();
860
861    let mut node = Node::new_with_external_ip(&identity_keypair.pubkey(), node_config);
862
863    if restricted_repair_only_mode {
864        if validator_config.wen_restart_proto_path.is_some() {
865            Err("--restricted-repair-only-mode is not compatible with --wen_restart".to_string())?;
866        }
867
868        // When in --restricted_repair_only_mode is enabled only the gossip and repair ports
869        // need to be reachable by the entrypoint to respond to gossip pull requests and repair
870        // requests initiated by the node.  All other ports are unused.
871        node.info.remove_tpu();
872        node.info.remove_tpu_forwards();
873        node.info.remove_tvu();
874        node.info.remove_serve_repair();
875        node.info.remove_alpenglow();
876
877        // A node in this configuration shouldn't be an entrypoint to other nodes
878        node.sockets.ip_echo = None;
879    }
880
881    if !private_rpc {
882        macro_rules! set_socket {
883            ($method:ident, $addr:expr, $name:literal) => {
884                node.info.$method($addr).expect(&format!(
885                    "Operator must spin up node with valid {} address",
886                    $name
887                ))
888            };
889        }
890        if let Some(public_rpc_addr) = public_rpc_addr {
891            set_socket!(set_rpc, public_rpc_addr, "RPC");
892            set_socket!(set_rpc_pubsub, public_rpc_addr, "RPC-pubsub");
893        } else if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
894            let addr = node
895                .info
896                .gossip()
897                .expect("Operator must spin up node with valid gossip address")
898                .ip();
899            set_socket!(set_rpc, (addr, rpc_addr.port()), "RPC");
900            set_socket!(set_rpc_pubsub, (addr, rpc_pubsub_addr.port()), "RPC-pubsub");
901        }
902    }
903
904    solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
905    solana_metrics::set_panic_hook("validator", Some(String::from(solana_version)));
906    solana_entry::entry::init_poh();
907    snapshot_utils::remove_tmp_snapshot_archives(
908        &validator_config.snapshot_config.full_snapshot_archives_dir,
909    );
910    snapshot_utils::remove_tmp_snapshot_archives(
911        &validator_config
912            .snapshot_config
913            .incremental_snapshot_archives_dir,
914    );
915
916    let should_check_duplicate_instance = true;
917    if !cluster_entrypoints.is_empty() {
918        bootstrap::rpc_bootstrap(
919            &node,
920            &identity_keypair,
921            &ledger_path,
922            &vote_account,
923            authorized_voter_keypairs.clone(),
924            &cluster_entrypoints,
925            &mut validator_config,
926            run_args.rpc_bootstrap_config,
927            do_port_check,
928            use_progress_bar,
929            maximum_local_snapshot_age,
930            should_check_duplicate_instance,
931            &start_progress,
932            minimal_snapshot_download_speed,
933            maximum_snapshot_download_abort,
934            run_args.socket_addr_space,
935        );
936        *start_progress.write().unwrap() = ValidatorStartProgress::Initializing;
937    }
938
939    if operation == Operation::Initialize {
940        info!("Validator ledger initialization complete");
941        return Ok(());
942    }
943
944    // Bootstrap code above pushes a contact-info with more recent timestamp to
945    // gossip. If the node is staked the contact-info lingers in gossip causing
946    // false duplicate nodes error.
947    // Below line refreshes the timestamp on contact-info so that it overrides
948    // the one pushed by bootstrap.
949    node.info.hot_swap_pubkey(identity_keypair.pubkey());
950
951    let tpu_quic_server_config = SwQosQuicStreamerConfig {
952        quic_streamer_config: QuicStreamerConfig {
953            max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
954            max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
955            max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
956            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
957            num_threads: tpu_transaction_receive_threads,
958            ..Default::default()
959        },
960        qos_config: SwQosConfig { max_streams_per_ms },
961    };
962
963    let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
964        quic_streamer_config: QuicStreamerConfig {
965            max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
966            max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
967            max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
968            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
969            num_threads: tpu_transaction_forward_receive_threads,
970            ..Default::default()
971        },
972        qos_config: SwQosConfig { max_streams_per_ms },
973    };
974
975    let vote_quic_server_config = SimpleQosQuicStreamerConfig {
976        quic_streamer_config: QuicStreamerConfig {
977            max_connections_per_peer: 1,
978            max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
979            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
980            num_threads: tpu_vote_transaction_receive_threads,
981            ..Default::default()
982        },
983        qos_config: SimpleQosConfig {
984            max_streams_per_second: MAX_VOTES_PER_SECOND,
985        },
986    };
987
988    let validator = match Validator::new(
989        node,
990        identity_keypair,
991        &ledger_path,
992        &vote_account,
993        authorized_voter_keypairs,
994        cluster_entrypoints,
995        &validator_config,
996        should_check_duplicate_instance,
997        rpc_to_plugin_manager_receiver,
998        start_progress,
999        run_args.socket_addr_space,
1000        ValidatorTpuConfig {
1001            use_quic: tpu_use_quic,
1002            vote_use_quic,
1003            tpu_connection_pool_size,
1004            tpu_enable_udp,
1005            tpu_quic_server_config,
1006            tpu_fwd_quic_server_config,
1007            vote_quic_server_config,
1008        },
1009        admin_service_post_init,
1010    ) {
1011        Ok(validator) => Ok(validator),
1012        Err(err) => {
1013            if matches!(
1014                err.downcast_ref(),
1015                Some(&ValidatorError::WenRestartFinished)
1016            ) {
1017                // 200 is a special error code, see
1018                // https://github.com/solana-foundation/solana-improvement-documents/pull/46
1019                error!(
1020                    "Please remove --wen_restart and use --wait_for_supermajority as instructed \
1021                     above"
1022                );
1023                exit(200);
1024            }
1025            Err(format!("{err:?}"))
1026        }
1027    }?;
1028
1029    if let Some(filename) = init_complete_file {
1030        File::create(filename).map_err(|err| format!("unable to create {filename}: {err}"))?;
1031    }
1032    info!("Validator initialized");
1033    validator.join();
1034    info!("Validator exiting..");
1035
1036    Ok(())
1037}
1038
1039// This function is duplicated in ledger-tool/src/main.rs...
1040fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
1041    if matches.is_present(name) {
1042        Some(values_t_or_exit!(matches, name, Slot))
1043    } else {
1044        None
1045    }
1046}
1047
1048fn validators_set(
1049    identity_pubkey: &Pubkey,
1050    matches: &ArgMatches<'_>,
1051    matches_name: &str,
1052    arg_name: &str,
1053) -> Result<Option<HashSet<Pubkey>>, String> {
1054    if matches.is_present(matches_name) {
1055        let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
1056            .into_iter()
1057            .collect();
1058        if validators_set.contains(identity_pubkey) {
1059            Err(format!(
1060                "the validator's identity pubkey cannot be a {arg_name}: {identity_pubkey}"
1061            ))?;
1062        }
1063        Ok(Some(validators_set))
1064    } else {
1065        Ok(None)
1066    }
1067}
1068
1069fn get_cluster_shred_version(entrypoints: &[SocketAddr], bind_address: IpAddr) -> Option<u16> {
1070    let entrypoints = {
1071        let mut index: Vec<_> = (0..entrypoints.len()).collect();
1072        index.shuffle(&mut rand::thread_rng());
1073        index.into_iter().map(|i| &entrypoints[i])
1074    };
1075    for entrypoint in entrypoints {
1076        match solana_net_utils::get_cluster_shred_version_with_binding(entrypoint, bind_address) {
1077            Err(err) => eprintln!("get_cluster_shred_version failed: {entrypoint}, {err}"),
1078            Ok(0) => eprintln!("entrypoint {entrypoint} returned shred-version zero"),
1079            Ok(shred_version) => {
1080                info!("obtained shred-version {shred_version} from {entrypoint}");
1081                return Some(shred_version);
1082            }
1083        }
1084    }
1085    None
1086}
1087
1088fn parse_banking_trace_dir_byte_limit(matches: &ArgMatches) -> u64 {
1089    if matches.is_present("disable_banking_trace") {
1090        // disable with an explicit flag; This effectively becomes `opt-out` by resetting to
1091        // DISABLED_BAKING_TRACE_DIR, while allowing us to specify a default sensible limit in clap
1092        // configuration for cli help.
1093        DISABLED_BAKING_TRACE_DIR
1094    } else {
1095        // a default value in clap configuration (BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT) or
1096        // explicit user-supplied override value
1097        value_t_or_exit!(matches, "banking_trace_dir_byte_limit", u64)
1098    }
1099}
1100
1101fn new_snapshot_config(
1102    matches: &ArgMatches,
1103    ledger_path: &Path,
1104    account_paths: &[PathBuf],
1105    incremental_snapshot_fetch: bool,
1106) -> Result<SnapshotConfig, Box<dyn std::error::Error>> {
1107    let (full_snapshot_archive_interval, incremental_snapshot_archive_interval) =
1108        if matches.is_present("no_snapshots") {
1109            // snapshots are disabled
1110            (SnapshotInterval::Disabled, SnapshotInterval::Disabled)
1111        } else {
1112            match (
1113                incremental_snapshot_fetch,
1114                value_t_or_exit!(matches, "snapshot_interval_slots", NonZeroU64),
1115            ) {
1116                (true, incremental_snapshot_interval_slots) => {
1117                    // incremental snapshots are enabled
1118                    // use --snapshot-interval-slots for the incremental snapshot interval
1119                    let full_snapshot_interval_slots =
1120                        value_t_or_exit!(matches, "full_snapshot_interval_slots", NonZeroU64);
1121                    (
1122                        SnapshotInterval::Slots(full_snapshot_interval_slots),
1123                        SnapshotInterval::Slots(incremental_snapshot_interval_slots),
1124                    )
1125                }
1126                (false, full_snapshot_interval_slots) => {
1127                    // incremental snapshots are *disabled*
1128                    // use --snapshot-interval-slots for the *full* snapshot interval
1129                    // also warn if --full-snapshot-interval-slots was specified
1130                    if matches.occurrences_of("full_snapshot_interval_slots") > 0 {
1131                        warn!(
1132                            "Incremental snapshots are disabled, yet \
1133                             --full-snapshot-interval-slots was specified! Note that \
1134                             --full-snapshot-interval-slots is *ignored* when incremental \
1135                             snapshots are disabled. Use --snapshot-interval-slots instead.",
1136                        );
1137                    }
1138                    (
1139                        SnapshotInterval::Slots(full_snapshot_interval_slots),
1140                        SnapshotInterval::Disabled,
1141                    )
1142                }
1143            }
1144        };
1145
1146    info!(
1147        "Snapshot configuration: full snapshot interval: {}, incremental snapshot interval: {}",
1148        match full_snapshot_archive_interval {
1149            SnapshotInterval::Disabled => "disabled".to_string(),
1150            SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1151        },
1152        match incremental_snapshot_archive_interval {
1153            SnapshotInterval::Disabled => "disabled".to_string(),
1154            SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1155        },
1156    );
1157    // It is unlikely that a full snapshot interval greater than an epoch is a good idea.
1158    // Minimally we should warn the user in case this was a mistake.
1159    if let SnapshotInterval::Slots(full_snapshot_interval_slots) = full_snapshot_archive_interval {
1160        let full_snapshot_interval_slots = full_snapshot_interval_slots.get();
1161        if full_snapshot_interval_slots > DEFAULT_SLOTS_PER_EPOCH {
1162            warn!(
1163                "The full snapshot interval is excessively large: {full_snapshot_interval_slots}! \
1164                 This will negatively impact the background cleanup tasks in accounts-db. \
1165                 Consider a smaller value.",
1166            );
1167        }
1168    }
1169
1170    let snapshots_dir = matches
1171        .value_of("snapshots")
1172        .map(Path::new)
1173        .unwrap_or(ledger_path);
1174    let snapshots_dir = create_and_canonicalize_directory(snapshots_dir).map_err(|err| {
1175        format!(
1176            "failed to create snapshots directory '{}': {err}",
1177            snapshots_dir.display(),
1178        )
1179    })?;
1180    if account_paths
1181        .iter()
1182        .any(|account_path| account_path == &snapshots_dir)
1183    {
1184        Err(
1185            "the --accounts and --snapshots paths must be unique since they both create \
1186             'snapshots' subdirectories, otherwise there may be collisions"
1187                .to_string(),
1188        )?;
1189    }
1190
1191    let bank_snapshots_dir = snapshots_dir.join(BANK_SNAPSHOTS_DIR);
1192    fs::create_dir_all(&bank_snapshots_dir).map_err(|err| {
1193        format!(
1194            "failed to create bank snapshots directory '{}': {err}",
1195            bank_snapshots_dir.display(),
1196        )
1197    })?;
1198
1199    let full_snapshot_archives_dir = matches
1200        .value_of("full_snapshot_archive_path")
1201        .map(PathBuf::from)
1202        .unwrap_or_else(|| snapshots_dir.clone());
1203    fs::create_dir_all(&full_snapshot_archives_dir).map_err(|err| {
1204        format!(
1205            "failed to create full snapshot archives directory '{}': {err}",
1206            full_snapshot_archives_dir.display(),
1207        )
1208    })?;
1209
1210    let incremental_snapshot_archives_dir = matches
1211        .value_of("incremental_snapshot_archive_path")
1212        .map(PathBuf::from)
1213        .unwrap_or_else(|| snapshots_dir.clone());
1214    fs::create_dir_all(&incremental_snapshot_archives_dir).map_err(|err| {
1215        format!(
1216            "failed to create incremental snapshot archives directory '{}': {err}",
1217            incremental_snapshot_archives_dir.display(),
1218        )
1219    })?;
1220
1221    let archive_format = {
1222        let archive_format_str = value_t_or_exit!(matches, "snapshot_archive_format", String);
1223        let mut archive_format = ArchiveFormat::from_cli_arg(&archive_format_str)
1224            .unwrap_or_else(|| panic!("Archive format not recognized: {archive_format_str}"));
1225        if let ArchiveFormat::TarZstd { config } = &mut archive_format {
1226            config.compression_level =
1227                value_t_or_exit!(matches, "snapshot_zstd_compression_level", i32);
1228        }
1229        archive_format
1230    };
1231
1232    let snapshot_version = matches
1233        .value_of("snapshot_version")
1234        .map(|value| {
1235            value
1236                .parse::<SnapshotVersion>()
1237                .map_err(|err| format!("unable to parse snapshot version: {err}"))
1238        })
1239        .transpose()?
1240        .unwrap_or(SnapshotVersion::default());
1241
1242    let maximum_full_snapshot_archives_to_retain =
1243        value_t_or_exit!(matches, "maximum_full_snapshots_to_retain", NonZeroUsize);
1244    let maximum_incremental_snapshot_archives_to_retain = value_t_or_exit!(
1245        matches,
1246        "maximum_incremental_snapshots_to_retain",
1247        NonZeroUsize
1248    );
1249
1250    let snapshot_packager_niceness_adj =
1251        value_t_or_exit!(matches, "snapshot_packager_niceness_adj", i8);
1252
1253    let snapshot_config = SnapshotConfig {
1254        usage: if full_snapshot_archive_interval == SnapshotInterval::Disabled {
1255            SnapshotUsage::LoadOnly
1256        } else {
1257            SnapshotUsage::LoadAndGenerate
1258        },
1259        full_snapshot_archive_interval,
1260        incremental_snapshot_archive_interval,
1261        bank_snapshots_dir,
1262        full_snapshot_archives_dir,
1263        incremental_snapshot_archives_dir,
1264        archive_format,
1265        snapshot_version,
1266        maximum_full_snapshot_archives_to_retain,
1267        maximum_incremental_snapshot_archives_to_retain,
1268        packager_thread_niceness_adj: snapshot_packager_niceness_adj,
1269    };
1270
1271    if !is_snapshot_config_valid(&snapshot_config) {
1272        Err(
1273            "invalid snapshot configuration provided: snapshot intervals are incompatible. full \
1274             snapshot interval MUST be larger than incremental snapshot interval (if enabled)"
1275                .to_string(),
1276        )?;
1277    }
1278
1279    Ok(snapshot_config)
1280}