agave_validator/commands/run/
execute.rs

1use {
2    crate::{
3        admin_rpc_service::{self, load_staked_nodes_overrides, StakedNodesOverrides},
4        bootstrap,
5        cli::{self},
6        commands::{run::args::RunArgs, FromClapArgMatches},
7        ledger_lockfile, lock_ledger,
8    },
9    clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
10    crossbeam_channel::unbounded,
11    log::*,
12    rand::{seq::SliceRandom, thread_rng},
13    solana_accounts_db::{
14        accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
15        accounts_file::StorageAccess,
16        accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig, IndexLimitMb, ScanFilter},
17        hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
18        utils::{
19            create_all_accounts_run_and_snapshot_dirs, create_and_canonicalize_directories,
20            create_and_canonicalize_directory,
21        },
22    },
23    solana_clap_utils::input_parsers::{
24        keypair_of, keypairs_of, parse_cpu_ranges, pubkey_of, value_of, values_of,
25    },
26    solana_clock::{Slot, DEFAULT_SLOTS_PER_EPOCH},
27    solana_core::{
28        banking_trace::DISABLED_BAKING_TRACE_DIR,
29        consensus::tower_storage,
30        repair::repair_handler::RepairHandlerType,
31        snapshot_packager_service::SnapshotPackagerService,
32        system_monitor_service::SystemMonitorService,
33        validator::{
34            is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod,
35            TransactionStructure, Validator, ValidatorConfig, ValidatorError,
36            ValidatorStartProgress, ValidatorTpuConfig,
37        },
38    },
39    solana_gossip::{
40        cluster_info::{NodeConfig, DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS},
41        contact_info::ContactInfo,
42        node::Node,
43    },
44    solana_hash::Hash,
45    solana_keypair::Keypair,
46    solana_ledger::{
47        blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
48        use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
49    },
50    solana_logger::redirect_stderr_to_file,
51    solana_net_utils::multihomed_sockets::BindIpAddrs,
52    solana_perf::recycler::enable_recycler_warming,
53    solana_poh::poh_service,
54    solana_pubkey::Pubkey,
55    solana_runtime::{
56        runtime_config::RuntimeConfig,
57        snapshot_config::{SnapshotConfig, SnapshotUsage},
58        snapshot_utils::{
59            self, ArchiveFormat, SnapshotInterval, SnapshotVersion, BANK_SNAPSHOTS_DIR,
60        },
61    },
62    solana_send_transaction_service::send_transaction_service,
63    solana_signer::Signer,
64    solana_streamer::quic::{QuicServerParams, DEFAULT_TPU_COALESCE},
65    solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
66    solana_turbine::{
67        broadcast_stage::BroadcastStageType,
68        xdp::{set_cpu_affinity, XdpConfig},
69    },
70    solana_validator_exit::Exit,
71    std::{
72        collections::HashSet,
73        fs::{self, File},
74        net::{IpAddr, Ipv4Addr, SocketAddr},
75        num::{NonZeroU64, NonZeroUsize},
76        path::{Path, PathBuf},
77        process::exit,
78        str::FromStr,
79        sync::{atomic::AtomicBool, Arc, RwLock},
80        time::Duration,
81    },
82};
83
/// The operation requested of [`execute`], passed as its `operation` argument.
///
/// `Clone` and `Copy` are derived because this is a field-less enum: a caller can
/// keep using its value after handing it to `execute`, which takes it by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    /// NOTE(review): presumably performs validator initialization only and then
    /// returns — confirm against the place where `execute` inspects `operation`.
    Initialize,
    /// NOTE(review): presumably the normal mode that starts and runs the
    /// validator — confirm against the place where `execute` inspects `operation`.
    Run,
}
89
/// Number of milliseconds in one second.
///
/// `execute` multiplies the RPC send batch size by this value and divides by the
/// batch send interval (in milliseconds) to obtain a per-second send rate.
const MILLIS_PER_SECOND: u64 = 1_000;
91
92pub fn execute(
93    matches: &ArgMatches,
94    solana_version: &str,
95    ledger_path: &Path,
96    operation: Operation,
97) -> Result<(), Box<dyn std::error::Error>> {
98    let run_args = RunArgs::from_clap_arg_match(matches)?;
99
100    let cli::thread_args::NumThreadConfig {
101        accounts_db_background_threads,
102        accounts_db_foreground_threads,
103        accounts_db_hash_threads,
104        accounts_index_flush_threads,
105        block_production_num_workers,
106        ip_echo_server_threads,
107        rayon_global_threads,
108        replay_forks_threads,
109        replay_transactions_threads,
110        tpu_transaction_forward_receive_threads,
111        tpu_transaction_receive_threads,
112        tpu_vote_transaction_receive_threads,
113        tvu_receive_threads,
114        tvu_retransmit_threads,
115        tvu_sigverify_threads,
116    } = cli::thread_args::parse_num_threads_args(matches);
117
118    let identity_keypair = Arc::new(run_args.identity_keypair);
119
120    let logfile = run_args.logfile;
121    let logfile = if logfile == "-" {
122        None
123    } else {
124        println!("log file: {logfile}");
125        Some(logfile)
126    };
127    let use_progress_bar = logfile.is_none();
128    let _logger_thread = redirect_stderr_to_file(logfile);
129
130    info!("{} {}", crate_name!(), solana_version);
131    info!("Starting validator with: {:#?}", std::env::args_os());
132
133    let cuda = matches.is_present("cuda");
134    if cuda {
135        solana_perf::perf_libs::init_cuda();
136        enable_recycler_warming();
137    }
138
139    solana_core::validator::report_target_features();
140
141    let authorized_voter_keypairs = keypairs_of(matches, "authorized_voter_keypairs")
142        .map(|keypairs| keypairs.into_iter().map(Arc::new).collect())
143        .unwrap_or_else(|| vec![Arc::new(keypair_of(matches, "identity").expect("identity"))]);
144    let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));
145
146    let staked_nodes_overrides_path = matches
147        .value_of("staked_nodes_overrides")
148        .map(str::to_string);
149    let staked_nodes_overrides = Arc::new(RwLock::new(
150        match &staked_nodes_overrides_path {
151            None => StakedNodesOverrides::default(),
152            Some(p) => load_staked_nodes_overrides(p).unwrap_or_else(|err| {
153                error!("Failed to load stake-nodes-overrides from {p}: {err}");
154                clap::Error::with_description(
155                    "Failed to load configuration of stake-nodes-overrides argument",
156                    clap::ErrorKind::InvalidValue,
157                )
158                .exit()
159            }),
160        }
161        .staked_map_id,
162    ));
163
164    let init_complete_file = matches.value_of("init_complete_file");
165
166    let private_rpc = matches.is_present("private_rpc");
167    let do_port_check = !matches.is_present("no_port_check");
168    let tpu_coalesce = value_t!(matches, "tpu_coalesce_ms", u64)
169        .map(Duration::from_millis)
170        .unwrap_or(DEFAULT_TPU_COALESCE);
171
172    // Canonicalize ledger path to avoid issues with symlink creation
173    let ledger_path = create_and_canonicalize_directory(ledger_path).map_err(|err| {
174        format!(
175            "unable to access ledger path '{}': {err}",
176            ledger_path.display(),
177        )
178    })?;
179
180    let max_ledger_shreds = if matches.is_present("limit_ledger_size") {
181        let limit_ledger_size = match matches.value_of("limit_ledger_size") {
182            Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
183            None => DEFAULT_MAX_LEDGER_SHREDS,
184        };
185        if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
186            Err(format!(
187                "The provided --limit-ledger-size value was too small, the minimum value is \
188                 {DEFAULT_MIN_MAX_LEDGER_SHREDS}"
189            ))?;
190        }
191        Some(limit_ledger_size)
192    } else {
193        None
194    };
195
196    let debug_keys: Option<Arc<HashSet<_>>> = if matches.is_present("debug_key") {
197        Some(Arc::new(
198            values_t_or_exit!(matches, "debug_key", Pubkey)
199                .into_iter()
200                .collect(),
201        ))
202    } else {
203        None
204    };
205
206    let repair_validators = validators_set(
207        &identity_keypair.pubkey(),
208        matches,
209        "repair_validators",
210        "--repair-validator",
211    )?;
212    let repair_whitelist = validators_set(
213        &identity_keypair.pubkey(),
214        matches,
215        "repair_whitelist",
216        "--repair-whitelist",
217    )?;
218    let repair_whitelist = Arc::new(RwLock::new(repair_whitelist.unwrap_or_default()));
219    let gossip_validators = validators_set(
220        &identity_keypair.pubkey(),
221        matches,
222        "gossip_validators",
223        "--gossip-validator",
224    )?;
225
226    let bind_addresses = {
227        let parsed = matches
228            .values_of("bind_address")
229            .expect("bind_address should always be present due to default")
230            .map(solana_net_utils::parse_host)
231            .collect::<Result<Vec<_>, _>>()?;
232        BindIpAddrs::new(parsed).map_err(|err| format!("invalid bind_addresses: {err}"))?
233    };
234
235    let rpc_bind_address = if matches.is_present("rpc_bind_address") {
236        solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap())
237            .expect("invalid rpc_bind_address")
238    } else if private_rpc {
239        solana_net_utils::parse_host("127.0.0.1").unwrap()
240    } else {
241        bind_addresses.active()
242    };
243
244    let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64);
245
246    let account_indexes = AccountSecondaryIndexes::from_clap_arg_match(matches)?;
247
248    let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
249    let accounts_shrink_optimize_total_space =
250        value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
251    let tpu_use_quic = !matches.is_present("tpu_disable_quic");
252    if !tpu_use_quic {
253        warn!(
254            "TPU QUIC was disabled via --tpu_disable_quic, this will prevent validator from \
255             receiving transactions!"
256        );
257    }
258    let vote_use_quic = value_t_or_exit!(matches, "vote_use_quic", bool);
259
260    let tpu_enable_udp = if matches.is_present("tpu_enable_udp") {
261        warn!("Submission of TPU transactions via UDP is deprecated.");
262        true
263    } else {
264        DEFAULT_TPU_ENABLE_UDP
265    };
266
267    let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);
268
269    let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
270    if !(0.0..=1.0).contains(&shrink_ratio) {
271        Err(format!(
272            "the specified account-shrink-ratio is invalid, it must be between 0. and 1.0 \
273             inclusive: {shrink_ratio}"
274        ))?;
275    }
276
277    let shrink_ratio = if accounts_shrink_optimize_total_space {
278        AccountShrinkThreshold::TotalSpace { shrink_ratio }
279    } else {
280        AccountShrinkThreshold::IndividualStore { shrink_ratio }
281    };
282    let entrypoint_addrs = run_args.entrypoints;
283    for addr in &entrypoint_addrs {
284        if !run_args.socket_addr_space.check(addr) {
285            Err(format!("invalid entrypoint address: {addr}"))?;
286        }
287    }
288    // TODO: Once entrypoints are updated to return shred-version, this should
289    // abort if it fails to obtain a shred-version, so that nodes always join
290    // gossip with a valid shred-version. The code to adopt entrypoint shred
291    // version can then be deleted from gossip and get_rpc_node above.
292    let expected_shred_version = value_t!(matches, "expected_shred_version", u16)
293        .ok()
294        .or_else(|| get_cluster_shred_version(&entrypoint_addrs, bind_addresses.active()));
295
296    let tower_path = value_t!(matches, "tower", PathBuf)
297        .ok()
298        .unwrap_or_else(|| ledger_path.clone());
299    let tower_storage: Arc<dyn tower_storage::TowerStorage> =
300        Arc::new(tower_storage::FileTowerStorage::new(tower_path));
301
302    let mut accounts_index_config = AccountsIndexConfig {
303        num_flush_threads: Some(accounts_index_flush_threads),
304        ..AccountsIndexConfig::default()
305    };
306    if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
307        accounts_index_config.bins = Some(bins);
308    }
309
310    accounts_index_config.index_limit_mb = if matches.is_present("disable_accounts_disk_index") {
311        IndexLimitMb::InMemOnly
312    } else {
313        IndexLimitMb::Minimal
314    };
315
316    {
317        let mut accounts_index_paths: Vec<PathBuf> = if matches.is_present("accounts_index_path") {
318            values_t_or_exit!(matches, "accounts_index_path", String)
319                .into_iter()
320                .map(PathBuf::from)
321                .collect()
322        } else {
323            vec![]
324        };
325        if accounts_index_paths.is_empty() {
326            accounts_index_paths = vec![ledger_path.join("accounts_index")];
327        }
328        accounts_index_config.drives = Some(accounts_index_paths);
329    }
330
331    const MB: usize = 1_024 * 1_024;
332    accounts_index_config.scan_results_limit_bytes =
333        value_t!(matches, "accounts_index_scan_results_limit_mb", usize)
334            .ok()
335            .map(|mb| mb * MB);
336
337    let account_shrink_paths: Option<Vec<PathBuf>> =
338        values_t!(matches, "account_shrink_path", String)
339            .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
340            .ok();
341    let account_shrink_paths = account_shrink_paths
342        .as_ref()
343        .map(|paths| {
344            create_and_canonicalize_directories(paths)
345                .map_err(|err| format!("unable to access account shrink path: {err}"))
346        })
347        .transpose()?;
348
349    let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
350        .map(|paths| {
351            create_all_accounts_run_and_snapshot_dirs(&paths)
352                .map_err(|err| format!("unable to create account subdirectories: {err}"))
353        })
354        .transpose()?
355        .unzip();
356
357    let read_cache_limit_bytes =
358        values_of::<usize>(matches, "accounts_db_read_cache_limit").map(|limits| {
359            match limits.len() {
360                2 => (limits[0], limits[1]),
361                _ => {
362                    // clap will enforce two values are given
363                    unreachable!("invalid number of values given to accounts-db-read-cache-limit")
364                }
365            }
366        });
367    // accounts-db-read-cache-limit-mb was deprecated in v3.0.0
368    let read_cache_limit_mb =
369        values_of::<usize>(matches, "accounts_db_read_cache_limit_mb").map(|limits| {
370            match limits.len() {
371                // we were given explicit low and high watermark values, so use them
372                2 => (limits[0] * MB, limits[1] * MB),
373                // we were given a single value, so use it for both low and high watermarks
374                1 => (limits[0] * MB, limits[0] * MB),
375                _ => {
376                    // clap will enforce either one or two values is given
377                    unreachable!(
378                        "invalid number of values given to accounts-db-read-cache-limit-mb"
379                    )
380                }
381            }
382        });
383    // clap will enforce only one cli arg is provided, so pick whichever is Some
384    let read_cache_limit_bytes = read_cache_limit_bytes.or(read_cache_limit_mb);
385
386    let storage_access = matches
387        .value_of("accounts_db_access_storages_method")
388        .map(|method| match method {
389            "mmap" => StorageAccess::Mmap,
390            "file" => StorageAccess::File,
391            _ => {
392                // clap will enforce one of the above values is given
393                unreachable!("invalid value given to accounts-db-access-storages-method")
394            }
395        })
396        .unwrap_or_default();
397
398    let scan_filter_for_shrinking = matches
399        .value_of("accounts_db_scan_filter_for_shrinking")
400        .map(|filter| match filter {
401            "all" => ScanFilter::All,
402            "only-abnormal" => ScanFilter::OnlyAbnormal,
403            "only-abnormal-with-verify" => ScanFilter::OnlyAbnormalWithVerify,
404            _ => {
405                // clap will enforce one of the above values is given
406                unreachable!("invalid value given to accounts_db_scan_filter_for_shrinking")
407            }
408        })
409        .unwrap_or_default();
410
411    let mark_obsolete_accounts = matches.is_present("accounts_db_mark_obsolete_accounts");
412
413    let accounts_db_config = AccountsDbConfig {
414        index: Some(accounts_index_config),
415        account_indexes: Some(account_indexes.clone()),
416        base_working_path: Some(ledger_path.clone()),
417        shrink_paths: account_shrink_run_paths,
418        shrink_ratio,
419        read_cache_limit_bytes,
420        write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
421            .ok()
422            .map(|mb| mb * MB as u64),
423        ancient_append_vec_offset: value_t!(matches, "accounts_db_ancient_append_vecs", i64).ok(),
424        ancient_storage_ideal_size: value_t!(
425            matches,
426            "accounts_db_ancient_storage_ideal_size",
427            u64
428        )
429        .ok(),
430        max_ancient_storages: value_t!(matches, "accounts_db_max_ancient_storages", usize).ok(),
431        exhaustively_verify_refcounts: matches.is_present("accounts_db_verify_refcounts"),
432        storage_access,
433        scan_filter_for_shrinking,
434        num_background_threads: Some(accounts_db_background_threads),
435        num_foreground_threads: Some(accounts_db_foreground_threads),
436        num_hash_threads: Some(accounts_db_hash_threads),
437        mark_obsolete_accounts,
438        ..AccountsDbConfig::default()
439    };
440
441    let accounts_db_config = Some(accounts_db_config);
442
443    let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
444        Some(
445            values_t_or_exit!(matches, "geyser_plugin_config", String)
446                .into_iter()
447                .map(PathBuf::from)
448                .collect(),
449        )
450    } else {
451        None
452    };
453    let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some()
454        || matches.is_present("geyser_plugin_always_enabled");
455
456    let rpc_send_retry_rate_ms = value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64);
457    let rpc_send_batch_size = value_t_or_exit!(matches, "rpc_send_transaction_batch_size", usize);
458    let rpc_send_batch_send_rate_ms =
459        value_t_or_exit!(matches, "rpc_send_transaction_batch_ms", u64);
460
461    if rpc_send_batch_send_rate_ms > rpc_send_retry_rate_ms {
462        Err(format!(
463            "the specified rpc-send-batch-ms ({rpc_send_batch_send_rate_ms}) is invalid, it must \
464             be <= rpc-send-retry-ms ({rpc_send_retry_rate_ms})"
465        ))?;
466    }
467
468    let tps = rpc_send_batch_size as u64 * MILLIS_PER_SECOND / rpc_send_batch_send_rate_ms;
469    if tps > send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND {
470        Err(format!(
471            "either the specified rpc-send-batch-size ({}) or rpc-send-batch-ms ({}) is invalid, \
472             'rpc-send-batch-size * 1000 / rpc-send-batch-ms' must be smaller than ({}) .",
473            rpc_send_batch_size,
474            rpc_send_batch_send_rate_ms,
475            send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND
476        ))?;
477    }
478    let rpc_send_transaction_tpu_peers = matches
479        .values_of("rpc_send_transaction_tpu_peer")
480        .map(|values| {
481            values
482                .map(solana_net_utils::parse_host_port)
483                .collect::<Result<Vec<SocketAddr>, String>>()
484        })
485        .transpose()
486        .map_err(|err| {
487            format!("failed to parse rpc send-transaction-service tpu peer address: {err}")
488        })?;
489    let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
490    let leader_forward_count =
491        if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
492            // rpc-sts is configured to send only to specific tpu peers. disable leader forwards
493            0
494        } else {
495            value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
496        };
497
498    let xdp_interface = matches.value_of("retransmit_xdp_interface");
499    let xdp_zero_copy = matches.is_present("retransmit_xdp_zero_copy");
500    let retransmit_xdp = matches.value_of("retransmit_xdp_cpu_cores").map(|cpus| {
501        XdpConfig::new(
502            xdp_interface,
503            parse_cpu_ranges(cpus).unwrap(),
504            xdp_zero_copy,
505        )
506    });
507
508    let account_paths: Vec<PathBuf> =
509        if let Ok(account_paths) = values_t!(matches, "account_paths", String) {
510            account_paths
511                .join(",")
512                .split(',')
513                .map(PathBuf::from)
514                .collect()
515        } else {
516            vec![ledger_path.join("accounts")]
517        };
518    let account_paths = create_and_canonicalize_directories(account_paths)
519        .map_err(|err| format!("unable to access account path: {err}"))?;
520
521    // From now on, use run/ paths in the same way as the previous account_paths.
522    let (account_run_paths, account_snapshot_paths) =
523        create_all_accounts_run_and_snapshot_dirs(&account_paths)
524            .map_err(|err| format!("unable to create account directories: {err}"))?;
525
526    // These snapshot paths are only used for initial clean up, add in shrink paths if they exist.
527    let account_snapshot_paths =
528        if let Some(account_shrink_snapshot_paths) = account_shrink_snapshot_paths {
529            account_snapshot_paths
530                .into_iter()
531                .chain(account_shrink_snapshot_paths)
532                .collect()
533        } else {
534            account_snapshot_paths
535        };
536
537    let snapshot_config = new_snapshot_config(
538        matches,
539        &ledger_path,
540        &account_paths,
541        run_args.rpc_bootstrap_config.incremental_snapshot_fetch,
542    )?;
543
544    let use_snapshot_archives_at_startup = value_t_or_exit!(
545        matches,
546        use_snapshot_archives_at_startup::cli::NAME,
547        UseSnapshotArchivesAtStartup
548    );
549
550    if mark_obsolete_accounts
551        && use_snapshot_archives_at_startup != UseSnapshotArchivesAtStartup::Always
552    {
553        Err(format!(
554            "The --accounts-db-mark-obsolete-accounts option requires \
555             the --use-snapshot-archives-at-startup option to be set to {}. \
556             Current value: {}",
557            UseSnapshotArchivesAtStartup::Always,
558            use_snapshot_archives_at_startup
559        ))?;
560    }
561
562    let mut validator_config = ValidatorConfig {
563        require_tower: matches.is_present("require_tower"),
564        tower_storage,
565        halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
566        max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
567        expected_genesis_hash: matches
568            .value_of("expected_genesis_hash")
569            .map(|s| Hash::from_str(s).unwrap()),
570        fixed_leader_schedule: None,
571        expected_bank_hash: matches
572            .value_of("expected_bank_hash")
573            .map(|s| Hash::from_str(s).unwrap()),
574        expected_shred_version,
575        new_hard_forks: hardforks_of(matches, "hard_forks"),
576        rpc_config: run_args.json_rpc_config,
577        on_start_geyser_plugin_config_files,
578        geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
579        rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
580            (
581                SocketAddr::new(rpc_bind_address, rpc_port),
582                SocketAddr::new(rpc_bind_address, rpc_port + 1),
583                // If additional ports are added, +2 needs to be skipped to avoid a conflict with
584                // the websocket port (which is +2) in web3.js This odd port shifting is tracked at
585                // https://github.com/solana-labs/solana/issues/12250
586            )
587        }),
588        pubsub_config: run_args.pub_sub_config,
589        voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
590        wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
591        known_validators: run_args.known_validators,
592        repair_validators,
593        repair_whitelist,
594        repair_handler_type: RepairHandlerType::default(),
595        gossip_validators,
596        max_ledger_shreds,
597        blockstore_options: run_args.blockstore_options,
598        run_verification: !matches.is_present("skip_startup_ledger_verification"),
599        debug_keys,
600        warp_slot: None,
601        generator_config: None,
602        contact_debug_interval,
603        contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
604        send_transaction_service_config: send_transaction_service::Config {
605            retry_rate_ms: rpc_send_retry_rate_ms,
606            leader_forward_count,
607            default_max_retries: value_t!(
608                matches,
609                "rpc_send_transaction_default_max_retries",
610                usize
611            )
612            .ok(),
613            service_max_retries: value_t_or_exit!(
614                matches,
615                "rpc_send_transaction_service_max_retries",
616                usize
617            ),
618            batch_send_rate_ms: rpc_send_batch_send_rate_ms,
619            batch_size: rpc_send_batch_size,
620            retry_pool_max_size: value_t_or_exit!(
621                matches,
622                "rpc_send_transaction_retry_pool_max_size",
623                usize
624            ),
625            tpu_peers: rpc_send_transaction_tpu_peers,
626        },
627        no_poh_speed_test: matches.is_present("no_poh_speed_test"),
628        no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
629        no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
630        no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
631        no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
632        poh_pinned_cpu_core: value_of(matches, "poh_pinned_cpu_core")
633            .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
634        poh_hashes_per_batch: value_of(matches, "poh_hashes_per_batch")
635            .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH),
636        process_ledger_before_services: matches.is_present("process_ledger_before_services"),
637        account_paths: account_run_paths,
638        account_snapshot_paths,
639        accounts_db_config,
640        accounts_db_skip_shrink: true,
641        accounts_db_force_initial_clean: matches.is_present("no_skip_initial_accounts_db_clean"),
642        snapshot_config,
643        tpu_coalesce,
644        no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"),
645        wait_to_vote_slot: None,
646        runtime_config: RuntimeConfig {
647            log_messages_bytes_limit: value_of(matches, "log_messages_bytes_limit"),
648            ..RuntimeConfig::default()
649        },
650        staked_nodes_overrides: staked_nodes_overrides.clone(),
651        use_snapshot_archives_at_startup,
652        ip_echo_server_threads,
653        rayon_global_threads,
654        replay_forks_threads,
655        replay_transactions_threads,
656        tvu_shred_sigverify_threads: tvu_sigverify_threads,
657        delay_leader_block_for_pending_fork: matches
658            .is_present("delay_leader_block_for_pending_fork"),
659        wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(),
660        wen_restart_coordinator: value_t!(matches, "wen_restart_coordinator", Pubkey).ok(),
661        turbine_disabled: Arc::<AtomicBool>::default(),
662        retransmit_xdp,
663        broadcast_stage_type: BroadcastStageType::Standard,
664        use_tpu_client_next: !matches.is_present("use_connection_cache"),
665        block_verification_method: value_t_or_exit!(
666            matches,
667            "block_verification_method",
668            BlockVerificationMethod
669        ),
670        unified_scheduler_handler_threads: value_t!(
671            matches,
672            "unified_scheduler_handler_threads",
673            usize
674        )
675        .ok(),
676        block_production_method: value_t_or_exit!(
677            matches,
678            "block_production_method",
679            BlockProductionMethod
680        ),
681        block_production_num_workers,
682        transaction_struct: value_t_or_exit!(matches, "transaction_struct", TransactionStructure),
683        enable_block_production_forwarding: staked_nodes_overrides_path.is_some(),
684        banking_trace_dir_byte_limit: parse_banking_trace_dir_byte_limit(matches),
685        validator_exit: Arc::new(RwLock::new(Exit::default())),
686        validator_exit_backpressure: [(
687            SnapshotPackagerService::NAME.to_string(),
688            Arc::new(AtomicBool::new(false)),
689        )]
690        .into(),
691    };
692
693    let reserved = validator_config
694        .retransmit_xdp
695        .as_ref()
696        .map(|xdp| xdp.cpus.clone())
697        .unwrap_or_default()
698        .iter()
699        .cloned()
700        .collect::<HashSet<_>>();
701    if !reserved.is_empty() {
702        let available = core_affinity::get_core_ids()
703            .unwrap_or_default()
704            .into_iter()
705            .map(|core_id| core_id.id)
706            .collect::<HashSet<_>>();
707        let available = available.difference(&reserved);
708        set_cpu_affinity(available.into_iter().copied()).unwrap();
709    }
710
711    let vote_account = pubkey_of(matches, "vote_account").unwrap_or_else(|| {
712        if !validator_config.voting_disabled {
713            warn!("--vote-account not specified, validator will not vote");
714            validator_config.voting_disabled = true;
715        }
716        Keypair::new().pubkey()
717    });
718
719    let dynamic_port_range =
720        solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
721            .expect("invalid dynamic_port_range");
722
723    let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64);
724    let minimal_snapshot_download_speed =
725        value_t_or_exit!(matches, "minimal_snapshot_download_speed", f32);
726    let maximum_snapshot_download_abort =
727        value_t_or_exit!(matches, "maximum_snapshot_download_abort", u64);
728
729    match validator_config.block_verification_method {
730        BlockVerificationMethod::BlockstoreProcessor => {
731            warn!(
732                "The value \"blockstore-processor\" for --block-verification-method has been \
733                 deprecated. The value \"blockstore-processor\" is still allowed for now, but is \
734                 planned for removal in the near future. To update, either set the value \
735                 \"unified-scheduler\" or remove the --block-verification-method argument"
736            );
737        }
738        BlockVerificationMethod::UnifiedScheduler => {}
739    }
740
741    let public_rpc_addr = matches
742        .value_of("public_rpc_addr")
743        .map(|addr| {
744            solana_net_utils::parse_host_port(addr)
745                .map_err(|err| format!("failed to parse public rpc address: {err}"))
746        })
747        .transpose()?;
748
749    if !matches.is_present("no_os_network_limits_test") {
750        if SystemMonitorService::check_os_network_limits() {
751            info!("OS network limits test passed.");
752        } else {
753            Err("OS network limit test failed. See \
754                https://docs.anza.xyz/operations/guides/validator-start#system-tuning"
755                .to_string())?;
756        }
757    }
758
759    let mut ledger_lock = ledger_lockfile(&ledger_path);
760    let _ledger_write_guard = lock_ledger(&ledger_path, &mut ledger_lock);
761
762    let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
763    let admin_service_post_init = Arc::new(RwLock::new(None));
764    let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
765        if starting_with_geyser_plugins {
766            let (sender, receiver) = unbounded();
767            (Some(sender), Some(receiver))
768        } else {
769            (None, None)
770        };
771    admin_rpc_service::run(
772        &ledger_path,
773        admin_rpc_service::AdminRpcRequestMetadata {
774            rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
775            start_time: std::time::SystemTime::now(),
776            validator_exit: validator_config.validator_exit.clone(),
777            validator_exit_backpressure: validator_config.validator_exit_backpressure.clone(),
778            start_progress: start_progress.clone(),
779            authorized_voter_keypairs: authorized_voter_keypairs.clone(),
780            post_init: admin_service_post_init.clone(),
781            tower_storage: validator_config.tower_storage.clone(),
782            staked_nodes_overrides,
783            rpc_to_plugin_manager_sender,
784        },
785    );
786
787    let gossip_host = matches
788        .value_of("gossip_host")
789        .map(|gossip_host| {
790            warn!(
791                "--gossip-host is deprecated. Use --bind-address or rely on automatic public IP \
792                 discovery instead."
793            );
794            solana_net_utils::parse_host(gossip_host)
795                .map_err(|err| format!("failed to parse --gossip-host: {err}"))
796        })
797        .transpose()?;
798
799    let advertised_ip = if let Some(ip) = gossip_host {
800        ip
801    } else if !bind_addresses.active().is_unspecified() && !bind_addresses.active().is_loopback() {
802        bind_addresses.active()
803    } else if !entrypoint_addrs.is_empty() {
804        let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect();
805        order.shuffle(&mut thread_rng());
806
807        order
808            .into_iter()
809            .find_map(|i| {
810                let entrypoint_addr = &entrypoint_addrs[i];
811                info!(
812                    "Contacting {entrypoint_addr} to determine the validator's public IP address"
813                );
814                solana_net_utils::get_public_ip_addr_with_binding(
815                    entrypoint_addr,
816                    bind_addresses.active(),
817                )
818                .map_or_else(
819                    |err| {
820                        warn!("Failed to contact cluster entrypoint {entrypoint_addr}: {err}");
821                        None
822                    },
823                    Some,
824                )
825            })
826            .ok_or_else(|| "unable to determine the validator's public IP address".to_string())?
827    } else {
828        IpAddr::V4(Ipv4Addr::LOCALHOST)
829    };
830    let gossip_port = value_t!(matches, "gossip_port", u16).or_else(|_| {
831        solana_net_utils::find_available_port_in_range(bind_addresses.active(), (0, 1))
832            .map_err(|err| format!("unable to find an available gossip port: {err}"))
833    })?;
834
835    let public_tpu_addr = matches
836        .value_of("public_tpu_addr")
837        .map(|public_tpu_addr| {
838            solana_net_utils::parse_host_port(public_tpu_addr)
839                .map_err(|err| format!("failed to parse --public-tpu-address: {err}"))
840        })
841        .transpose()?;
842
843    let public_tpu_forwards_addr = matches
844        .value_of("public_tpu_forwards_addr")
845        .map(|public_tpu_forwards_addr| {
846            solana_net_utils::parse_host_port(public_tpu_forwards_addr)
847                .map_err(|err| format!("failed to parse --public-tpu-forwards-address: {err}"))
848        })
849        .transpose()?;
850
851    let tpu_vortexor_receiver_address =
852        matches
853            .value_of("tpu_vortexor_receiver_address")
854            .map(|tpu_vortexor_receiver_address| {
855                solana_net_utils::parse_host_port(tpu_vortexor_receiver_address).unwrap_or_else(
856                    |err| {
857                        eprintln!("Failed to parse --tpu-vortexor-receiver-address: {err}");
858                        exit(1);
859                    },
860                )
861            });
862
863    info!("tpu_vortexor_receiver_address is {tpu_vortexor_receiver_address:?}");
864    let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
865
866    let tpu_max_connections_per_peer =
867        value_t_or_exit!(matches, "tpu_max_connections_per_peer", u64);
868    let tpu_max_staked_connections = value_t_or_exit!(matches, "tpu_max_staked_connections", u64);
869    let tpu_max_unstaked_connections =
870        value_t_or_exit!(matches, "tpu_max_unstaked_connections", u64);
871
872    let tpu_max_fwd_staked_connections =
873        value_t_or_exit!(matches, "tpu_max_fwd_staked_connections", u64);
874    let tpu_max_fwd_unstaked_connections =
875        value_t_or_exit!(matches, "tpu_max_fwd_unstaked_connections", u64);
876
877    let tpu_max_connections_per_ipaddr_per_minute: u64 =
878        value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64);
879    let max_streams_per_ms = value_t_or_exit!(matches, "tpu_max_streams_per_ms", u64);
880
881    let node_config = NodeConfig {
882        advertised_ip,
883        gossip_port,
884        port_range: dynamic_port_range,
885        bind_ip_addrs: Arc::new(bind_addresses),
886        public_tpu_addr,
887        public_tpu_forwards_addr,
888        num_tvu_receive_sockets: tvu_receive_threads,
889        num_tvu_retransmit_sockets: tvu_retransmit_threads,
890        num_quic_endpoints,
891        vortexor_receiver_addr: tpu_vortexor_receiver_address,
892    };
893
894    let cluster_entrypoints = entrypoint_addrs
895        .iter()
896        .map(ContactInfo::new_gossip_entry_point)
897        .collect::<Vec<_>>();
898
899    let mut node = Node::new_with_external_ip(&identity_keypair.pubkey(), node_config);
900
901    if restricted_repair_only_mode {
902        if validator_config.wen_restart_proto_path.is_some() {
903            Err("--restricted-repair-only-mode is not compatible with --wen_restart".to_string())?;
904        }
905
        // When --restricted-repair-only-mode is enabled, only the gossip and repair ports
        // need to be reachable by the entrypoint to respond to gossip pull requests and repair
        // requests initiated by the node.  All other ports are unused.
909        node.info.remove_tpu();
910        node.info.remove_tpu_forwards();
911        node.info.remove_tvu();
912        node.info.remove_serve_repair();
913
914        // A node in this configuration shouldn't be an entrypoint to other nodes
915        node.sockets.ip_echo = None;
916    }
917
918    if !private_rpc {
919        macro_rules! set_socket {
920            ($method:ident, $addr:expr, $name:literal) => {
921                node.info.$method($addr).expect(&format!(
922                    "Operator must spin up node with valid {} address",
923                    $name
924                ))
925            };
926        }
927        if let Some(public_rpc_addr) = public_rpc_addr {
928            set_socket!(set_rpc, public_rpc_addr, "RPC");
929            set_socket!(set_rpc_pubsub, public_rpc_addr, "RPC-pubsub");
930        } else if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
931            let addr = node
932                .info
933                .gossip()
934                .expect("Operator must spin up node with valid gossip address")
935                .ip();
936            set_socket!(set_rpc, (addr, rpc_addr.port()), "RPC");
937            set_socket!(set_rpc_pubsub, (addr, rpc_pubsub_addr.port()), "RPC-pubsub");
938        }
939    }
940
941    solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
942    solana_metrics::set_panic_hook("validator", Some(String::from(solana_version)));
943    solana_entry::entry::init_poh();
944    snapshot_utils::remove_tmp_snapshot_archives(
945        &validator_config.snapshot_config.full_snapshot_archives_dir,
946    );
947    snapshot_utils::remove_tmp_snapshot_archives(
948        &validator_config
949            .snapshot_config
950            .incremental_snapshot_archives_dir,
951    );
952
953    let should_check_duplicate_instance = true;
954    if !cluster_entrypoints.is_empty() {
955        bootstrap::rpc_bootstrap(
956            &node,
957            &identity_keypair,
958            &ledger_path,
959            &vote_account,
960            authorized_voter_keypairs.clone(),
961            &cluster_entrypoints,
962            &mut validator_config,
963            run_args.rpc_bootstrap_config,
964            do_port_check,
965            use_progress_bar,
966            maximum_local_snapshot_age,
967            should_check_duplicate_instance,
968            &start_progress,
969            minimal_snapshot_download_speed,
970            maximum_snapshot_download_abort,
971            run_args.socket_addr_space,
972        );
973        *start_progress.write().unwrap() = ValidatorStartProgress::Initializing;
974    }
975
976    if operation == Operation::Initialize {
977        info!("Validator ledger initialization complete");
978        return Ok(());
979    }
980
981    // Bootstrap code above pushes a contact-info with more recent timestamp to
982    // gossip. If the node is staked the contact-info lingers in gossip causing
983    // false duplicate nodes error.
984    // Below line refreshes the timestamp on contact-info so that it overrides
985    // the one pushed by bootstrap.
986    node.info.hot_swap_pubkey(identity_keypair.pubkey());
987
988    let tpu_quic_server_config = QuicServerParams {
989        max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
990        max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
991        max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
992        max_streams_per_ms,
993        max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
994        coalesce: tpu_coalesce,
995        num_threads: tpu_transaction_receive_threads,
996        ..Default::default()
997    };
998
999    let tpu_fwd_quic_server_config = QuicServerParams {
1000        max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
1001        max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
1002        max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
1003        max_streams_per_ms,
1004        max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1005        coalesce: tpu_coalesce,
1006        num_threads: tpu_transaction_forward_receive_threads,
1007        ..Default::default()
1008    };
1009
1010    // Vote shares TPU forward's characteristics, except that we accept 1 connection
1011    // per peer and no unstaked connections are accepted.
1012    let mut vote_quic_server_config = tpu_fwd_quic_server_config.clone();
1013    vote_quic_server_config.max_connections_per_peer = 1;
1014    vote_quic_server_config.max_unstaked_connections = 0;
1015    vote_quic_server_config.num_threads = tpu_vote_transaction_receive_threads;
1016
1017    let validator = match Validator::new(
1018        node,
1019        identity_keypair,
1020        &ledger_path,
1021        &vote_account,
1022        authorized_voter_keypairs,
1023        cluster_entrypoints,
1024        &validator_config,
1025        should_check_duplicate_instance,
1026        rpc_to_plugin_manager_receiver,
1027        start_progress,
1028        run_args.socket_addr_space,
1029        ValidatorTpuConfig {
1030            use_quic: tpu_use_quic,
1031            vote_use_quic,
1032            tpu_connection_pool_size,
1033            tpu_enable_udp,
1034            tpu_quic_server_config,
1035            tpu_fwd_quic_server_config,
1036            vote_quic_server_config,
1037        },
1038        admin_service_post_init,
1039    ) {
1040        Ok(validator) => Ok(validator),
1041        Err(err) => {
1042            if matches!(
1043                err.downcast_ref(),
1044                Some(&ValidatorError::WenRestartFinished)
1045            ) {
1046                // 200 is a special error code, see
1047                // https://github.com/solana-foundation/solana-improvement-documents/pull/46
1048                error!(
1049                    "Please remove --wen_restart and use --wait_for_supermajority as instructed \
1050                     above"
1051                );
1052                exit(200);
1053            }
1054            Err(format!("{err:?}"))
1055        }
1056    }?;
1057
1058    if let Some(filename) = init_complete_file {
1059        File::create(filename).map_err(|err| format!("unable to create {filename}: {err}"))?;
1060    }
1061    info!("Validator initialized");
1062    validator.join();
1063    info!("Validator exiting..");
1064
1065    Ok(())
1066}
1067
1068// This function is duplicated in ledger-tool/src/main.rs...
1069fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
1070    if matches.is_present(name) {
1071        Some(values_t_or_exit!(matches, name, Slot))
1072    } else {
1073        None
1074    }
1075}
1076
1077fn validators_set(
1078    identity_pubkey: &Pubkey,
1079    matches: &ArgMatches<'_>,
1080    matches_name: &str,
1081    arg_name: &str,
1082) -> Result<Option<HashSet<Pubkey>>, String> {
1083    if matches.is_present(matches_name) {
1084        let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
1085            .into_iter()
1086            .collect();
1087        if validators_set.contains(identity_pubkey) {
1088            Err(format!(
1089                "the validator's identity pubkey cannot be a {arg_name}: {identity_pubkey}"
1090            ))?;
1091        }
1092        Ok(Some(validators_set))
1093    } else {
1094        Ok(None)
1095    }
1096}
1097
1098fn get_cluster_shred_version(entrypoints: &[SocketAddr], bind_address: IpAddr) -> Option<u16> {
1099    let entrypoints = {
1100        let mut index: Vec<_> = (0..entrypoints.len()).collect();
1101        index.shuffle(&mut rand::thread_rng());
1102        index.into_iter().map(|i| &entrypoints[i])
1103    };
1104    for entrypoint in entrypoints {
1105        match solana_net_utils::get_cluster_shred_version_with_binding(entrypoint, bind_address) {
1106            Err(err) => eprintln!("get_cluster_shred_version failed: {entrypoint}, {err}"),
1107            Ok(0) => eprintln!("entrypoint {entrypoint} returned shred-version zero"),
1108            Ok(shred_version) => {
1109                info!("obtained shred-version {shred_version} from {entrypoint}");
1110                return Some(shred_version);
1111            }
1112        }
1113    }
1114    None
1115}
1116
1117fn parse_banking_trace_dir_byte_limit(matches: &ArgMatches) -> u64 {
1118    if matches.is_present("disable_banking_trace") {
1119        // disable with an explicit flag; This effectively becomes `opt-out` by resetting to
1120        // DISABLED_BAKING_TRACE_DIR, while allowing us to specify a default sensible limit in clap
1121        // configuration for cli help.
1122        DISABLED_BAKING_TRACE_DIR
1123    } else {
1124        // a default value in clap configuration (BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT) or
1125        // explicit user-supplied override value
1126        value_t_or_exit!(matches, "banking_trace_dir_byte_limit", u64)
1127    }
1128}
1129
1130fn new_snapshot_config(
1131    matches: &ArgMatches,
1132    ledger_path: &Path,
1133    account_paths: &[PathBuf],
1134    incremental_snapshot_fetch: bool,
1135) -> Result<SnapshotConfig, Box<dyn std::error::Error>> {
1136    let (full_snapshot_archive_interval, incremental_snapshot_archive_interval) =
1137        if matches.is_present("no_snapshots") {
1138            // snapshots are disabled
1139            (SnapshotInterval::Disabled, SnapshotInterval::Disabled)
1140        } else {
1141            match (
1142                incremental_snapshot_fetch,
1143                value_t_or_exit!(matches, "snapshot_interval_slots", NonZeroU64),
1144            ) {
1145                (true, incremental_snapshot_interval_slots) => {
1146                    // incremental snapshots are enabled
1147                    // use --snapshot-interval-slots for the incremental snapshot interval
1148                    let full_snapshot_interval_slots =
1149                        value_t_or_exit!(matches, "full_snapshot_interval_slots", NonZeroU64);
1150                    (
1151                        SnapshotInterval::Slots(full_snapshot_interval_slots),
1152                        SnapshotInterval::Slots(incremental_snapshot_interval_slots),
1153                    )
1154                }
1155                (false, full_snapshot_interval_slots) => {
1156                    // incremental snapshots are *disabled*
1157                    // use --snapshot-interval-slots for the *full* snapshot interval
1158                    // also warn if --full-snapshot-interval-slots was specified
1159                    if matches.occurrences_of("full_snapshot_interval_slots") > 0 {
1160                        warn!(
1161                            "Incremental snapshots are disabled, yet \
1162                             --full-snapshot-interval-slots was specified! Note that \
1163                             --full-snapshot-interval-slots is *ignored* when incremental \
1164                             snapshots are disabled. Use --snapshot-interval-slots instead.",
1165                        );
1166                    }
1167                    (
1168                        SnapshotInterval::Slots(full_snapshot_interval_slots),
1169                        SnapshotInterval::Disabled,
1170                    )
1171                }
1172            }
1173        };
1174
1175    info!(
1176        "Snapshot configuration: full snapshot interval: {}, incremental snapshot interval: {}",
1177        match full_snapshot_archive_interval {
1178            SnapshotInterval::Disabled => "disabled".to_string(),
1179            SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1180        },
1181        match incremental_snapshot_archive_interval {
1182            SnapshotInterval::Disabled => "disabled".to_string(),
1183            SnapshotInterval::Slots(interval) => format!("{interval} slots"),
1184        },
1185    );
1186    // It is unlikely that a full snapshot interval greater than an epoch is a good idea.
1187    // Minimally we should warn the user in case this was a mistake.
1188    if let SnapshotInterval::Slots(full_snapshot_interval_slots) = full_snapshot_archive_interval {
1189        let full_snapshot_interval_slots = full_snapshot_interval_slots.get();
1190        if full_snapshot_interval_slots > DEFAULT_SLOTS_PER_EPOCH {
1191            warn!(
1192                "The full snapshot interval is excessively large: {full_snapshot_interval_slots}! \
1193                 This will negatively impact the background cleanup tasks in accounts-db. \
1194                 Consider a smaller value.",
1195            );
1196        }
1197    }
1198
1199    let snapshots_dir = if let Some(snapshots) = matches.value_of("snapshots") {
1200        Path::new(snapshots)
1201    } else {
1202        ledger_path
1203    };
1204    let snapshots_dir = create_and_canonicalize_directory(snapshots_dir).map_err(|err| {
1205        format!(
1206            "failed to create snapshots directory '{}': {err}",
1207            snapshots_dir.display(),
1208        )
1209    })?;
1210    if account_paths
1211        .iter()
1212        .any(|account_path| account_path == &snapshots_dir)
1213    {
1214        Err(
1215            "the --accounts and --snapshots paths must be unique since they both create \
1216             'snapshots' subdirectories, otherwise there may be collisions"
1217                .to_string(),
1218        )?;
1219    }
1220
1221    let bank_snapshots_dir = snapshots_dir.join(BANK_SNAPSHOTS_DIR);
1222    fs::create_dir_all(&bank_snapshots_dir).map_err(|err| {
1223        format!(
1224            "failed to create bank snapshots directory '{}': {err}",
1225            bank_snapshots_dir.display(),
1226        )
1227    })?;
1228
1229    let full_snapshot_archives_dir =
1230        if let Some(full_snapshot_archive_path) = matches.value_of("full_snapshot_archive_path") {
1231            PathBuf::from(full_snapshot_archive_path)
1232        } else {
1233            snapshots_dir.clone()
1234        };
1235    fs::create_dir_all(&full_snapshot_archives_dir).map_err(|err| {
1236        format!(
1237            "failed to create full snapshot archives directory '{}': {err}",
1238            full_snapshot_archives_dir.display(),
1239        )
1240    })?;
1241
1242    let incremental_snapshot_archives_dir = if let Some(incremental_snapshot_archive_path) =
1243        matches.value_of("incremental_snapshot_archive_path")
1244    {
1245        PathBuf::from(incremental_snapshot_archive_path)
1246    } else {
1247        snapshots_dir.clone()
1248    };
1249    fs::create_dir_all(&incremental_snapshot_archives_dir).map_err(|err| {
1250        format!(
1251            "failed to create incremental snapshot archives directory '{}': {err}",
1252            incremental_snapshot_archives_dir.display(),
1253        )
1254    })?;
1255
1256    let archive_format = {
1257        let archive_format_str = value_t_or_exit!(matches, "snapshot_archive_format", String);
1258        let mut archive_format = ArchiveFormat::from_cli_arg(&archive_format_str)
1259            .unwrap_or_else(|| panic!("Archive format not recognized: {archive_format_str}"));
1260        if let ArchiveFormat::TarZstd { config } = &mut archive_format {
1261            config.compression_level =
1262                value_t_or_exit!(matches, "snapshot_zstd_compression_level", i32);
1263        }
1264        archive_format
1265    };
1266
1267    let snapshot_version = matches
1268        .value_of("snapshot_version")
1269        .map(|value| {
1270            value
1271                .parse::<SnapshotVersion>()
1272                .map_err(|err| format!("unable to parse snapshot version: {err}"))
1273        })
1274        .transpose()?
1275        .unwrap_or(SnapshotVersion::default());
1276
1277    let maximum_full_snapshot_archives_to_retain =
1278        value_t_or_exit!(matches, "maximum_full_snapshots_to_retain", NonZeroUsize);
1279    let maximum_incremental_snapshot_archives_to_retain = value_t_or_exit!(
1280        matches,
1281        "maximum_incremental_snapshots_to_retain",
1282        NonZeroUsize
1283    );
1284
1285    let snapshot_packager_niceness_adj =
1286        value_t_or_exit!(matches, "snapshot_packager_niceness_adj", i8);
1287
1288    let snapshot_config = SnapshotConfig {
1289        usage: if full_snapshot_archive_interval == SnapshotInterval::Disabled {
1290            SnapshotUsage::LoadOnly
1291        } else {
1292            SnapshotUsage::LoadAndGenerate
1293        },
1294        full_snapshot_archive_interval,
1295        incremental_snapshot_archive_interval,
1296        bank_snapshots_dir,
1297        full_snapshot_archives_dir,
1298        incremental_snapshot_archives_dir,
1299        archive_format,
1300        snapshot_version,
1301        maximum_full_snapshot_archives_to_retain,
1302        maximum_incremental_snapshot_archives_to_retain,
1303        packager_thread_niceness_adj: snapshot_packager_niceness_adj,
1304    };
1305
1306    if !is_snapshot_config_valid(&snapshot_config) {
1307        Err(
1308            "invalid snapshot configuration provided: snapshot intervals are incompatible. full \
1309             snapshot interval MUST be larger than incremental snapshot interval (if enabled)"
1310                .to_string(),
1311        )?;
1312    }
1313
1314    Ok(snapshot_config)
1315}