agave_validator/commands/run/execute.rs

use {
    crate::{
        admin_rpc_service::{self, load_staked_nodes_overrides, StakedNodesOverrides},
        bootstrap,
        cli::{self},
        ledger_lockfile, lock_ledger,
    },
    clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
    crossbeam_channel::unbounded,
    log::*,
    rand::{seq::SliceRandom, thread_rng},
    solana_accounts_db::{
        accounts_db::{AccountShrinkThreshold, AccountsDb, AccountsDbConfig},
        accounts_file::StorageAccess,
        accounts_index::{
            AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude,
            AccountsIndexConfig, IndexLimitMb, ScanFilter,
        },
        utils::{
            create_all_accounts_run_and_snapshot_dirs, create_and_canonicalize_directories,
            create_and_canonicalize_directory,
        },
    },
    solana_clap_utils::input_parsers::{
        keypair_of, keypairs_of, parse_cpu_ranges, pubkey_of, value_of, values_of,
    },
    solana_clock::{Slot, DEFAULT_SLOTS_PER_EPOCH},
    solana_core::{
        banking_trace::DISABLED_BAKING_TRACE_DIR,
        consensus::tower_storage,
        snapshot_packager_service::SnapshotPackagerService,
        system_monitor_service::SystemMonitorService,
        validator::{
            is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod,
            TransactionStructure, Validator, ValidatorConfig, ValidatorError,
            ValidatorStartProgress, ValidatorTpuConfig,
        },
    },
    solana_gossip::{
        cluster_info::{Node, NodeConfig},
        contact_info::ContactInfo,
    },
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
        blockstore_options::{
            AccessType, BlockstoreCompressionType, BlockstoreOptions, BlockstoreRecoveryMode,
            LedgerColumnOptions,
        },
        use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
    },
    solana_logger::redirect_stderr_to_file,
    solana_perf::recycler::enable_recycler_warming,
    solana_poh::poh_service,
    solana_pubkey::Pubkey,
    solana_rpc::{
        rpc::{JsonRpcConfig, RpcBigtableConfig},
        rpc_pubsub_service::PubSubConfig,
    },
    solana_runtime::{
        runtime_config::RuntimeConfig,
        snapshot_bank_utils::DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
        snapshot_config::{SnapshotConfig, SnapshotUsage},
        snapshot_utils::{self, ArchiveFormat, SnapshotVersion},
    },
    solana_send_transaction_service::send_transaction_service,
    solana_signer::Signer,
    solana_streamer::{
        quic::{QuicServerParams, DEFAULT_TPU_COALESCE},
        socket::SocketAddrSpace,
    },
    solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
    solana_turbine::xdp::{set_cpu_affinity, XdpConfig},
    std::{
        collections::HashSet,
        fs::{self, File},
        net::{IpAddr, Ipv4Addr, SocketAddr},
        num::NonZeroUsize,
        path::{Path, PathBuf},
        process::exit,
        str::FromStr,
        sync::{atomic::AtomicBool, Arc, RwLock},
        time::Duration,
    },
};

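/// How `execute` should behave once startup bookkeeping is done: `Initialize`
/// prepares the ledger and returns, while `Run` starts the full validator.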
#[derive(Debug, PartialEq, Eq)]
pub enum Operation {
    Initialize,
    Run,
}

const MILLIS_PER_SECOND: u64 = 1000;

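/// Parses the remaining CLI matches, assembles the `ValidatorConfig`, and
/// either initializes the ledger or starts the validator, per `operation`.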
pub fn execute(
    matches: &ArgMatches,
    solana_version: &str,
    socket_addr_space: SocketAddrSpace,
    ledger_path: &Path,
    operation: Operation,
) -> Result<(), Box<dyn std::error::Error>> {
    let cli::thread_args::NumThreadConfig {
        accounts_db_clean_threads,
        accounts_db_foreground_threads,
        accounts_db_hash_threads,
        accounts_index_flush_threads,
        ip_echo_server_threads,
        rayon_global_threads,
        replay_forks_threads,
        replay_transactions_threads,
        rocksdb_compaction_threads,
        rocksdb_flush_threads,
        tvu_receive_threads,
        tvu_retransmit_threads,
        tvu_sigverify_threads,
    } = cli::thread_args::parse_num_threads_args(matches);

    let identity_keypair = keypair_of(matches, "identity").unwrap_or_else(|| {
        clap::Error::with_description(
            "The --identity <KEYPAIR> argument is required",
            clap::ErrorKind::ArgumentNotFound,
        )
        .exit();
    });

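    // Log to a file named after the identity pubkey by default; "-" keeps logs on stderr.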
    let logfile = {
        let logfile = matches
            .value_of("logfile")
            .map(|s| s.into())
            .unwrap_or_else(|| format!("agave-validator-{}.log", identity_keypair.pubkey()));

        if logfile == "-" {
            None
        } else {
            println!("log file: {logfile}");
            Some(logfile)
        }
    };
    let use_progress_bar = logfile.is_none();
    let _logger_thread = redirect_stderr_to_file(logfile);

    info!("{} {}", crate_name!(), solana_version);
    info!("Starting validator with: {:#?}", std::env::args_os());

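    // With --cuda, initialize the CUDA perf libs and enable recycler warming.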
    let cuda = matches.is_present("cuda");
    if cuda {
        solana_perf::perf_libs::init_cuda();
        enable_recycler_warming();
    }

    solana_core::validator::report_target_features();

    let authorized_voter_keypairs = keypairs_of(matches, "authorized_voter_keypairs")
        .map(|keypairs| keypairs.into_iter().map(Arc::new).collect())
        .unwrap_or_else(|| vec![Arc::new(keypair_of(matches, "identity").expect("identity"))]);
    let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));

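    // Optionally load per-node stake overrides from the file given by
    // --staked-nodes-overrides; a bad file is a fatal configuration error.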
    let staked_nodes_overrides_path = matches
        .value_of("staked_nodes_overrides")
        .map(str::to_string);
    let staked_nodes_overrides = Arc::new(RwLock::new(
        match &staked_nodes_overrides_path {
            None => StakedNodesOverrides::default(),
            Some(p) => load_staked_nodes_overrides(p).unwrap_or_else(|err| {
                error!("Failed to load staked-nodes-overrides from {}: {}", p, err);
                clap::Error::with_description(
                    "Failed to load configuration of staked-nodes-overrides argument",
                    clap::ErrorKind::InvalidValue,
                )
                .exit()
            }),
        }
        .staked_map_id,
    ));


    let init_complete_file = matches.value_of("init_complete_file");

    let rpc_bootstrap_config = bootstrap::RpcBootstrapConfig {
        no_genesis_fetch: matches.is_present("no_genesis_fetch"),
        no_snapshot_fetch: matches.is_present("no_snapshot_fetch"),
        check_vote_account: matches
            .value_of("check_vote_account")
            .map(|url| url.to_string()),
        only_known_rpc: matches.is_present("only_known_rpc"),
        max_genesis_archive_unpacked_size: value_t_or_exit!(
            matches,
            "max_genesis_archive_unpacked_size",
            u64
        ),
        incremental_snapshot_fetch: !matches.is_present("no_incremental_snapshots"),
    };

    let private_rpc = matches.is_present("private_rpc");
    let do_port_check = !matches.is_present("no_port_check");
    let tpu_coalesce = value_t!(matches, "tpu_coalesce_ms", u64)
        .map(Duration::from_millis)
        .unwrap_or(DEFAULT_TPU_COALESCE);

    // Canonicalize ledger path to avoid issues with symlink creation
    let ledger_path = create_and_canonicalize_directory(ledger_path).map_err(|err| {
        format!(
            "unable to access ledger path '{}': {err}",
            ledger_path.display(),
        )
    })?;

    let recovery_mode = matches
        .value_of("wal_recovery_mode")
        .map(BlockstoreRecoveryMode::from);

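    // --limit-ledger-size caps how many shreds the blockstore retains; values
    // below the minimum are rejected.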
    let max_ledger_shreds = if matches.is_present("limit_ledger_size") {
        let limit_ledger_size = match matches.value_of("limit_ledger_size") {
            Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
            None => DEFAULT_MAX_LEDGER_SHREDS,
        };
        if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
            Err(format!(
                "The provided --limit-ledger-size value was too small, the minimum value is \
                 {DEFAULT_MIN_MAX_LEDGER_SHREDS}"
            ))?;
        }
        Some(limit_ledger_size)
    } else {
        None
    };

    let column_options = LedgerColumnOptions {
        compression_type: match matches.value_of("rocksdb_ledger_compression") {
            None => BlockstoreCompressionType::default(),
            Some(ledger_compression_string) => match ledger_compression_string {
                "none" => BlockstoreCompressionType::None,
                "snappy" => BlockstoreCompressionType::Snappy,
                "lz4" => BlockstoreCompressionType::Lz4,
                "zlib" => BlockstoreCompressionType::Zlib,
                _ => panic!("Unsupported ledger_compression: {ledger_compression_string}"),
            },
        },
        rocks_perf_sample_interval: value_t_or_exit!(
            matches,
            "rocksdb_perf_sample_interval",
            usize
        ),
    };

    let blockstore_options = BlockstoreOptions {
        recovery_mode,
        column_options,
        // The validator needs to open many files; check that the process is
        // permitted to do so, in order to fail quickly with a direct error
        enforce_ulimit_nofile: true,
        // The validator needs primary (read/write) access
        access_type: AccessType::Primary,
        num_rocksdb_compaction_threads: rocksdb_compaction_threads,
        num_rocksdb_flush_threads: rocksdb_flush_threads,
    };

    let accounts_hash_cache_path = matches
        .value_of("accounts_hash_cache_path")
        .map(Into::into)
        .unwrap_or_else(|| ledger_path.join(AccountsDb::DEFAULT_ACCOUNTS_HASH_CACHE_DIR));
    let accounts_hash_cache_path = create_and_canonicalize_directory(&accounts_hash_cache_path)
        .map_err(|err| {
            format!(
                "Unable to access accounts hash cache path '{}': {err}",
                accounts_hash_cache_path.display(),
            )
        })?;

    let debug_keys: Option<Arc<HashSet<_>>> = if matches.is_present("debug_key") {
        Some(Arc::new(
            values_t_or_exit!(matches, "debug_key", Pubkey)
                .into_iter()
                .collect(),
        ))
    } else {
        None
    };

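    // Optional pubkey sets restricting which validators this node trusts,
    // repairs from, whitelists for repair, and gossips with.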
    let known_validators = validators_set(
        &identity_keypair.pubkey(),
        matches,
        "known_validators",
        "--known-validator",
    )?;
    let repair_validators = validators_set(
        &identity_keypair.pubkey(),
        matches,
        "repair_validators",
        "--repair-validator",
    )?;
    let repair_whitelist = validators_set(
        &identity_keypair.pubkey(),
        matches,
        "repair_whitelist",
        "--repair-whitelist",
    )?;
    let repair_whitelist = Arc::new(RwLock::new(repair_whitelist.unwrap_or_default()));
    let gossip_validators = validators_set(
        &identity_keypair.pubkey(),
        matches,
        "gossip_validators",
        "--gossip-validator",
    )?;

    let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap())
        .expect("invalid bind_address");
    let rpc_bind_address = if matches.is_present("rpc_bind_address") {
        solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap())
            .expect("invalid rpc_bind_address")
    } else if private_rpc {
        solana_net_utils::parse_host("127.0.0.1").unwrap()
    } else {
        bind_address
    };

    let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64);

    let account_indexes = process_account_indexes(matches);

    let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
    let accounts_shrink_optimize_total_space =
        value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
    let tpu_use_quic = !matches.is_present("tpu_disable_quic");
    let vote_use_quic = value_t_or_exit!(matches, "vote_use_quic", bool);

    let tpu_enable_udp = if matches.is_present("tpu_enable_udp") {
        true
    } else {
        DEFAULT_TPU_ENABLE_UDP
    };

    let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);

    let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
    if !(0.0..=1.0).contains(&shrink_ratio) {
        Err(format!(
            "the specified account-shrink-ratio is invalid, it must be between 0.0 and 1.0 \
             inclusive: {shrink_ratio}"
        ))?;
    }

    let shrink_ratio = if accounts_shrink_optimize_total_space {
        AccountShrinkThreshold::TotalSpace { shrink_ratio }
    } else {
        AccountShrinkThreshold::IndividualStore { shrink_ratio }
    };
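    // Parse and de-duplicate the --entrypoint addresses, rejecting any that
    // fall outside the allowed socket address space.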
    let entrypoint_addrs = values_t!(matches, "entrypoint", String)
        .unwrap_or_default()
        .into_iter()
        .map(|entrypoint| {
            solana_net_utils::parse_host_port(&entrypoint)
                .map_err(|err| format!("failed to parse entrypoint address: {err}"))
        })
        .collect::<Result<HashSet<_>, _>>()?
        .into_iter()
        .collect::<Vec<_>>();
    for addr in &entrypoint_addrs {
        if !socket_addr_space.check(addr) {
            Err(format!("invalid entrypoint address: {addr}"))?;
        }
    }
    // TODO: Once entrypoints are updated to return shred-version, this should
    // abort if it fails to obtain a shred-version, so that nodes always join
    // gossip with a valid shred-version. The code to adopt entrypoint shred
    // version can then be deleted from gossip and get_rpc_node above.
    let expected_shred_version = value_t!(matches, "expected_shred_version", u16)
        .ok()
        .or_else(|| get_cluster_shred_version(&entrypoint_addrs, bind_address));

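    // Tower (vote lockout) state is persisted either to local files or to an
    // etcd cluster; clap limits the tower_storage argument to those two values.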
    let tower_storage: Arc<dyn tower_storage::TowerStorage> =
        match value_t_or_exit!(matches, "tower_storage", String).as_str() {
            "file" => {
                let tower_path = value_t!(matches, "tower", PathBuf)
                    .ok()
                    .unwrap_or_else(|| ledger_path.clone());

                Arc::new(tower_storage::FileTowerStorage::new(tower_path))
            }
            "etcd" => {
                let endpoints = values_t_or_exit!(matches, "etcd_endpoint", String);
                let domain_name = value_t_or_exit!(matches, "etcd_domain_name", String);
                let ca_certificate_file = value_t_or_exit!(matches, "etcd_cacert_file", String);
                let identity_certificate_file = value_t_or_exit!(matches, "etcd_cert_file", String);
                let identity_private_key_file = value_t_or_exit!(matches, "etcd_key_file", String);

                let read =
                    |file| fs::read(&file).map_err(|err| format!("unable to read {file}: {err}"));

                let tls_config = tower_storage::EtcdTlsConfig {
                    domain_name,
                    ca_certificate: read(ca_certificate_file)?,
                    identity_certificate: read(identity_certificate_file)?,
                    identity_private_key: read(identity_private_key_file)?,
                };

                Arc::new(
                    tower_storage::EtcdTowerStorage::new(endpoints, Some(tls_config))
                        .map_err(|err| format!("failed to connect to etcd: {err}"))?,
                )
            }
            _ => unreachable!(),
        };

    let mut accounts_index_config = AccountsIndexConfig {
        num_flush_threads: Some(accounts_index_flush_threads),
        ..AccountsIndexConfig::default()
    };
    if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
        accounts_index_config.bins = Some(bins);
    }

    accounts_index_config.index_limit_mb = if matches.is_present("disable_accounts_disk_index") {
        IndexLimitMb::InMemOnly
    } else {
        IndexLimitMb::Minimal
    };

    {
        let mut accounts_index_paths: Vec<PathBuf> = if matches.is_present("accounts_index_path") {
            values_t_or_exit!(matches, "accounts_index_path", String)
                .into_iter()
                .map(PathBuf::from)
                .collect()
        } else {
            vec![]
        };
        if accounts_index_paths.is_empty() {
            accounts_index_paths = vec![ledger_path.join("accounts_index")];
        }
        accounts_index_config.drives = Some(accounts_index_paths);
    }

    const MB: usize = 1_024 * 1_024;
    accounts_index_config.scan_results_limit_bytes =
        value_t!(matches, "accounts_index_scan_results_limit_mb", usize)
            .ok()
            .map(|mb| mb * MB);

    let account_shrink_paths: Option<Vec<PathBuf>> =
        values_t!(matches, "account_shrink_path", String)
            .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
            .ok();
    let account_shrink_paths = account_shrink_paths
        .as_ref()
        .map(|paths| {
            create_and_canonicalize_directories(paths)
                .map_err(|err| format!("unable to access account shrink path: {err}"))
        })
        .transpose()?;

    let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
        .map(|paths| {
            create_all_accounts_run_and_snapshot_dirs(&paths)
                .map_err(|err| format!("unable to create account subdirectories: {err}"))
        })
        .transpose()?
        .unzip();

    let read_cache_limit_bytes = values_of::<usize>(matches, "accounts_db_read_cache_limit_mb")
        .map(|limits| {
            match limits.len() {
                // we were given explicit low and high watermark values, so use them
                2 => (limits[0] * MB, limits[1] * MB),
                // we were given a single value, so use it for both low and high watermarks
                1 => (limits[0] * MB, limits[0] * MB),
                _ => {
                    // clap will enforce either one or two values is given
                    unreachable!(
                        "invalid number of values given to accounts-db-read-cache-limit-mb"
                    )
                }
            }
        });
    let storage_access = matches
        .value_of("accounts_db_access_storages_method")
        .map(|method| match method {
            "mmap" => StorageAccess::Mmap,
            "file" => StorageAccess::File,
            _ => {
                // clap will enforce one of the above values is given
                unreachable!("invalid value given to accounts-db-access-storages-method")
            }
        })
        .unwrap_or_default();

    let scan_filter_for_shrinking = matches
        .value_of("accounts_db_scan_filter_for_shrinking")
        .map(|filter| match filter {
            "all" => ScanFilter::All,
            "only-abnormal" => ScanFilter::OnlyAbnormal,
            "only-abnormal-with-verify" => ScanFilter::OnlyAbnormalWithVerify,
            _ => {
                // clap will enforce one of the above values is given
                unreachable!("invalid value given to accounts_db_scan_filter_for_shrinking")
            }
        })
        .unwrap_or_default();

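    // Fold the individual accounts-db flags parsed above into one AccountsDbConfig.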
    let accounts_db_config = AccountsDbConfig {
        index: Some(accounts_index_config),
        account_indexes: Some(account_indexes.clone()),
        base_working_path: Some(ledger_path.clone()),
        accounts_hash_cache_path: Some(accounts_hash_cache_path),
        shrink_paths: account_shrink_run_paths,
        shrink_ratio,
        read_cache_limit_bytes,
        write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
            .ok()
            .map(|mb| mb * MB as u64),
        ancient_append_vec_offset: value_t!(matches, "accounts_db_ancient_append_vecs", i64).ok(),
        ancient_storage_ideal_size: value_t!(
            matches,
            "accounts_db_ancient_storage_ideal_size",
            u64
        )
        .ok(),
        max_ancient_storages: value_t!(matches, "accounts_db_max_ancient_storages", usize).ok(),
        hash_calculation_pubkey_bins: value_t!(
            matches,
            "accounts_db_hash_calculation_pubkey_bins",
            usize
        )
        .ok(),
        exhaustively_verify_refcounts: matches.is_present("accounts_db_verify_refcounts"),
        test_skip_rewrites_but_include_in_bank_hash: false,
        storage_access,
        scan_filter_for_shrinking,
        enable_experimental_accumulator_hash: !matches
            .is_present("no_accounts_db_experimental_accumulator_hash"),
        verify_experimental_accumulator_hash: matches
            .is_present("accounts_db_verify_experimental_accumulator_hash"),
        snapshots_use_experimental_accumulator_hash: matches
            .is_present("accounts_db_snapshots_use_experimental_accumulator_hash"),
        num_clean_threads: Some(accounts_db_clean_threads),
        num_foreground_threads: Some(accounts_db_foreground_threads),
        num_hash_threads: Some(accounts_db_hash_threads),
        ..AccountsDbConfig::default()
    };

    let accounts_db_config = Some(accounts_db_config);

    let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
        Some(
            values_t_or_exit!(matches, "geyser_plugin_config", String)
                .into_iter()
                .map(PathBuf::from)
                .collect(),
        )
    } else {
        None
    };
    let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some()
        || matches.is_present("geyser_plugin_always_enabled");

    let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
        || matches.is_present("enable_bigtable_ledger_upload")
    {
        Some(RpcBigtableConfig {
            enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"),
            bigtable_instance_name: value_t_or_exit!(matches, "rpc_bigtable_instance_name", String),
            bigtable_app_profile_id: value_t_or_exit!(
                matches,
                "rpc_bigtable_app_profile_id",
                String
            ),
            timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
                .ok()
                .map(Duration::from_secs),
            max_message_size: value_t_or_exit!(matches, "rpc_bigtable_max_message_size", usize),
        })
    } else {
        None
    };

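    // Sanity-check the send-transaction-service rates: batches must flush at
    // least as often as retries fire, and the implied transactions-per-second
    // must stay below the service's hard cap.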
    let rpc_send_retry_rate_ms = value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64);
    let rpc_send_batch_size = value_t_or_exit!(matches, "rpc_send_transaction_batch_size", usize);
    let rpc_send_batch_send_rate_ms =
        value_t_or_exit!(matches, "rpc_send_transaction_batch_ms", u64);

    if rpc_send_batch_send_rate_ms > rpc_send_retry_rate_ms {
        Err(format!(
            "the specified rpc-send-batch-ms ({rpc_send_batch_send_rate_ms}) is invalid, it must \
             be <= rpc-send-retry-ms ({rpc_send_retry_rate_ms})"
        ))?;
    }

    let tps = rpc_send_batch_size as u64 * MILLIS_PER_SECOND / rpc_send_batch_send_rate_ms;
    if tps > send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND {
        Err(format!(
            "either the specified rpc-send-batch-size ({}) or rpc-send-batch-ms ({}) is invalid, \
             'rpc-send-batch-size * 1000 / rpc-send-batch-ms' must be smaller than ({})",
            rpc_send_batch_size,
            rpc_send_batch_send_rate_ms,
            send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND
        ))?;
    }
    let rpc_send_transaction_tpu_peers = matches
        .values_of("rpc_send_transaction_tpu_peer")
        .map(|values| {
            values
                .map(solana_net_utils::parse_host_port)
                .collect::<Result<Vec<SocketAddr>, String>>()
        })
        .transpose()
        .map_err(|err| {
            format!("failed to parse rpc send-transaction-service tpu peer address: {err}")
        })?;
    let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
    let leader_forward_count =
        if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
            // rpc-sts is configured to send only to specific tpu peers. disable leader forwards
            0
        } else {
            value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
        };

    let full_api = matches.is_present("full_rpc_api");

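    // Optional XDP-based retransmit: when --retransmit-xdp-cpu-cores is given,
    // turbine retransmit runs over XDP pinned to those cores.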
    let xdp_interface = matches.value_of("retransmit_xdp_interface");
    let xdp_zero_copy = matches.is_present("retransmit_xdp_zero_copy");
    let retransmit_xdp = matches.value_of("retransmit_xdp_cpu_cores").map(|cpus| {
        XdpConfig::new(
            xdp_interface,
            parse_cpu_ranges(cpus).unwrap(),
            xdp_zero_copy,
        )
    });

    let mut validator_config = ValidatorConfig {
        require_tower: matches.is_present("require_tower"),
        tower_storage,
        halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
        expected_genesis_hash: matches
            .value_of("expected_genesis_hash")
            .map(|s| Hash::from_str(s).unwrap()),
        expected_bank_hash: matches
            .value_of("expected_bank_hash")
            .map(|s| Hash::from_str(s).unwrap()),
        expected_shred_version,
        new_hard_forks: hardforks_of(matches, "hard_forks"),
        rpc_config: JsonRpcConfig {
            enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
            enable_extended_tx_metadata_storage: matches.is_present("enable_cpi_and_log_storage")
                || matches.is_present("enable_extended_tx_metadata_storage"),
            rpc_bigtable_config,
            faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
                solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
            }),
            full_api,
            max_multiple_accounts: Some(value_t_or_exit!(
                matches,
                "rpc_max_multiple_accounts",
                usize
            )),
            health_check_slot_distance: value_t_or_exit!(
                matches,
                "health_check_slot_distance",
                u64
            ),
            disable_health_check: false,
            rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize),
            rpc_blocking_threads: value_t_or_exit!(matches, "rpc_blocking_threads", usize),
            rpc_niceness_adj: value_t_or_exit!(matches, "rpc_niceness_adj", i8),
            account_indexes: account_indexes.clone(),
            rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),
            max_request_body_size: Some(value_t_or_exit!(
                matches,
                "rpc_max_request_body_size",
                usize
            )),
            skip_preflight_health_check: matches.is_present("skip_preflight_health_check"),
        },
        on_start_geyser_plugin_config_files,
        geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
        rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
            (
                SocketAddr::new(rpc_bind_address, rpc_port),
                SocketAddr::new(rpc_bind_address, rpc_port + 1),
                // If additional ports are added, +2 needs to be skipped to avoid a conflict with
                // the websocket port (which is +2) in web3.js. This odd port shifting is tracked
                // at https://github.com/solana-labs/solana/issues/12250
            )
        }),
        pubsub_config: PubSubConfig {
            enable_block_subscription: matches.is_present("rpc_pubsub_enable_block_subscription"),
            enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"),
            max_active_subscriptions: value_t_or_exit!(
                matches,
                "rpc_pubsub_max_active_subscriptions",
                usize
            ),
            queue_capacity_items: value_t_or_exit!(
                matches,
                "rpc_pubsub_queue_capacity_items",
                usize
            ),
            queue_capacity_bytes: value_t_or_exit!(
                matches,
                "rpc_pubsub_queue_capacity_bytes",
                usize
            ),
            worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize),
            notification_threads: value_t!(matches, "rpc_pubsub_notification_threads", usize)
                .ok()
                .and_then(NonZeroUsize::new),
        },
        voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
        wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
        known_validators,
        repair_validators,
        repair_whitelist,
        gossip_validators,
        max_ledger_shreds,
        blockstore_options,
        run_verification: !(matches.is_present("skip_poh_verify")
            || matches.is_present("skip_startup_ledger_verification")),
        debug_keys,
        contact_debug_interval,
        send_transaction_service_config: send_transaction_service::Config {
            retry_rate_ms: rpc_send_retry_rate_ms,
            leader_forward_count,
            default_max_retries: value_t!(
                matches,
                "rpc_send_transaction_default_max_retries",
                usize
            )
            .ok(),
            service_max_retries: value_t_or_exit!(
                matches,
                "rpc_send_transaction_service_max_retries",
                usize
            ),
            batch_send_rate_ms: rpc_send_batch_send_rate_ms,
            batch_size: rpc_send_batch_size,
            retry_pool_max_size: value_t_or_exit!(
                matches,
                "rpc_send_transaction_retry_pool_max_size",
                usize
            ),
            tpu_peers: rpc_send_transaction_tpu_peers,
        },
        no_poh_speed_test: matches.is_present("no_poh_speed_test"),
        no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
        no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
        no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
        no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
        poh_pinned_cpu_core: value_of(matches, "poh_pinned_cpu_core")
            .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
        poh_hashes_per_batch: value_of(matches, "poh_hashes_per_batch")
            .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH),
        process_ledger_before_services: matches.is_present("process_ledger_before_services"),
        accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"),
        accounts_db_config,
        accounts_db_skip_shrink: true,
        accounts_db_force_initial_clean: matches.is_present("no_skip_initial_accounts_db_clean"),
        tpu_coalesce,
        no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"),
        runtime_config: RuntimeConfig {
            log_messages_bytes_limit: value_of(matches, "log_messages_bytes_limit"),
            ..RuntimeConfig::default()
        },
        staked_nodes_overrides: staked_nodes_overrides.clone(),
        use_snapshot_archives_at_startup: value_t_or_exit!(
            matches,
            use_snapshot_archives_at_startup::cli::NAME,
            UseSnapshotArchivesAtStartup
        ),
        ip_echo_server_threads,
        rayon_global_threads,
        replay_forks_threads,
        replay_transactions_threads,
        tvu_shred_sigverify_threads: tvu_sigverify_threads,
        delay_leader_block_for_pending_fork: matches
            .is_present("delay_leader_block_for_pending_fork"),
        wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(),
        wen_restart_coordinator: value_t!(matches, "wen_restart_coordinator", Pubkey).ok(),
        retransmit_xdp,
        use_tpu_client_next: !matches.is_present("use_connection_cache"),
        ..ValidatorConfig::default()
    };

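    // If XDP retransmit reserved CPU cores, restrict the rest of the process
    // to the remaining cores.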
    let reserved = validator_config
        .retransmit_xdp
        .as_ref()
        .map(|xdp| xdp.cpus.clone())
        .unwrap_or_default()
        .iter()
        .cloned()
        .collect::<HashSet<_>>();
    if !reserved.is_empty() {
        let available = core_affinity::get_core_ids()
            .unwrap_or_default()
            .into_iter()
            .map(|core_id| core_id.id)
            .collect::<HashSet<_>>();
        let available = available.difference(&reserved);
        set_cpu_affinity(available.into_iter().copied()).unwrap();
    }

    let vote_account = pubkey_of(matches, "vote_account").unwrap_or_else(|| {
        if !validator_config.voting_disabled {
            warn!("--vote-account not specified, validator will not vote");
            validator_config.voting_disabled = true;
        }
        Keypair::new().pubkey()
    });

    let dynamic_port_range =
        solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
            .expect("invalid dynamic_port_range");

    let account_paths: Vec<PathBuf> =
        if let Ok(account_paths) = values_t!(matches, "account_paths", String) {
            account_paths
                .join(",")
                .split(',')
                .map(PathBuf::from)
                .collect()
        } else {
            vec![ledger_path.join("accounts")]
        };
    let account_paths = create_and_canonicalize_directories(account_paths)
        .map_err(|err| format!("unable to access account path: {err}"))?;

    let (account_run_paths, account_snapshot_paths) =
        create_all_accounts_run_and_snapshot_dirs(&account_paths)
            .map_err(|err| format!("unable to create account directories: {err}"))?;

    // From now on, use run/ paths in the same way as the previous account_paths.
    validator_config.account_paths = account_run_paths;

    // These snapshot paths are only used for initial cleanup; add in shrink paths if they exist.
    validator_config.account_snapshot_paths =
        if let Some(account_shrink_snapshot_paths) = account_shrink_snapshot_paths {
            account_snapshot_paths
                .into_iter()
                .chain(account_shrink_snapshot_paths)
                .collect()
        } else {
            account_snapshot_paths
        };

    let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64);
    let maximum_full_snapshot_archives_to_retain =
        value_t_or_exit!(matches, "maximum_full_snapshots_to_retain", NonZeroUsize);
    let maximum_incremental_snapshot_archives_to_retain = value_t_or_exit!(
        matches,
        "maximum_incremental_snapshots_to_retain",
        NonZeroUsize
    );
    let snapshot_packager_niceness_adj =
        value_t_or_exit!(matches, "snapshot_packager_niceness_adj", i8);
    let minimal_snapshot_download_speed =
        value_t_or_exit!(matches, "minimal_snapshot_download_speed", f32);
    let maximum_snapshot_download_abort =
        value_t_or_exit!(matches, "maximum_snapshot_download_abort", u64);

    let snapshots_dir = if let Some(snapshots) = matches.value_of("snapshots") {
        Path::new(snapshots)
    } else {
        &ledger_path
    };
    let snapshots_dir = create_and_canonicalize_directory(snapshots_dir).map_err(|err| {
        format!(
            "failed to create snapshots directory '{}': {err}",
            snapshots_dir.display(),
        )
    })?;

    if account_paths
        .iter()
        .any(|account_path| account_path == &snapshots_dir)
    {
        Err(
            "the --accounts and --snapshots paths must be unique since they \
             both create 'snapshots' subdirectories, otherwise there may be collisions"
                .to_string(),
        )?;
    }

    let bank_snapshots_dir = snapshots_dir.join("snapshots");
    fs::create_dir_all(&bank_snapshots_dir).map_err(|err| {
        format!(
            "failed to create bank snapshots directory '{}': {err}",
            bank_snapshots_dir.display(),
        )
    })?;

    let full_snapshot_archives_dir =
        if let Some(full_snapshot_archive_path) = matches.value_of("full_snapshot_archive_path") {
            PathBuf::from(full_snapshot_archive_path)
        } else {
            snapshots_dir.clone()
        };
    fs::create_dir_all(&full_snapshot_archives_dir).map_err(|err| {
        format!(
            "failed to create full snapshot archives directory '{}': {err}",
            full_snapshot_archives_dir.display(),
        )
    })?;

    let incremental_snapshot_archives_dir = if let Some(incremental_snapshot_archive_path) =
        matches.value_of("incremental_snapshot_archive_path")
    {
        PathBuf::from(incremental_snapshot_archive_path)
    } else {
        snapshots_dir.clone()
    };
    fs::create_dir_all(&incremental_snapshot_archives_dir).map_err(|err| {
        format!(
            "failed to create incremental snapshot archives directory '{}': {err}",
            incremental_snapshot_archives_dir.display(),
        )
    })?;

    let archive_format = {
        let archive_format_str = value_t_or_exit!(matches, "snapshot_archive_format", String);
        let mut archive_format = ArchiveFormat::from_cli_arg(&archive_format_str)
            .unwrap_or_else(|| panic!("Archive format not recognized: {archive_format_str}"));
        if let ArchiveFormat::TarZstd { config } = &mut archive_format {
            config.compression_level =
                value_t_or_exit!(matches, "snapshot_zstd_compression_level", i32);
        }
        archive_format
    };

    let snapshot_version = matches
        .value_of("snapshot_version")
        .map(|value| {
            value
                .parse::<SnapshotVersion>()
                .map_err(|err| format!("unable to parse snapshot version: {err}"))
        })
        .transpose()?
        .unwrap_or(SnapshotVersion::default());

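    // Resolve the full and incremental snapshot intervals, honoring
    // --no-snapshots and the deprecated `--snapshot-interval-slots 0` spelling.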
    let (full_snapshot_archive_interval_slots, incremental_snapshot_archive_interval_slots) =
        if matches.is_present("no_snapshots") {
            // snapshots are disabled
            (
                DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
                DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
            )
        } else {
            match (
                !matches.is_present("no_incremental_snapshots"),
                value_t_or_exit!(matches, "snapshot_interval_slots", u64),
            ) {
                (_, 0) => {
                    // snapshots are disabled
                    warn!(
                        "Snapshot generation was disabled with `--snapshot-interval-slots 0`, \
                         which is now deprecated. Use `--no-snapshots` instead.",
                    );
                    (
                        DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
                        DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
                    )
                }
                (true, incremental_snapshot_interval_slots) => {
                    // incremental snapshots are enabled
                    // use --snapshot-interval-slots for the incremental snapshot interval
                    (
                        value_t_or_exit!(matches, "full_snapshot_interval_slots", u64),
                        incremental_snapshot_interval_slots,
                    )
                }
                (false, full_snapshot_interval_slots) => {
                    // incremental snapshots are *disabled*
                    // use --snapshot-interval-slots for the *full* snapshot interval
                    // also warn if --full-snapshot-interval-slots was specified
                    if matches.occurrences_of("full_snapshot_interval_slots") > 0 {
                        warn!(
                            "Incremental snapshots are disabled, yet \
                             --full-snapshot-interval-slots was specified! Note that \
                             --full-snapshot-interval-slots is *ignored* when incremental \
                             snapshots are disabled. Use --snapshot-interval-slots instead.",
                        );
                    }
                    (
                        full_snapshot_interval_slots,
                        DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
                    )
                }
            }
        };


    validator_config.snapshot_config = SnapshotConfig {
        usage: if full_snapshot_archive_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
            SnapshotUsage::LoadOnly
        } else {
            SnapshotUsage::LoadAndGenerate
        },
        full_snapshot_archive_interval_slots,
        incremental_snapshot_archive_interval_slots,
        bank_snapshots_dir,
        full_snapshot_archives_dir: full_snapshot_archives_dir.clone(),
        incremental_snapshot_archives_dir: incremental_snapshot_archives_dir.clone(),
        archive_format,
        snapshot_version,
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
        packager_thread_niceness_adj: snapshot_packager_niceness_adj,
    };

    info!(
        "Snapshot configuration: full snapshot interval: {}, incremental snapshot interval: {}",
        if full_snapshot_archive_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
            "disabled".to_string()
        } else {
            format!("{full_snapshot_archive_interval_slots} slots")
        },
        if incremental_snapshot_archive_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
            "disabled".to_string()
        } else {
            format!("{incremental_snapshot_archive_interval_slots} slots")
        },
    );

    // A full snapshot interval greater than an epoch is unlikely to be a good
    // idea; at a minimum, warn the user in case this was a mistake.
    if full_snapshot_archive_interval_slots > DEFAULT_SLOTS_PER_EPOCH
        && full_snapshot_archive_interval_slots != DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
    {
        warn!(
            "The full snapshot interval is excessively large: {}! This will negatively \
            impact the background cleanup tasks in accounts-db. Consider a smaller value.",
            full_snapshot_archive_interval_slots,
        );
    }

    if !is_snapshot_config_valid(&validator_config.snapshot_config) {
        Err(
            "invalid snapshot configuration provided: snapshot intervals are incompatible. \
             \n\t- full snapshot interval MUST be larger than incremental snapshot interval \
             (if enabled)"
                .to_string(),
        )?;
    }

    configure_banking_trace_dir_byte_limit(&mut validator_config, matches);
    validator_config.block_verification_method = value_t_or_exit!(
        matches,
        "block_verification_method",
        BlockVerificationMethod
    );
    match validator_config.block_verification_method {
        BlockVerificationMethod::BlockstoreProcessor => {
            warn!(
                "The value \"blockstore-processor\" for --block-verification-method has been \
                deprecated. The value \"blockstore-processor\" is still allowed for now, but \
                is planned for removal in the near future. To update, either set the value \
                \"unified-scheduler\" or remove the --block-verification-method argument"
            );
        }
        BlockVerificationMethod::UnifiedScheduler => {}
    }
    validator_config.block_production_method = value_t_or_exit!(
        matches, // comment to align formatting...
        "block_production_method",
        BlockProductionMethod
    );
    validator_config.transaction_struct = value_t_or_exit!(
        matches, // comment to align formatting...
        "transaction_struct",
        TransactionStructure
    );
    validator_config.enable_block_production_forwarding = staked_nodes_overrides_path.is_some();
    validator_config.unified_scheduler_handler_threads =
        value_t!(matches, "unified_scheduler_handler_threads", usize).ok();

    let public_rpc_addr = matches
        .value_of("public_rpc_addr")
        .map(|addr| {
            solana_net_utils::parse_host_port(addr)
                .map_err(|err| format!("failed to parse public rpc address: {err}"))
        })
        .transpose()?;

    if !matches.is_present("no_os_network_limits_test") {
        if SystemMonitorService::check_os_network_limits() {
            info!("OS network limits test passed.");
        } else {
            Err("OS network limit test failed. See \
                https://docs.solanalabs.com/operations/guides/validator-start#system-tuning"
                .to_string())?;
        }
    }


    let validator_exit_backpressure = [(
        SnapshotPackagerService::NAME.to_string(),
        Arc::new(AtomicBool::new(false)),
    )]
    .into();
    validator_config.validator_exit_backpressure = validator_exit_backpressure;

    let mut ledger_lock = ledger_lockfile(&ledger_path);
    let _ledger_write_guard = lock_ledger(&ledger_path, &mut ledger_lock);

    let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
    let admin_service_post_init = Arc::new(RwLock::new(None));
    let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
        if starting_with_geyser_plugins {
            let (sender, receiver) = unbounded();
            (Some(sender), Some(receiver))
        } else {
            (None, None)
        };
    admin_rpc_service::run(
        &ledger_path,
        admin_rpc_service::AdminRpcRequestMetadata {
            rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
            start_time: std::time::SystemTime::now(),
            validator_exit: validator_config.validator_exit.clone(),
            validator_exit_backpressure: validator_config.validator_exit_backpressure.clone(),
            start_progress: start_progress.clone(),
            authorized_voter_keypairs: authorized_voter_keypairs.clone(),
            post_init: admin_service_post_init.clone(),
            tower_storage: validator_config.tower_storage.clone(),
            staked_nodes_overrides,
            rpc_to_plugin_manager_sender,
        },
    );

    let gossip_host: IpAddr = matches
        .value_of("gossip_host")
        .map(|gossip_host| {
            solana_net_utils::parse_host(gossip_host)
                .map_err(|err| format!("failed to parse --gossip-host: {err}"))
        })
        .transpose()?
        .or_else(|| {
            if !entrypoint_addrs.is_empty() {
                let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect();
                order.shuffle(&mut thread_rng());
                // Return once we determine our IP from an entrypoint
                order.into_iter().find_map(|i| {
                    let entrypoint_addr = &entrypoint_addrs[i];
                    info!(
                        "Contacting {} to determine the validator's public IP address",
                        entrypoint_addr
                    );
                    solana_net_utils::get_public_ip_addr_with_binding(entrypoint_addr, bind_address)
                        .map_or_else(
                            |err| {
                                warn!(
                                    "Failed to contact cluster entrypoint {entrypoint_addr}: {err}"
                                );
                                None
                            },
                            Some,
                        )
                })
            } else {
                Some(IpAddr::V4(Ipv4Addr::LOCALHOST))
            }
        })
        .ok_or_else(|| "unable to determine the validator's public IP address".to_string())?;
    let gossip_port = value_t!(matches, "gossip_port", u16).or_else(|_| {
        solana_net_utils::find_available_port_in_range(bind_address, (0, 1))
            .map_err(|err| format!("unable to find an available gossip port: {err}"))
    })?;
    let gossip_addr = SocketAddr::new(gossip_host, gossip_port);

    let public_tpu_addr = matches
        .value_of("public_tpu_addr")
        .map(|public_tpu_addr| {
            solana_net_utils::parse_host_port(public_tpu_addr)
                .map_err(|err| format!("failed to parse --public-tpu-address: {err}"))
        })
        .transpose()?;

    let public_tpu_forwards_addr = matches
        .value_of("public_tpu_forwards_addr")
        .map(|public_tpu_forwards_addr| {
            solana_net_utils::parse_host_port(public_tpu_forwards_addr)
                .map_err(|err| format!("failed to parse --public-tpu-forwards-address: {err}"))
        })
        .transpose()?;

    let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);

    let tpu_max_connections_per_peer =
        value_t_or_exit!(matches, "tpu_max_connections_per_peer", u64);
    let tpu_max_staked_connections = value_t_or_exit!(matches, "tpu_max_staked_connections", u64);
    let tpu_max_unstaked_connections =
        value_t_or_exit!(matches, "tpu_max_unstaked_connections", u64);

    let tpu_max_fwd_staked_connections =
        value_t_or_exit!(matches, "tpu_max_fwd_staked_connections", u64);
    let tpu_max_fwd_unstaked_connections =
        value_t_or_exit!(matches, "tpu_max_fwd_unstaked_connections", u64);

    let tpu_max_connections_per_ipaddr_per_minute: u64 =
        value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64);
    let max_streams_per_ms = value_t_or_exit!(matches, "tpu_max_streams_per_ms", u64);

    let node_config = NodeConfig {
        gossip_addr,
        port_range: dynamic_port_range,
        bind_ip_addr: bind_address,
        public_tpu_addr,
        public_tpu_forwards_addr,
        num_tvu_receive_sockets: tvu_receive_threads,
        num_tvu_retransmit_sockets: tvu_retransmit_threads,
        num_quic_endpoints,
    };

    let cluster_entrypoints = entrypoint_addrs
        .iter()
        .map(ContactInfo::new_gossip_entry_point)
        .collect::<Vec<_>>();

    let mut node = Node::new_with_external_ip(&identity_keypair.pubkey(), node_config);

    if restricted_repair_only_mode {
        if validator_config.wen_restart_proto_path.is_some() {
            Err("--restricted-repair-only-mode is not compatible with --wen_restart".to_string())?;
        }

        // When --restricted-repair-only-mode is enabled, only the gossip and repair ports
        // need to be reachable by the entrypoint to respond to gossip pull requests and repair
        // requests initiated by the node.  All other ports are unused.
        node.info.remove_tpu();
        node.info.remove_tpu_forwards();
        node.info.remove_tvu();
        node.info.remove_serve_repair();

        // A node in this configuration shouldn't be an entrypoint to other nodes
        node.sockets.ip_echo = None;
    }

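    // Unless RPC is private, advertise the RPC and pubsub sockets in gossip,
    // preferring an explicit --public-rpc-address over the gossip IP.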
1233    if !private_rpc {
1234        macro_rules! set_socket {
1235            ($method:ident, $addr:expr, $name:literal) => {
1236                node.info.$method($addr).expect(&format!(
1237                    "Operator must spin up node with valid {} address",
1238                    $name
1239                ))
1240            };
1241        }
1242        if let Some(public_rpc_addr) = public_rpc_addr {
1243            set_socket!(set_rpc, public_rpc_addr, "RPC");
1244            set_socket!(set_rpc_pubsub, public_rpc_addr, "RPC-pubsub");
1245        } else if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
1246            let addr = node
1247                .info
1248                .gossip()
1249                .expect("Operator must spin up node with valid gossip address")
1250                .ip();
1251            set_socket!(set_rpc, (addr, rpc_addr.port()), "RPC");
1252            set_socket!(set_rpc_pubsub, (addr, rpc_pubsub_addr.port()), "RPC-pubsub");
1253        }
1254    }
1255
1256    solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
1257    solana_metrics::set_panic_hook("validator", Some(String::from(solana_version)));
1258    solana_entry::entry::init_poh();
1259    snapshot_utils::remove_tmp_snapshot_archives(&full_snapshot_archives_dir);
1260    snapshot_utils::remove_tmp_snapshot_archives(&incremental_snapshot_archives_dir);
1261
1262    let identity_keypair = Arc::new(identity_keypair);
1263
    let should_check_duplicate_instance = true;
    if !cluster_entrypoints.is_empty() {
        bootstrap::rpc_bootstrap(
            &node,
            &identity_keypair,
            &ledger_path,
            &full_snapshot_archives_dir,
            &incremental_snapshot_archives_dir,
            &vote_account,
            authorized_voter_keypairs.clone(),
            &cluster_entrypoints,
            &mut validator_config,
            rpc_bootstrap_config,
            do_port_check,
            use_progress_bar,
            maximum_local_snapshot_age,
            should_check_duplicate_instance,
            &start_progress,
            minimal_snapshot_download_speed,
            maximum_snapshot_download_abort,
            socket_addr_space,
        );
        *start_progress.write().unwrap() = ValidatorStartProgress::Initializing;
    }

    if operation == Operation::Initialize {
        info!("Validator ledger initialization complete");
        return Ok(());
    }

    // The bootstrap code above pushes a contact-info with a more recent timestamp
    // to gossip. If the node is staked, that contact-info lingers in gossip and
    // causes false duplicate-node errors.
    // The line below refreshes the timestamp on this node's contact-info so that
    // it overrides the one pushed by bootstrap.
    node.info.hot_swap_pubkey(identity_keypair.pubkey());

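    // QUIC server parameters for the TPU and TPU-forwards ports; any field not
    // set explicitly keeps its QuicServerParams default.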
    let tpu_quic_server_config = QuicServerParams {
        max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
        max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
        max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
        max_streams_per_ms,
        max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
        coalesce: tpu_coalesce,
        ..Default::default()
    };

    let tpu_fwd_quic_server_config = QuicServerParams {
        max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
        max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
        max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
        max_streams_per_ms,
        max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
        coalesce: tpu_coalesce,
        ..Default::default()
    };

    // The vote port shares the TPU-forwards characteristics, except that only one
    // connection per peer is accepted and unstaked connections are rejected.
    let mut vote_quic_server_config = tpu_fwd_quic_server_config.clone();
    vote_quic_server_config.max_connections_per_peer = 1;
    vote_quic_server_config.max_unstaked_connections = 0;

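    // Start the validator. WenRestartFinished is not a failure: it signals that
    // the wen_restart protocol completed, and the process exits with the special
    // code 200 so the operator can restart with --wait_for_supermajority.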
    let validator = match Validator::new(
        node,
        identity_keypair,
        &ledger_path,
        &vote_account,
        authorized_voter_keypairs,
        cluster_entrypoints,
        &validator_config,
        should_check_duplicate_instance,
        rpc_to_plugin_manager_receiver,
        start_progress,
        socket_addr_space,
        ValidatorTpuConfig {
            use_quic: tpu_use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        },
        admin_service_post_init,
    ) {
        Ok(validator) => Ok(validator),
        Err(err) => {
            if matches!(
                err.downcast_ref(),
                Some(&ValidatorError::WenRestartFinished)
            ) {
                // 200 is a special error code, see
                // https://github.com/solana-foundation/solana-improvement-documents/pull/46
                error!("Please remove --wen_restart and use --wait_for_supermajority as instructed above");
                exit(200);
            }
            Err(format!("{err:?}"))
        }
    }?;

    if let Some(filename) = init_complete_file {
        File::create(filename).map_err(|err| format!("unable to create {filename}: {err}"))?;
    }
    info!("Validator initialized");
    validator.join();
    info!("Validator exiting..");

    Ok(())
}

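/// Returns the slots supplied via the hard-forks CLI argument `name`, or
/// `None` if the argument is absent.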
// This function is duplicated in ledger-tool/src/main.rs...
fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
    if matches.is_present(name) {
        Some(values_t_or_exit!(matches, name, Slot))
    } else {
        None
    }
}

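/// Parses a set of validator pubkeys from the CLI argument `matches_name` and
/// rejects a set that contains this validator's own identity; `arg_name` is
/// only used to render the error message.
///
/// Illustrative call (the argument names shown are hypothetical):
/// ```ignore
/// let known = validators_set(&identity, &matches, "known_validators", "--known-validator")?;
/// ```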
fn validators_set(
    identity_pubkey: &Pubkey,
    matches: &ArgMatches<'_>,
    matches_name: &str,
    arg_name: &str,
) -> Result<Option<HashSet<Pubkey>>, String> {
    if matches.is_present(matches_name) {
        let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
            .into_iter()
            .collect();
        if validators_set.contains(identity_pubkey) {
            Err(format!(
                "the validator's identity pubkey cannot be a {arg_name}: {identity_pubkey}"
            ))?;
        }
        Ok(Some(validators_set))
    } else {
        Ok(None)
    }
}

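/// Asks the entrypoints, in random order, for the cluster's shred version and
/// returns the first nonzero answer, or `None` if every entrypoint fails.
///
/// Illustrative use (the address below is hypothetical):
/// ```ignore
/// let entrypoints: Vec<SocketAddr> = vec!["203.0.113.5:8001".parse().unwrap()];
/// let shred_version =
///     get_cluster_shred_version(&entrypoints, IpAddr::V4(Ipv4Addr::UNSPECIFIED));
/// ```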
fn get_cluster_shred_version(entrypoints: &[SocketAddr], bind_address: IpAddr) -> Option<u16> {
    let entrypoints = {
        let mut index: Vec<_> = (0..entrypoints.len()).collect();
        index.shuffle(&mut thread_rng());
        index.into_iter().map(|i| &entrypoints[i])
    };
    for entrypoint in entrypoints {
        match solana_net_utils::get_cluster_shred_version_with_binding(entrypoint, bind_address) {
            Err(err) => eprintln!("get_cluster_shred_version failed: {entrypoint}, {err}"),
            Ok(0) => eprintln!("entrypoint {entrypoint} returned shred-version zero"),
            Ok(shred_version) => {
                info!("obtained shred-version {shred_version} from {entrypoint}");
                return Some(shred_version);
            }
        }
    }
    None
}

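/// Resolves `ValidatorConfig::banking_trace_dir_byte_limit` from the CLI:
/// `"disable_banking_trace"` forces the disabled sentinel, otherwise
/// `"banking_trace_dir_byte_limit"` (clap default or user override) applies.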
fn configure_banking_trace_dir_byte_limit(
    validator_config: &mut ValidatorConfig,
    matches: &ArgMatches,
) {
    validator_config.banking_trace_dir_byte_limit = if matches.is_present("disable_banking_trace") {
        // Disabled with an explicit flag; this is effectively opt-out via
        // DISABLED_BAKING_TRACE_DIR, while still letting clap advertise a sensible
        // default limit in the CLI help.
        DISABLED_BAKING_TRACE_DIR
    } else {
        // Either the default value from the clap configuration
        // (BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT) or an explicit user-supplied override.
        value_t_or_exit!(matches, "banking_trace_dir_byte_limit", u64)
    };
}

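/// Builds the secondary-index configuration from the `"account_indexes"`,
/// `"account_index_include_key"`, and `"account_index_exclude_key"` CLI
/// arguments. When both key lists are non-empty, the exclude list wins.
///
/// Illustrative use (flag spellings shown are assumptions):
/// ```ignore
/// // e.g. --account-indexes spl-token-owner --account-index-exclude-key <PUBKEY>
/// let indexes = process_account_indexes(&matches);
/// ```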
fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes {
    let account_indexes: HashSet<AccountIndex> = matches
        .values_of("account_indexes")
        .unwrap_or_default()
        .map(|value| match value {
            "program-id" => AccountIndex::ProgramId,
            "spl-token-mint" => AccountIndex::SplTokenMint,
            "spl-token-owner" => AccountIndex::SplTokenOwner,
            _ => unreachable!(),
        })
        .collect();

    let account_indexes_include_keys: HashSet<Pubkey> =
        values_t!(matches, "account_index_include_key", Pubkey)
            .unwrap_or_default()
            .into_iter()
            .collect();

    let account_indexes_exclude_keys: HashSet<Pubkey> =
        values_t!(matches, "account_index_exclude_key", Pubkey)
            .unwrap_or_default()
            .into_iter()
            .collect();

    let exclude_keys = !account_indexes_exclude_keys.is_empty();
    let include_keys = !account_indexes_include_keys.is_empty();

    let keys = if !account_indexes.is_empty() && (exclude_keys || include_keys) {
        let account_indexes_keys = AccountSecondaryIndexesIncludeExclude {
            exclude: exclude_keys,
            keys: if exclude_keys {
                account_indexes_exclude_keys
            } else {
                account_indexes_include_keys
            },
        };
        Some(account_indexes_keys)
    } else {
        None
    };

    AccountSecondaryIndexes {
        keys,
        indexes: account_indexes,
    }
}