1use {
2 crate::{
3 admin_rpc_service::{self, load_staked_nodes_overrides, StakedNodesOverrides},
4 bootstrap,
5 cli::{self},
6 ledger_lockfile, lock_ledger,
7 },
8 clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
9 crossbeam_channel::unbounded,
10 log::*,
11 rand::{seq::SliceRandom, thread_rng},
12 solana_accounts_db::{
13 accounts_db::{AccountShrinkThreshold, AccountsDb, AccountsDbConfig},
14 accounts_file::StorageAccess,
15 accounts_index::{
16 AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude,
17 AccountsIndexConfig, IndexLimitMb, ScanFilter,
18 },
19 utils::{
20 create_all_accounts_run_and_snapshot_dirs, create_and_canonicalize_directories,
21 create_and_canonicalize_directory,
22 },
23 },
24 solana_clap_utils::input_parsers::{
25 keypair_of, keypairs_of, parse_cpu_ranges, pubkey_of, value_of, values_of,
26 },
27 solana_clock::{Slot, DEFAULT_SLOTS_PER_EPOCH},
28 solana_core::{
29 banking_trace::DISABLED_BAKING_TRACE_DIR,
30 consensus::tower_storage,
31 snapshot_packager_service::SnapshotPackagerService,
32 system_monitor_service::SystemMonitorService,
33 validator::{
34 is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod,
35 TransactionStructure, Validator, ValidatorConfig, ValidatorError,
36 ValidatorStartProgress, ValidatorTpuConfig,
37 },
38 },
39 solana_gossip::{
40 cluster_info::{Node, NodeConfig},
41 contact_info::ContactInfo,
42 },
43 solana_hash::Hash,
44 solana_keypair::Keypair,
45 solana_ledger::{
46 blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
47 blockstore_options::{
48 AccessType, BlockstoreCompressionType, BlockstoreOptions, BlockstoreRecoveryMode,
49 LedgerColumnOptions,
50 },
51 use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
52 },
53 solana_logger::redirect_stderr_to_file,
54 solana_perf::recycler::enable_recycler_warming,
55 solana_poh::poh_service,
56 solana_pubkey::Pubkey,
57 solana_rpc::{
58 rpc::{JsonRpcConfig, RpcBigtableConfig},
59 rpc_pubsub_service::PubSubConfig,
60 },
61 solana_runtime::{
62 runtime_config::RuntimeConfig,
63 snapshot_bank_utils::DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
64 snapshot_config::{SnapshotConfig, SnapshotUsage},
65 snapshot_utils::{self, ArchiveFormat, SnapshotVersion},
66 },
67 solana_send_transaction_service::send_transaction_service,
68 solana_signer::Signer,
69 solana_streamer::{
70 quic::{QuicServerParams, DEFAULT_TPU_COALESCE},
71 socket::SocketAddrSpace,
72 },
73 solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
74 solana_turbine::xdp::{set_cpu_affinity, XdpConfig},
75 std::{
76 collections::HashSet,
77 fs::{self, File},
78 net::{IpAddr, Ipv4Addr, SocketAddr},
79 num::NonZeroUsize,
80 path::{Path, PathBuf},
81 process::exit,
82 str::FromStr,
83 sync::{atomic::AtomicBool, Arc, RwLock},
84 time::Duration,
85 },
86};
87
/// Mode selector handed to [`execute`] through its `operation` parameter.
///
/// The variants carry no data; they only tell `execute` which of its two
/// modes the caller requested.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation {
    /// NOTE(review): presumably prepares the validator and returns without
    /// entering the long-running service loop — confirm against `execute`.
    Initialize,
    /// NOTE(review): presumably starts the validator and keeps it running —
    /// confirm against `execute`.
    Run,
}
93
/// Number of milliseconds in one second.
///
/// `execute` multiplies the RPC send batch size by this value and divides by
/// the batch send interval (in milliseconds) to obtain a per-second send rate.
const MILLIS_PER_SECOND: u64 = 1_000;
95
96pub fn execute(
97 matches: &ArgMatches,
98 solana_version: &str,
99 socket_addr_space: SocketAddrSpace,
100 ledger_path: &Path,
101 operation: Operation,
102) -> Result<(), Box<dyn std::error::Error>> {
103 let cli::thread_args::NumThreadConfig {
104 accounts_db_clean_threads,
105 accounts_db_foreground_threads,
106 accounts_db_hash_threads,
107 accounts_index_flush_threads,
108 ip_echo_server_threads,
109 rayon_global_threads,
110 replay_forks_threads,
111 replay_transactions_threads,
112 rocksdb_compaction_threads,
113 rocksdb_flush_threads,
114 tvu_receive_threads,
115 tvu_retransmit_threads,
116 tvu_sigverify_threads,
117 } = cli::thread_args::parse_num_threads_args(matches);
118
119 let identity_keypair = keypair_of(matches, "identity").unwrap_or_else(|| {
120 clap::Error::with_description(
121 "The --identity <KEYPAIR> argument is required",
122 clap::ErrorKind::ArgumentNotFound,
123 )
124 .exit();
125 });
126
127 let logfile = {
128 let logfile = matches
129 .value_of("logfile")
130 .map(|s| s.into())
131 .unwrap_or_else(|| format!("agave-validator-{}.log", identity_keypair.pubkey()));
132
133 if logfile == "-" {
134 None
135 } else {
136 println!("log file: {logfile}");
137 Some(logfile)
138 }
139 };
140 let use_progress_bar = logfile.is_none();
141 let _logger_thread = redirect_stderr_to_file(logfile);
142
143 info!("{} {}", crate_name!(), solana_version);
144 info!("Starting validator with: {:#?}", std::env::args_os());
145
146 let cuda = matches.is_present("cuda");
147 if cuda {
148 solana_perf::perf_libs::init_cuda();
149 enable_recycler_warming();
150 }
151
152 solana_core::validator::report_target_features();
153
154 let authorized_voter_keypairs = keypairs_of(matches, "authorized_voter_keypairs")
155 .map(|keypairs| keypairs.into_iter().map(Arc::new).collect())
156 .unwrap_or_else(|| vec![Arc::new(keypair_of(matches, "identity").expect("identity"))]);
157 let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));
158
159 let staked_nodes_overrides_path = matches
160 .value_of("staked_nodes_overrides")
161 .map(str::to_string);
162 let staked_nodes_overrides = Arc::new(RwLock::new(
163 match &staked_nodes_overrides_path {
164 None => StakedNodesOverrides::default(),
165 Some(p) => load_staked_nodes_overrides(p).unwrap_or_else(|err| {
166 error!("Failed to load stake-nodes-overrides from {}: {}", p, err);
167 clap::Error::with_description(
168 "Failed to load configuration of stake-nodes-overrides argument",
169 clap::ErrorKind::InvalidValue,
170 )
171 .exit()
172 }),
173 }
174 .staked_map_id,
175 ));
176
177 let init_complete_file = matches.value_of("init_complete_file");
178
179 let rpc_bootstrap_config = bootstrap::RpcBootstrapConfig {
180 no_genesis_fetch: matches.is_present("no_genesis_fetch"),
181 no_snapshot_fetch: matches.is_present("no_snapshot_fetch"),
182 check_vote_account: matches
183 .value_of("check_vote_account")
184 .map(|url| url.to_string()),
185 only_known_rpc: matches.is_present("only_known_rpc"),
186 max_genesis_archive_unpacked_size: value_t_or_exit!(
187 matches,
188 "max_genesis_archive_unpacked_size",
189 u64
190 ),
191 incremental_snapshot_fetch: !matches.is_present("no_incremental_snapshots"),
192 };
193
194 let private_rpc = matches.is_present("private_rpc");
195 let do_port_check = !matches.is_present("no_port_check");
196 let tpu_coalesce = value_t!(matches, "tpu_coalesce_ms", u64)
197 .map(Duration::from_millis)
198 .unwrap_or(DEFAULT_TPU_COALESCE);
199
200 let ledger_path = create_and_canonicalize_directory(ledger_path).map_err(|err| {
202 format!(
203 "unable to access ledger path '{}': {err}",
204 ledger_path.display(),
205 )
206 })?;
207
208 let recovery_mode = matches
209 .value_of("wal_recovery_mode")
210 .map(BlockstoreRecoveryMode::from);
211
212 let max_ledger_shreds = if matches.is_present("limit_ledger_size") {
213 let limit_ledger_size = match matches.value_of("limit_ledger_size") {
214 Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
215 None => DEFAULT_MAX_LEDGER_SHREDS,
216 };
217 if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
218 Err(format!(
219 "The provided --limit-ledger-size value was too small, the minimum value is \
220 {DEFAULT_MIN_MAX_LEDGER_SHREDS}"
221 ))?;
222 }
223 Some(limit_ledger_size)
224 } else {
225 None
226 };
227
228 let column_options = LedgerColumnOptions {
229 compression_type: match matches.value_of("rocksdb_ledger_compression") {
230 None => BlockstoreCompressionType::default(),
231 Some(ledger_compression_string) => match ledger_compression_string {
232 "none" => BlockstoreCompressionType::None,
233 "snappy" => BlockstoreCompressionType::Snappy,
234 "lz4" => BlockstoreCompressionType::Lz4,
235 "zlib" => BlockstoreCompressionType::Zlib,
236 _ => panic!("Unsupported ledger_compression: {ledger_compression_string}"),
237 },
238 },
239 rocks_perf_sample_interval: value_t_or_exit!(
240 matches,
241 "rocksdb_perf_sample_interval",
242 usize
243 ),
244 };
245
246 let blockstore_options = BlockstoreOptions {
247 recovery_mode,
248 column_options,
249 enforce_ulimit_nofile: true,
252 access_type: AccessType::Primary,
254 num_rocksdb_compaction_threads: rocksdb_compaction_threads,
255 num_rocksdb_flush_threads: rocksdb_flush_threads,
256 };
257
258 let accounts_hash_cache_path = matches
259 .value_of("accounts_hash_cache_path")
260 .map(Into::into)
261 .unwrap_or_else(|| ledger_path.join(AccountsDb::DEFAULT_ACCOUNTS_HASH_CACHE_DIR));
262 let accounts_hash_cache_path = create_and_canonicalize_directory(&accounts_hash_cache_path)
263 .map_err(|err| {
264 format!(
265 "Unable to access accounts hash cache path '{}': {err}",
266 accounts_hash_cache_path.display(),
267 )
268 })?;
269
270 let debug_keys: Option<Arc<HashSet<_>>> = if matches.is_present("debug_key") {
271 Some(Arc::new(
272 values_t_or_exit!(matches, "debug_key", Pubkey)
273 .into_iter()
274 .collect(),
275 ))
276 } else {
277 None
278 };
279
280 let known_validators = validators_set(
281 &identity_keypair.pubkey(),
282 matches,
283 "known_validators",
284 "--known-validator",
285 )?;
286 let repair_validators = validators_set(
287 &identity_keypair.pubkey(),
288 matches,
289 "repair_validators",
290 "--repair-validator",
291 )?;
292 let repair_whitelist = validators_set(
293 &identity_keypair.pubkey(),
294 matches,
295 "repair_whitelist",
296 "--repair-whitelist",
297 )?;
298 let repair_whitelist = Arc::new(RwLock::new(repair_whitelist.unwrap_or_default()));
299 let gossip_validators = validators_set(
300 &identity_keypair.pubkey(),
301 matches,
302 "gossip_validators",
303 "--gossip-validator",
304 )?;
305
306 let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap())
307 .expect("invalid bind_address");
308 let rpc_bind_address = if matches.is_present("rpc_bind_address") {
309 solana_net_utils::parse_host(matches.value_of("rpc_bind_address").unwrap())
310 .expect("invalid rpc_bind_address")
311 } else if private_rpc {
312 solana_net_utils::parse_host("127.0.0.1").unwrap()
313 } else {
314 bind_address
315 };
316
317 let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64);
318
319 let account_indexes = process_account_indexes(matches);
320
321 let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
322 let accounts_shrink_optimize_total_space =
323 value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
324 let tpu_use_quic = !matches.is_present("tpu_disable_quic");
325 let vote_use_quic = value_t_or_exit!(matches, "vote_use_quic", bool);
326
327 let tpu_enable_udp = if matches.is_present("tpu_enable_udp") {
328 true
329 } else {
330 DEFAULT_TPU_ENABLE_UDP
331 };
332
333 let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);
334
335 let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
336 if !(0.0..=1.0).contains(&shrink_ratio) {
337 Err(format!(
338 "the specified account-shrink-ratio is invalid, it must be between 0. and 1.0 \
339 inclusive: {shrink_ratio}"
340 ))?;
341 }
342
343 let shrink_ratio = if accounts_shrink_optimize_total_space {
344 AccountShrinkThreshold::TotalSpace { shrink_ratio }
345 } else {
346 AccountShrinkThreshold::IndividualStore { shrink_ratio }
347 };
348 let entrypoint_addrs = values_t!(matches, "entrypoint", String)
349 .unwrap_or_default()
350 .into_iter()
351 .map(|entrypoint| {
352 solana_net_utils::parse_host_port(&entrypoint)
353 .map_err(|err| format!("failed to parse entrypoint address: {err}"))
354 })
355 .collect::<Result<HashSet<_>, _>>()?
356 .into_iter()
357 .collect::<Vec<_>>();
358 for addr in &entrypoint_addrs {
359 if !socket_addr_space.check(addr) {
360 Err(format!("invalid entrypoint address: {addr}"))?;
361 }
362 }
363 let expected_shred_version = value_t!(matches, "expected_shred_version", u16)
368 .ok()
369 .or_else(|| get_cluster_shred_version(&entrypoint_addrs, bind_address));
370
371 let tower_storage: Arc<dyn tower_storage::TowerStorage> =
372 match value_t_or_exit!(matches, "tower_storage", String).as_str() {
373 "file" => {
374 let tower_path = value_t!(matches, "tower", PathBuf)
375 .ok()
376 .unwrap_or_else(|| ledger_path.clone());
377
378 Arc::new(tower_storage::FileTowerStorage::new(tower_path))
379 }
380 "etcd" => {
381 let endpoints = values_t_or_exit!(matches, "etcd_endpoint", String);
382 let domain_name = value_t_or_exit!(matches, "etcd_domain_name", String);
383 let ca_certificate_file = value_t_or_exit!(matches, "etcd_cacert_file", String);
384 let identity_certificate_file = value_t_or_exit!(matches, "etcd_cert_file", String);
385 let identity_private_key_file = value_t_or_exit!(matches, "etcd_key_file", String);
386
387 let read =
388 |file| fs::read(&file).map_err(|err| format!("unable to read {file}: {err}"));
389
390 let tls_config = tower_storage::EtcdTlsConfig {
391 domain_name,
392 ca_certificate: read(ca_certificate_file)?,
393 identity_certificate: read(identity_certificate_file)?,
394 identity_private_key: read(identity_private_key_file)?,
395 };
396
397 Arc::new(
398 tower_storage::EtcdTowerStorage::new(endpoints, Some(tls_config))
399 .map_err(|err| format!("failed to connect to etcd: {err}"))?,
400 )
401 }
402 _ => unreachable!(),
403 };
404
405 let mut accounts_index_config = AccountsIndexConfig {
406 num_flush_threads: Some(accounts_index_flush_threads),
407 ..AccountsIndexConfig::default()
408 };
409 if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
410 accounts_index_config.bins = Some(bins);
411 }
412
413 accounts_index_config.index_limit_mb = if matches.is_present("disable_accounts_disk_index") {
414 IndexLimitMb::InMemOnly
415 } else {
416 IndexLimitMb::Minimal
417 };
418
419 {
420 let mut accounts_index_paths: Vec<PathBuf> = if matches.is_present("accounts_index_path") {
421 values_t_or_exit!(matches, "accounts_index_path", String)
422 .into_iter()
423 .map(PathBuf::from)
424 .collect()
425 } else {
426 vec![]
427 };
428 if accounts_index_paths.is_empty() {
429 accounts_index_paths = vec![ledger_path.join("accounts_index")];
430 }
431 accounts_index_config.drives = Some(accounts_index_paths);
432 }
433
434 const MB: usize = 1_024 * 1_024;
435 accounts_index_config.scan_results_limit_bytes =
436 value_t!(matches, "accounts_index_scan_results_limit_mb", usize)
437 .ok()
438 .map(|mb| mb * MB);
439
440 let account_shrink_paths: Option<Vec<PathBuf>> =
441 values_t!(matches, "account_shrink_path", String)
442 .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
443 .ok();
444 let account_shrink_paths = account_shrink_paths
445 .as_ref()
446 .map(|paths| {
447 create_and_canonicalize_directories(paths)
448 .map_err(|err| format!("unable to access account shrink path: {err}"))
449 })
450 .transpose()?;
451
452 let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
453 .map(|paths| {
454 create_all_accounts_run_and_snapshot_dirs(&paths)
455 .map_err(|err| format!("unable to create account subdirectories: {err}"))
456 })
457 .transpose()?
458 .unzip();
459
460 let read_cache_limit_bytes = values_of::<usize>(matches, "accounts_db_read_cache_limit_mb")
461 .map(|limits| {
462 match limits.len() {
463 2 => (limits[0] * MB, limits[1] * MB),
465 1 => (limits[0] * MB, limits[0] * MB),
467 _ => {
468 unreachable!(
470 "invalid number of values given to accounts-db-read-cache-limit-mb"
471 )
472 }
473 }
474 });
475 let storage_access = matches
476 .value_of("accounts_db_access_storages_method")
477 .map(|method| match method {
478 "mmap" => StorageAccess::Mmap,
479 "file" => StorageAccess::File,
480 _ => {
481 unreachable!("invalid value given to accounts-db-access-storages-method")
483 }
484 })
485 .unwrap_or_default();
486
487 let scan_filter_for_shrinking = matches
488 .value_of("accounts_db_scan_filter_for_shrinking")
489 .map(|filter| match filter {
490 "all" => ScanFilter::All,
491 "only-abnormal" => ScanFilter::OnlyAbnormal,
492 "only-abnormal-with-verify" => ScanFilter::OnlyAbnormalWithVerify,
493 _ => {
494 unreachable!("invalid value given to accounts_db_scan_filter_for_shrinking")
496 }
497 })
498 .unwrap_or_default();
499
500 let accounts_db_config = AccountsDbConfig {
501 index: Some(accounts_index_config),
502 account_indexes: Some(account_indexes.clone()),
503 base_working_path: Some(ledger_path.clone()),
504 accounts_hash_cache_path: Some(accounts_hash_cache_path),
505 shrink_paths: account_shrink_run_paths,
506 shrink_ratio,
507 read_cache_limit_bytes,
508 write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
509 .ok()
510 .map(|mb| mb * MB as u64),
511 ancient_append_vec_offset: value_t!(matches, "accounts_db_ancient_append_vecs", i64).ok(),
512 ancient_storage_ideal_size: value_t!(
513 matches,
514 "accounts_db_ancient_storage_ideal_size",
515 u64
516 )
517 .ok(),
518 max_ancient_storages: value_t!(matches, "accounts_db_max_ancient_storages", usize).ok(),
519 hash_calculation_pubkey_bins: value_t!(
520 matches,
521 "accounts_db_hash_calculation_pubkey_bins",
522 usize
523 )
524 .ok(),
525 exhaustively_verify_refcounts: matches.is_present("accounts_db_verify_refcounts"),
526 test_skip_rewrites_but_include_in_bank_hash: false,
527 storage_access,
528 scan_filter_for_shrinking,
529 enable_experimental_accumulator_hash: !matches
530 .is_present("no_accounts_db_experimental_accumulator_hash"),
531 verify_experimental_accumulator_hash: matches
532 .is_present("accounts_db_verify_experimental_accumulator_hash"),
533 snapshots_use_experimental_accumulator_hash: matches
534 .is_present("accounts_db_snapshots_use_experimental_accumulator_hash"),
535 num_clean_threads: Some(accounts_db_clean_threads),
536 num_foreground_threads: Some(accounts_db_foreground_threads),
537 num_hash_threads: Some(accounts_db_hash_threads),
538 ..AccountsDbConfig::default()
539 };
540
541 let accounts_db_config = Some(accounts_db_config);
542
543 let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
544 Some(
545 values_t_or_exit!(matches, "geyser_plugin_config", String)
546 .into_iter()
547 .map(PathBuf::from)
548 .collect(),
549 )
550 } else {
551 None
552 };
553 let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some()
554 || matches.is_present("geyser_plugin_always_enabled");
555
556 let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
557 || matches.is_present("enable_bigtable_ledger_upload")
558 {
559 Some(RpcBigtableConfig {
560 enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"),
561 bigtable_instance_name: value_t_or_exit!(matches, "rpc_bigtable_instance_name", String),
562 bigtable_app_profile_id: value_t_or_exit!(
563 matches,
564 "rpc_bigtable_app_profile_id",
565 String
566 ),
567 timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
568 .ok()
569 .map(Duration::from_secs),
570 max_message_size: value_t_or_exit!(matches, "rpc_bigtable_max_message_size", usize),
571 })
572 } else {
573 None
574 };
575
576 let rpc_send_retry_rate_ms = value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64);
577 let rpc_send_batch_size = value_t_or_exit!(matches, "rpc_send_transaction_batch_size", usize);
578 let rpc_send_batch_send_rate_ms =
579 value_t_or_exit!(matches, "rpc_send_transaction_batch_ms", u64);
580
581 if rpc_send_batch_send_rate_ms > rpc_send_retry_rate_ms {
582 Err(format!(
583 "the specified rpc-send-batch-ms ({rpc_send_batch_send_rate_ms}) is invalid, it must \
584 be <= rpc-send-retry-ms ({rpc_send_retry_rate_ms})"
585 ))?;
586 }
587
588 let tps = rpc_send_batch_size as u64 * MILLIS_PER_SECOND / rpc_send_batch_send_rate_ms;
589 if tps > send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND {
590 Err(format!(
591 "either the specified rpc-send-batch-size ({}) or rpc-send-batch-ms ({}) is invalid, \
592 'rpc-send-batch-size * 1000 / rpc-send-batch-ms' must be smaller than ({}) .",
593 rpc_send_batch_size,
594 rpc_send_batch_send_rate_ms,
595 send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND
596 ))?;
597 }
598 let rpc_send_transaction_tpu_peers = matches
599 .values_of("rpc_send_transaction_tpu_peer")
600 .map(|values| {
601 values
602 .map(solana_net_utils::parse_host_port)
603 .collect::<Result<Vec<SocketAddr>, String>>()
604 })
605 .transpose()
606 .map_err(|err| {
607 format!("failed to parse rpc send-transaction-service tpu peer address: {err}")
608 })?;
609 let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
610 let leader_forward_count =
611 if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
612 0
614 } else {
615 value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
616 };
617
618 let full_api = matches.is_present("full_rpc_api");
619
620 let xdp_interface = matches.value_of("retransmit_xdp_interface");
621 let xdp_zero_copy = matches.is_present("retransmit_xdp_zero_copy");
622 let retransmit_xdp = matches.value_of("retransmit_xdp_cpu_cores").map(|cpus| {
623 XdpConfig::new(
624 xdp_interface,
625 parse_cpu_ranges(cpus).unwrap(),
626 xdp_zero_copy,
627 )
628 });
629
630 let mut validator_config = ValidatorConfig {
631 require_tower: matches.is_present("require_tower"),
632 tower_storage,
633 halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
634 expected_genesis_hash: matches
635 .value_of("expected_genesis_hash")
636 .map(|s| Hash::from_str(s).unwrap()),
637 expected_bank_hash: matches
638 .value_of("expected_bank_hash")
639 .map(|s| Hash::from_str(s).unwrap()),
640 expected_shred_version,
641 new_hard_forks: hardforks_of(matches, "hard_forks"),
642 rpc_config: JsonRpcConfig {
643 enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
644 enable_extended_tx_metadata_storage: matches.is_present("enable_cpi_and_log_storage")
645 || matches.is_present("enable_extended_tx_metadata_storage"),
646 rpc_bigtable_config,
647 faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
648 solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
649 }),
650 full_api,
651 max_multiple_accounts: Some(value_t_or_exit!(
652 matches,
653 "rpc_max_multiple_accounts",
654 usize
655 )),
656 health_check_slot_distance: value_t_or_exit!(
657 matches,
658 "health_check_slot_distance",
659 u64
660 ),
661 disable_health_check: false,
662 rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize),
663 rpc_blocking_threads: value_t_or_exit!(matches, "rpc_blocking_threads", usize),
664 rpc_niceness_adj: value_t_or_exit!(matches, "rpc_niceness_adj", i8),
665 account_indexes: account_indexes.clone(),
666 rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),
667 max_request_body_size: Some(value_t_or_exit!(
668 matches,
669 "rpc_max_request_body_size",
670 usize
671 )),
672 skip_preflight_health_check: matches.is_present("skip_preflight_health_check"),
673 },
674 on_start_geyser_plugin_config_files,
675 geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
676 rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
677 (
678 SocketAddr::new(rpc_bind_address, rpc_port),
679 SocketAddr::new(rpc_bind_address, rpc_port + 1),
680 )
684 }),
685 pubsub_config: PubSubConfig {
686 enable_block_subscription: matches.is_present("rpc_pubsub_enable_block_subscription"),
687 enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"),
688 max_active_subscriptions: value_t_or_exit!(
689 matches,
690 "rpc_pubsub_max_active_subscriptions",
691 usize
692 ),
693 queue_capacity_items: value_t_or_exit!(
694 matches,
695 "rpc_pubsub_queue_capacity_items",
696 usize
697 ),
698 queue_capacity_bytes: value_t_or_exit!(
699 matches,
700 "rpc_pubsub_queue_capacity_bytes",
701 usize
702 ),
703 worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize),
704 notification_threads: value_t!(matches, "rpc_pubsub_notification_threads", usize)
705 .ok()
706 .and_then(NonZeroUsize::new),
707 },
708 voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
709 wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
710 known_validators,
711 repair_validators,
712 repair_whitelist,
713 gossip_validators,
714 max_ledger_shreds,
715 blockstore_options,
716 run_verification: !(matches.is_present("skip_poh_verify")
717 || matches.is_present("skip_startup_ledger_verification")),
718 debug_keys,
719 contact_debug_interval,
720 send_transaction_service_config: send_transaction_service::Config {
721 retry_rate_ms: rpc_send_retry_rate_ms,
722 leader_forward_count,
723 default_max_retries: value_t!(
724 matches,
725 "rpc_send_transaction_default_max_retries",
726 usize
727 )
728 .ok(),
729 service_max_retries: value_t_or_exit!(
730 matches,
731 "rpc_send_transaction_service_max_retries",
732 usize
733 ),
734 batch_send_rate_ms: rpc_send_batch_send_rate_ms,
735 batch_size: rpc_send_batch_size,
736 retry_pool_max_size: value_t_or_exit!(
737 matches,
738 "rpc_send_transaction_retry_pool_max_size",
739 usize
740 ),
741 tpu_peers: rpc_send_transaction_tpu_peers,
742 },
743 no_poh_speed_test: matches.is_present("no_poh_speed_test"),
744 no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
745 no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
746 no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
747 no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
748 poh_pinned_cpu_core: value_of(matches, "poh_pinned_cpu_core")
749 .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
750 poh_hashes_per_batch: value_of(matches, "poh_hashes_per_batch")
751 .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH),
752 process_ledger_before_services: matches.is_present("process_ledger_before_services"),
753 accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"),
754 accounts_db_config,
755 accounts_db_skip_shrink: true,
756 accounts_db_force_initial_clean: matches.is_present("no_skip_initial_accounts_db_clean"),
757 tpu_coalesce,
758 no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"),
759 runtime_config: RuntimeConfig {
760 log_messages_bytes_limit: value_of(matches, "log_messages_bytes_limit"),
761 ..RuntimeConfig::default()
762 },
763 staked_nodes_overrides: staked_nodes_overrides.clone(),
764 use_snapshot_archives_at_startup: value_t_or_exit!(
765 matches,
766 use_snapshot_archives_at_startup::cli::NAME,
767 UseSnapshotArchivesAtStartup
768 ),
769 ip_echo_server_threads,
770 rayon_global_threads,
771 replay_forks_threads,
772 replay_transactions_threads,
773 tvu_shred_sigverify_threads: tvu_sigverify_threads,
774 delay_leader_block_for_pending_fork: matches
775 .is_present("delay_leader_block_for_pending_fork"),
776 wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(),
777 wen_restart_coordinator: value_t!(matches, "wen_restart_coordinator", Pubkey).ok(),
778 retransmit_xdp,
779 use_tpu_client_next: !matches.is_present("use_connection_cache"),
780 ..ValidatorConfig::default()
781 };
782
783 let reserved = validator_config
784 .retransmit_xdp
785 .as_ref()
786 .map(|xdp| xdp.cpus.clone())
787 .unwrap_or_default()
788 .iter()
789 .cloned()
790 .collect::<HashSet<_>>();
791 if !reserved.is_empty() {
792 let available = core_affinity::get_core_ids()
793 .unwrap_or_default()
794 .into_iter()
795 .map(|core_id| core_id.id)
796 .collect::<HashSet<_>>();
797 let available = available.difference(&reserved);
798 set_cpu_affinity(available.into_iter().copied()).unwrap();
799 }
800
801 let vote_account = pubkey_of(matches, "vote_account").unwrap_or_else(|| {
802 if !validator_config.voting_disabled {
803 warn!("--vote-account not specified, validator will not vote");
804 validator_config.voting_disabled = true;
805 }
806 Keypair::new().pubkey()
807 });
808
809 let dynamic_port_range =
810 solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
811 .expect("invalid dynamic_port_range");
812
813 let account_paths: Vec<PathBuf> =
814 if let Ok(account_paths) = values_t!(matches, "account_paths", String) {
815 account_paths
816 .join(",")
817 .split(',')
818 .map(PathBuf::from)
819 .collect()
820 } else {
821 vec![ledger_path.join("accounts")]
822 };
823 let account_paths = create_and_canonicalize_directories(account_paths)
824 .map_err(|err| format!("unable to access account path: {err}"))?;
825
826 let (account_run_paths, account_snapshot_paths) =
827 create_all_accounts_run_and_snapshot_dirs(&account_paths)
828 .map_err(|err| format!("unable to create account directories: {err}"))?;
829
830 validator_config.account_paths = account_run_paths;
832
833 validator_config.account_snapshot_paths =
835 if let Some(account_shrink_snapshot_paths) = account_shrink_snapshot_paths {
836 account_snapshot_paths
837 .into_iter()
838 .chain(account_shrink_snapshot_paths)
839 .collect()
840 } else {
841 account_snapshot_paths
842 };
843
844 let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64);
845 let maximum_full_snapshot_archives_to_retain =
846 value_t_or_exit!(matches, "maximum_full_snapshots_to_retain", NonZeroUsize);
847 let maximum_incremental_snapshot_archives_to_retain = value_t_or_exit!(
848 matches,
849 "maximum_incremental_snapshots_to_retain",
850 NonZeroUsize
851 );
852 let snapshot_packager_niceness_adj =
853 value_t_or_exit!(matches, "snapshot_packager_niceness_adj", i8);
854 let minimal_snapshot_download_speed =
855 value_t_or_exit!(matches, "minimal_snapshot_download_speed", f32);
856 let maximum_snapshot_download_abort =
857 value_t_or_exit!(matches, "maximum_snapshot_download_abort", u64);
858
859 let snapshots_dir = if let Some(snapshots) = matches.value_of("snapshots") {
860 Path::new(snapshots)
861 } else {
862 &ledger_path
863 };
864 let snapshots_dir = create_and_canonicalize_directory(snapshots_dir).map_err(|err| {
865 format!(
866 "failed to create snapshots directory '{}': {err}",
867 snapshots_dir.display(),
868 )
869 })?;
870
871 if account_paths
872 .iter()
873 .any(|account_path| account_path == &snapshots_dir)
874 {
875 Err(
876 "the --accounts and --snapshots paths must be unique since they \
877 both create 'snapshots' subdirectories, otherwise there may be collisions"
878 .to_string(),
879 )?;
880 }
881
882 let bank_snapshots_dir = snapshots_dir.join("snapshots");
883 fs::create_dir_all(&bank_snapshots_dir).map_err(|err| {
884 format!(
885 "failed to create bank snapshots directory '{}': {err}",
886 bank_snapshots_dir.display(),
887 )
888 })?;
889
890 let full_snapshot_archives_dir =
891 if let Some(full_snapshot_archive_path) = matches.value_of("full_snapshot_archive_path") {
892 PathBuf::from(full_snapshot_archive_path)
893 } else {
894 snapshots_dir.clone()
895 };
896 fs::create_dir_all(&full_snapshot_archives_dir).map_err(|err| {
897 format!(
898 "failed to create full snapshot archives directory '{}': {err}",
899 full_snapshot_archives_dir.display(),
900 )
901 })?;
902
903 let incremental_snapshot_archives_dir = if let Some(incremental_snapshot_archive_path) =
904 matches.value_of("incremental_snapshot_archive_path")
905 {
906 PathBuf::from(incremental_snapshot_archive_path)
907 } else {
908 snapshots_dir.clone()
909 };
910 fs::create_dir_all(&incremental_snapshot_archives_dir).map_err(|err| {
911 format!(
912 "failed to create incremental snapshot archives directory '{}': {err}",
913 incremental_snapshot_archives_dir.display(),
914 )
915 })?;
916
917 let archive_format = {
918 let archive_format_str = value_t_or_exit!(matches, "snapshot_archive_format", String);
919 let mut archive_format = ArchiveFormat::from_cli_arg(&archive_format_str)
920 .unwrap_or_else(|| panic!("Archive format not recognized: {archive_format_str}"));
921 if let ArchiveFormat::TarZstd { config } = &mut archive_format {
922 config.compression_level =
923 value_t_or_exit!(matches, "snapshot_zstd_compression_level", i32);
924 }
925 archive_format
926 };
927
928 let snapshot_version = matches
929 .value_of("snapshot_version")
930 .map(|value| {
931 value
932 .parse::<SnapshotVersion>()
933 .map_err(|err| format!("unable to parse snapshot version: {err}"))
934 })
935 .transpose()?
936 .unwrap_or(SnapshotVersion::default());
937
938 let (full_snapshot_archive_interval_slots, incremental_snapshot_archive_interval_slots) =
939 if matches.is_present("no_snapshots") {
940 (
942 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
943 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
944 )
945 } else {
946 match (
947 !matches.is_present("no_incremental_snapshots"),
948 value_t_or_exit!(matches, "snapshot_interval_slots", u64),
949 ) {
950 (_, 0) => {
951 warn!(
953 "Snapshot generation was disabled with `--snapshot-interval-slots 0`, \
954 which is now deprecated. Use `--no-snapshots` instead.",
955 );
956 (
957 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
958 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
959 )
960 }
961 (true, incremental_snapshot_interval_slots) => {
962 (
965 value_t_or_exit!(matches, "full_snapshot_interval_slots", u64),
966 incremental_snapshot_interval_slots,
967 )
968 }
969 (false, full_snapshot_interval_slots) => {
970 if matches.occurrences_of("full_snapshot_interval_slots") > 0 {
974 warn!(
975 "Incremental snapshots are disabled, yet --full-snapshot-interval-slots was specified! \
976 Note that --full-snapshot-interval-slots is *ignored* when incremental snapshots are disabled. \
977 Use --snapshot-interval-slots instead.",
978 );
979 }
980 (
981 full_snapshot_interval_slots,
982 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
983 )
984 }
985 }
986 };
987
988 validator_config.snapshot_config = SnapshotConfig {
989 usage: if full_snapshot_archive_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
990 SnapshotUsage::LoadOnly
991 } else {
992 SnapshotUsage::LoadAndGenerate
993 },
994 full_snapshot_archive_interval_slots,
995 incremental_snapshot_archive_interval_slots,
996 bank_snapshots_dir,
997 full_snapshot_archives_dir: full_snapshot_archives_dir.clone(),
998 incremental_snapshot_archives_dir: incremental_snapshot_archives_dir.clone(),
999 archive_format,
1000 snapshot_version,
1001 maximum_full_snapshot_archives_to_retain,
1002 maximum_incremental_snapshot_archives_to_retain,
1003 packager_thread_niceness_adj: snapshot_packager_niceness_adj,
1004 };
1005
1006 info!(
1007 "Snapshot configuration: full snapshot interval: {}, incremental snapshot interval: {}",
1008 if full_snapshot_archive_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
1009 "disabled".to_string()
1010 } else {
1011 format!("{full_snapshot_archive_interval_slots} slots")
1012 },
1013 if incremental_snapshot_archive_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
1014 "disabled".to_string()
1015 } else {
1016 format!("{incremental_snapshot_archive_interval_slots} slots")
1017 },
1018 );
1019
1020 if full_snapshot_archive_interval_slots > DEFAULT_SLOTS_PER_EPOCH
1023 && full_snapshot_archive_interval_slots != DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
1024 {
1025 warn!(
1026 "The full snapshot interval is excessively large: {}! This will negatively \
1027 impact the background cleanup tasks in accounts-db. Consider a smaller value.",
1028 full_snapshot_archive_interval_slots,
1029 );
1030 }
1031
1032 if !is_snapshot_config_valid(&validator_config.snapshot_config) {
1033 Err(
1034 "invalid snapshot configuration provided: snapshot intervals are incompatible. \
1035 \n\t- full snapshot interval MUST be larger than incremental snapshot interval \
1036 (if enabled)"
1037 .to_string(),
1038 )?;
1039 }
1040
1041 configure_banking_trace_dir_byte_limit(&mut validator_config, matches);
1042 validator_config.block_verification_method = value_t_or_exit!(
1043 matches,
1044 "block_verification_method",
1045 BlockVerificationMethod
1046 );
1047 match validator_config.block_verification_method {
1048 BlockVerificationMethod::BlockstoreProcessor => {
1049 warn!(
1050 "The value \"blockstore-processor\" for --block-verification-method has been \
1051 deprecated. The value \"blockstore-processor\" is still allowed for now, but \
1052 is planned for removal in the near future. To update, either set the value \
1053 \"unified-scheduler\" or remove the --block-verification-method argument"
1054 );
1055 }
1056 BlockVerificationMethod::UnifiedScheduler => {}
1057 }
1058 validator_config.block_production_method = value_t_or_exit!(
1059 matches, "block_production_method",
1061 BlockProductionMethod
1062 );
1063 validator_config.transaction_struct = value_t_or_exit!(
1064 matches, "transaction_struct",
1066 TransactionStructure
1067 );
1068 validator_config.enable_block_production_forwarding = staked_nodes_overrides_path.is_some();
1069 validator_config.unified_scheduler_handler_threads =
1070 value_t!(matches, "unified_scheduler_handler_threads", usize).ok();
1071
1072 let public_rpc_addr = matches
1073 .value_of("public_rpc_addr")
1074 .map(|addr| {
1075 solana_net_utils::parse_host_port(addr)
1076 .map_err(|err| format!("failed to parse public rpc address: {err}"))
1077 })
1078 .transpose()?;
1079
1080 if !matches.is_present("no_os_network_limits_test") {
1081 if SystemMonitorService::check_os_network_limits() {
1082 info!("OS network limits test passed.");
1083 } else {
1084 Err("OS network limit test failed. See \
1085 https://docs.solanalabs.com/operations/guides/validator-start#system-tuning"
1086 .to_string())?;
1087 }
1088 }
1089
1090 let validator_exit_backpressure = [(
1091 SnapshotPackagerService::NAME.to_string(),
1092 Arc::new(AtomicBool::new(false)),
1093 )]
1094 .into();
1095 validator_config.validator_exit_backpressure = validator_exit_backpressure;
1096
1097 let mut ledger_lock = ledger_lockfile(&ledger_path);
1098 let _ledger_write_guard = lock_ledger(&ledger_path, &mut ledger_lock);
1099
1100 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1101 let admin_service_post_init = Arc::new(RwLock::new(None));
1102 let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
1103 if starting_with_geyser_plugins {
1104 let (sender, receiver) = unbounded();
1105 (Some(sender), Some(receiver))
1106 } else {
1107 (None, None)
1108 };
1109 admin_rpc_service::run(
1110 &ledger_path,
1111 admin_rpc_service::AdminRpcRequestMetadata {
1112 rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1113 start_time: std::time::SystemTime::now(),
1114 validator_exit: validator_config.validator_exit.clone(),
1115 validator_exit_backpressure: validator_config.validator_exit_backpressure.clone(),
1116 start_progress: start_progress.clone(),
1117 authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1118 post_init: admin_service_post_init.clone(),
1119 tower_storage: validator_config.tower_storage.clone(),
1120 staked_nodes_overrides,
1121 rpc_to_plugin_manager_sender,
1122 },
1123 );
1124
1125 let gossip_host: IpAddr = matches
1126 .value_of("gossip_host")
1127 .map(|gossip_host| {
1128 solana_net_utils::parse_host(gossip_host)
1129 .map_err(|err| format!("failed to parse --gossip-host: {err}"))
1130 })
1131 .transpose()?
1132 .or_else(|| {
1133 if !entrypoint_addrs.is_empty() {
1134 let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect();
1135 order.shuffle(&mut thread_rng());
1136 order.into_iter().find_map(|i| {
1138 let entrypoint_addr = &entrypoint_addrs[i];
1139 info!(
1140 "Contacting {} to determine the validator's public IP address",
1141 entrypoint_addr
1142 );
1143 solana_net_utils::get_public_ip_addr_with_binding(entrypoint_addr, bind_address)
1144 .map_or_else(
1145 |err| {
1146 warn!(
1147 "Failed to contact cluster entrypoint {entrypoint_addr}: {err}"
1148 );
1149 None
1150 },
1151 Some,
1152 )
1153 })
1154 } else {
1155 Some(IpAddr::V4(Ipv4Addr::LOCALHOST))
1156 }
1157 })
1158 .ok_or_else(|| "unable to determine the validator's public IP address".to_string())?;
1159 let gossip_port = value_t!(matches, "gossip_port", u16).or_else(|_| {
1160 solana_net_utils::find_available_port_in_range(bind_address, (0, 1))
1161 .map_err(|err| format!("unable to find an available gossip port: {err}"))
1162 })?;
1163 let gossip_addr = SocketAddr::new(gossip_host, gossip_port);
1164
1165 let public_tpu_addr = matches
1166 .value_of("public_tpu_addr")
1167 .map(|public_tpu_addr| {
1168 solana_net_utils::parse_host_port(public_tpu_addr)
1169 .map_err(|err| format!("failed to parse --public-tpu-address: {err}"))
1170 })
1171 .transpose()?;
1172
1173 let public_tpu_forwards_addr = matches
1174 .value_of("public_tpu_forwards_addr")
1175 .map(|public_tpu_forwards_addr| {
1176 solana_net_utils::parse_host_port(public_tpu_forwards_addr)
1177 .map_err(|err| format!("failed to parse --public-tpu-forwards-address: {err}"))
1178 })
1179 .transpose()?;
1180
1181 let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
1182
1183 let tpu_max_connections_per_peer =
1184 value_t_or_exit!(matches, "tpu_max_connections_per_peer", u64);
1185 let tpu_max_staked_connections = value_t_or_exit!(matches, "tpu_max_staked_connections", u64);
1186 let tpu_max_unstaked_connections =
1187 value_t_or_exit!(matches, "tpu_max_unstaked_connections", u64);
1188
1189 let tpu_max_fwd_staked_connections =
1190 value_t_or_exit!(matches, "tpu_max_fwd_staked_connections", u64);
1191 let tpu_max_fwd_unstaked_connections =
1192 value_t_or_exit!(matches, "tpu_max_fwd_unstaked_connections", u64);
1193
1194 let tpu_max_connections_per_ipaddr_per_minute: u64 =
1195 value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64);
1196 let max_streams_per_ms = value_t_or_exit!(matches, "tpu_max_streams_per_ms", u64);
1197
1198 let node_config = NodeConfig {
1199 gossip_addr,
1200 port_range: dynamic_port_range,
1201 bind_ip_addr: bind_address,
1202 public_tpu_addr,
1203 public_tpu_forwards_addr,
1204 num_tvu_receive_sockets: tvu_receive_threads,
1205 num_tvu_retransmit_sockets: tvu_retransmit_threads,
1206 num_quic_endpoints,
1207 };
1208
1209 let cluster_entrypoints = entrypoint_addrs
1210 .iter()
1211 .map(ContactInfo::new_gossip_entry_point)
1212 .collect::<Vec<_>>();
1213
1214 let mut node = Node::new_with_external_ip(&identity_keypair.pubkey(), node_config);
1215
1216 if restricted_repair_only_mode {
1217 if validator_config.wen_restart_proto_path.is_some() {
1218 Err("--restricted-repair-only-mode is not compatible with --wen_restart".to_string())?;
1219 }
1220
1221 node.info.remove_tpu();
1225 node.info.remove_tpu_forwards();
1226 node.info.remove_tvu();
1227 node.info.remove_serve_repair();
1228
1229 node.sockets.ip_echo = None;
1231 }
1232
1233 if !private_rpc {
1234 macro_rules! set_socket {
1235 ($method:ident, $addr:expr, $name:literal) => {
1236 node.info.$method($addr).expect(&format!(
1237 "Operator must spin up node with valid {} address",
1238 $name
1239 ))
1240 };
1241 }
1242 if let Some(public_rpc_addr) = public_rpc_addr {
1243 set_socket!(set_rpc, public_rpc_addr, "RPC");
1244 set_socket!(set_rpc_pubsub, public_rpc_addr, "RPC-pubsub");
1245 } else if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
1246 let addr = node
1247 .info
1248 .gossip()
1249 .expect("Operator must spin up node with valid gossip address")
1250 .ip();
1251 set_socket!(set_rpc, (addr, rpc_addr.port()), "RPC");
1252 set_socket!(set_rpc_pubsub, (addr, rpc_pubsub_addr.port()), "RPC-pubsub");
1253 }
1254 }
1255
1256 solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
1257 solana_metrics::set_panic_hook("validator", Some(String::from(solana_version)));
1258 solana_entry::entry::init_poh();
1259 snapshot_utils::remove_tmp_snapshot_archives(&full_snapshot_archives_dir);
1260 snapshot_utils::remove_tmp_snapshot_archives(&incremental_snapshot_archives_dir);
1261
1262 let identity_keypair = Arc::new(identity_keypair);
1263
1264 let should_check_duplicate_instance = true;
1265 if !cluster_entrypoints.is_empty() {
1266 bootstrap::rpc_bootstrap(
1267 &node,
1268 &identity_keypair,
1269 &ledger_path,
1270 &full_snapshot_archives_dir,
1271 &incremental_snapshot_archives_dir,
1272 &vote_account,
1273 authorized_voter_keypairs.clone(),
1274 &cluster_entrypoints,
1275 &mut validator_config,
1276 rpc_bootstrap_config,
1277 do_port_check,
1278 use_progress_bar,
1279 maximum_local_snapshot_age,
1280 should_check_duplicate_instance,
1281 &start_progress,
1282 minimal_snapshot_download_speed,
1283 maximum_snapshot_download_abort,
1284 socket_addr_space,
1285 );
1286 *start_progress.write().unwrap() = ValidatorStartProgress::Initializing;
1287 }
1288
1289 if operation == Operation::Initialize {
1290 info!("Validator ledger initialization complete");
1291 return Ok(());
1292 }
1293
1294 node.info.hot_swap_pubkey(identity_keypair.pubkey());
1300
1301 let tpu_quic_server_config = QuicServerParams {
1302 max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
1303 max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
1304 max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
1305 max_streams_per_ms,
1306 max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1307 coalesce: tpu_coalesce,
1308 ..Default::default()
1309 };
1310
1311 let tpu_fwd_quic_server_config = QuicServerParams {
1312 max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
1313 max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
1314 max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
1315 max_streams_per_ms,
1316 max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
1317 coalesce: tpu_coalesce,
1318 ..Default::default()
1319 };
1320
1321 let mut vote_quic_server_config = tpu_fwd_quic_server_config.clone();
1324 vote_quic_server_config.max_connections_per_peer = 1;
1325 vote_quic_server_config.max_unstaked_connections = 0;
1326
1327 let validator = match Validator::new(
1328 node,
1329 identity_keypair,
1330 &ledger_path,
1331 &vote_account,
1332 authorized_voter_keypairs,
1333 cluster_entrypoints,
1334 &validator_config,
1335 should_check_duplicate_instance,
1336 rpc_to_plugin_manager_receiver,
1337 start_progress,
1338 socket_addr_space,
1339 ValidatorTpuConfig {
1340 use_quic: tpu_use_quic,
1341 vote_use_quic,
1342 tpu_connection_pool_size,
1343 tpu_enable_udp,
1344 tpu_quic_server_config,
1345 tpu_fwd_quic_server_config,
1346 vote_quic_server_config,
1347 },
1348 admin_service_post_init,
1349 ) {
1350 Ok(validator) => Ok(validator),
1351 Err(err) => {
1352 if matches!(
1353 err.downcast_ref(),
1354 Some(&ValidatorError::WenRestartFinished)
1355 ) {
1356 error!("Please remove --wen_restart and use --wait_for_supermajority as instructed above");
1359 exit(200);
1360 }
1361 Err(format!("{err:?}"))
1362 }
1363 }?;
1364
1365 if let Some(filename) = init_complete_file {
1366 File::create(filename).map_err(|err| format!("unable to create {filename}: {err}"))?;
1367 }
1368 info!("Validator initialized");
1369 validator.join();
1370 info!("Validator exiting..");
1371
1372 Ok(())
1373}
1374
1375fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
1377 if matches.is_present(name) {
1378 Some(values_t_or_exit!(matches, name, Slot))
1379 } else {
1380 None
1381 }
1382}
1383
1384fn validators_set(
1385 identity_pubkey: &Pubkey,
1386 matches: &ArgMatches<'_>,
1387 matches_name: &str,
1388 arg_name: &str,
1389) -> Result<Option<HashSet<Pubkey>>, String> {
1390 if matches.is_present(matches_name) {
1391 let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
1392 .into_iter()
1393 .collect();
1394 if validators_set.contains(identity_pubkey) {
1395 Err(format!(
1396 "the validator's identity pubkey cannot be a {arg_name}: {identity_pubkey}"
1397 ))?;
1398 }
1399 Ok(Some(validators_set))
1400 } else {
1401 Ok(None)
1402 }
1403}
1404
1405fn get_cluster_shred_version(entrypoints: &[SocketAddr], bind_address: IpAddr) -> Option<u16> {
1406 let entrypoints = {
1407 let mut index: Vec<_> = (0..entrypoints.len()).collect();
1408 index.shuffle(&mut rand::thread_rng());
1409 index.into_iter().map(|i| &entrypoints[i])
1410 };
1411 for entrypoint in entrypoints {
1412 match solana_net_utils::get_cluster_shred_version_with_binding(entrypoint, bind_address) {
1413 Err(err) => eprintln!("get_cluster_shred_version failed: {entrypoint}, {err}"),
1414 Ok(0) => eprintln!("entrypoint {entrypoint} returned shred-version zero"),
1415 Ok(shred_version) => {
1416 info!(
1417 "obtained shred-version {} from {}",
1418 shred_version, entrypoint
1419 );
1420 return Some(shred_version);
1421 }
1422 }
1423 }
1424 None
1425}
1426
1427fn configure_banking_trace_dir_byte_limit(
1428 validator_config: &mut ValidatorConfig,
1429 matches: &ArgMatches,
1430) {
1431 validator_config.banking_trace_dir_byte_limit = if matches.is_present("disable_banking_trace") {
1432 DISABLED_BAKING_TRACE_DIR
1436 } else {
1437 value_t_or_exit!(matches, "banking_trace_dir_byte_limit", u64)
1440 };
1441}
1442
1443fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes {
1444 let account_indexes: HashSet<AccountIndex> = matches
1445 .values_of("account_indexes")
1446 .unwrap_or_default()
1447 .map(|value| match value {
1448 "program-id" => AccountIndex::ProgramId,
1449 "spl-token-mint" => AccountIndex::SplTokenMint,
1450 "spl-token-owner" => AccountIndex::SplTokenOwner,
1451 _ => unreachable!(),
1452 })
1453 .collect();
1454
1455 let account_indexes_include_keys: HashSet<Pubkey> =
1456 values_t!(matches, "account_index_include_key", Pubkey)
1457 .unwrap_or_default()
1458 .iter()
1459 .cloned()
1460 .collect();
1461
1462 let account_indexes_exclude_keys: HashSet<Pubkey> =
1463 values_t!(matches, "account_index_exclude_key", Pubkey)
1464 .unwrap_or_default()
1465 .iter()
1466 .cloned()
1467 .collect();
1468
1469 let exclude_keys = !account_indexes_exclude_keys.is_empty();
1470 let include_keys = !account_indexes_include_keys.is_empty();
1471
1472 let keys = if !account_indexes.is_empty() && (exclude_keys || include_keys) {
1473 let account_indexes_keys = AccountSecondaryIndexesIncludeExclude {
1474 exclude: exclude_keys,
1475 keys: if exclude_keys {
1476 account_indexes_exclude_keys
1477 } else {
1478 account_indexes_include_keys
1479 },
1480 };
1481 Some(account_indexes_keys)
1482 } else {
1483 None
1484 };
1485
1486 AccountSecondaryIndexes {
1487 keys,
1488 indexes: account_indexes,
1489 }
1490}