pub use solana_perf::report_target_features;
use {
    crate::{
        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
        banking_stage::BankingStage,
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            repair_handler::RepairHandlerType,
            serve_repair_service::ServeRepairService,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::SnapshotPackagerService,
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{ForwardingClientOption, Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        hardened_unpack::{
            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
        },
        utils::move_and_async_delete_path_contents,
    },
    solana_client::connection_cache::{ConnectionCache, Protocol},
    solana_clock::Slot,
    solana_cluster_type::ClusterType,
    solana_entry::poh::compute_hash_time,
    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
    solana_genesis_config::GenesisConfig,
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
        node::{Node, NodeMultihoming},
    },
    solana_hard_forks::HardForks,
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
    solana_poh::{
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
        transaction_recorder::TransactionRecorder,
    },
    solana_pubkey::Pubkey,
    solana_rayon_threadlimit::get_thread_count,
    solana_rpc::{
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::{ClientOption, JsonRpcService, JsonRpcServiceConfig},
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
            PendingSnapshotPackages, PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        dependency_tracker::DependencyTracker,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_archive_info::SnapshotArchiveInfoGetter,
        snapshot_bank_utils,
        snapshot_config::SnapshotConfig,
        snapshot_controller::SnapshotController,
        snapshot_hash::StartingSnapshotHashes,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs, SnapshotInterval},
    },
    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
    solana_shred_version::compute_shred_version,
    solana_signer::Signer,
    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
    solana_time_utils::timestamp,
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{
        self,
        broadcast_stage::BroadcastStageType,
        xdp::{XdpConfig, XdpRetransmitter},
    },
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_validator_exit::Exit,
    solana_vote_program::vote_state,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::SocketAddr,
        num::NonZeroUsize,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
    tokio_util::sync::CancellationToken,
};

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
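    // wen_restart reuses the wait-for-supermajority machinery below, so it
    // shares the same threshold.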
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for verifying ledger entries"
    }
}

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockProductionMethod {
    CentralScheduler,
    #[default]
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for producing ledger entries"
    }
}

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum TransactionStructure {
    Sdk,
    #[default]
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch internal transaction structure/representation"
    }
}

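/// Configuration for the transaction generator that can be plugged into the
/// TPU (see `ValidatorConfig::generator_config`).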
#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>, // None = trust all
    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, // Empty = repair with all
    pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
    pub max_genesis_archive_unpacked_size: u64,
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: SendTransactionServiceConfig,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: Option<AccountsDbConfig>,
    pub warp_slot: Option<Slot>,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub tpu_coalesce: Duration,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub block_production_num_workers: NonZeroUsize,
    pub transaction_struct: TransactionStructure,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub delay_leader_block_for_pending_fork: bool,
    pub use_tpu_client_next: bool,
    pub retransmit_xdp: Option<XdpConfig>,
    pub repair_handler_type: RepairHandlerType,
}

impl ValidatorConfig {
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");

        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default_for_tests(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default_for_test(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: SendTransactionServiceConfig::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            tpu_coalesce: DEFAULT_TPU_COALESCE,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            validator_exit_backpressure: HashMap::default(),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            block_production_num_workers: BankingStage::default_num_workers(),
            transaction_struct: TransactionStructure::default(),
            enable_block_production_forwarding: true,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            delay_leader_block_for_pending_fork: false,
            use_tpu_client_next: true,
            retransmit_xdp: None,
            repair_handler_type: RepairHandlerType::default(),
        }
    }

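    /// Enables the pubsub `blockSubscribe` endpoint along with the RPC
    /// transaction history it depends on. A minimal usage sketch for tests:
    ///
    /// ```ignore
    /// let mut config = ValidatorConfig::default_for_test();
    /// config.enable_default_rpc_block_subscribe();
    /// assert!(config.pubsub_config.enable_block_subscription);
    /// assert!(config.rpc_config.enable_rpc_transaction_history);
    /// ```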
    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}

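// `ValidatorStartProgress` contains status information that is surfaced to the
// node operator over the admin RPC channel to help them follow the general
// progress of node startup without having to watch log messages.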
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidatorStartProgress {
    Initializing, // Catch all, default state
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted, // Validator halted due to `--dev-halt-at-slot` argument
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },

    Running,
}

impl Default for ValidatorStartProgress {
    fn default() -> Self {
        Self::Initializing
    }
}

struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join {err:?}");
            }
        }
    }
}

#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
}

pub struct ValidatorTpuConfig {
    /// Whether to use QUIC for sending TPU transactions.
    pub use_quic: bool,
    /// Whether to use QUIC for sending TPU votes.
    pub vote_use_quic: bool,
    /// Size of the connection cache pool.
    pub tpu_connection_pool_size: usize,
    /// Whether to accept TPU transactions over UDP.
    pub tpu_enable_udp: bool,
    /// QUIC server parameters for the TPU port.
    pub tpu_quic_server_config: QuicServerParams,
    /// QUIC server parameters for the TPU-forwards port.
    pub tpu_fwd_quic_server_config: QuicServerParams,
    /// QUIC server parameters for the vote port.
    pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
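    /// Builds TPU parameters for tests: a per-IP connection limit of 32 and a
    /// coalesce channel size of 100_000 on the QUIC servers. A minimal sketch:
    ///
    /// ```ignore
    /// let tpu_config = ValidatorTpuConfig::new_for_tests(/* tpu_enable_udp: */ true);
    /// assert!(tpu_config.tpu_enable_udp);
    /// assert_eq!(
    ///     tpu_config.tpu_quic_server_config.max_connections_per_ipaddr_per_min,
    ///     32
    /// );
    /// ```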
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
        let tpu_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            coalesce_channel_size: 100_000,
            ..Default::default()
        };

        let tpu_fwd_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            max_unstaked_connections: 0,
            coalesce_channel_size: 100_000,
            ..Default::default()
        };

        // vote and tpu_fwd share the same characteristics
        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}

pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    xdp_retransmitter: Option<XdpRetransmitter>,
    _tpu_client_next_runtime: Option<TokioRuntime>,
}

impl Validator {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        #[cfg(debug_assertions)]
        const DEBUG_ASSERTION_STATUS: &str = "enabled";
        #[cfg(not(debug_assertions))]
        const DEBUG_ASSERTION_STATUS: &str = "disabled";
        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");

        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

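        // Initialize the global rayon pool first to ensure the value in config
        // is honored. Otherwise, some code accessing the global pool could
        // cause it to get initialized with Rayon's default (not ours).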
        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {cluster_entrypoint:?}");
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

        let cancel_tpu_client_next = CancellationToken::new();
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
            let cancel_tpu_client_next = cancel_tpu_client_next.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
        }

        let (
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            block_metadata_notifier,
            slot_status_notifier,
        ) = if let Some(service) = &geyser_plugin_service {
            (
                service.get_accounts_update_notifier(),
                service.get_transaction_notifier(),
                service.get_entry_notifier(),
                service.get_block_metadata_notifier(),
                service.get_slot_status_notifier(),
            )
        } else {
            (None, None, None, None, None)
        };

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

        let dependency_tracker = Arc::new(DependencyTracker::default());

        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            config
                .rpc_addrs
                .is_some()
                .then(|| dependency_tracker.clone()),
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!("shred version: {shred_version}, hard forks: {hard_forks:?}");

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        cluster_info.set_bind_ip_addrs(node.bind_ip_addrs.clone());
        let cluster_info = Arc::new(cluster_info);
        let node_multihoming = NodeMultihoming::from(&node);

        assert!(is_snapshot_config_valid(&config.snapshot_config));

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            config.snapshot_config.clone(),
            bank_forks.read().unwrap().root(),
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if snapshot_controller
            .snapshot_config()
            .should_generate_snapshots()
        {
            let exit_backpressure = config
                .validator_exit_backpressure
                .get(SnapshotPackagerService::NAME)
                .cloned();
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                exit_backpressure,
                cluster_info.clone(),
                snapshot_controller.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller: snapshot_controller.clone(),
            snapshot_request_receiver,
            pending_snapshot_packages,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}, \
             transaction-structure: {}",
            config.block_verification_method,
            config.block_production_method,
            config.transaction_struct
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

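        // block min prioritization fee cache should be readable by RPC,
        // and writable by validator (by both replay stage and banking stage)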
        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let startup_verification_complete;
        let (mut poh_recorder, entry_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                exit.clone(),
            )
        };
        if transaction_status_sender.is_some() {
            poh_recorder.track_transaction_indexes();
        }
        let (record_sender, record_receiver) = unbounded();
        let transaction_recorder =
            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            entry_notification_sender,
            blockstore_root_scan,
            &snapshot_controller,
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &snapshot_controller,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let mut tpu_transactions_forwards_client =
            Some(node.sockets.tpu_transaction_forwarding_client);

        let connection_cache = match (config.use_tpu_client_next, use_quic) {
            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                Some(
                    tpu_transactions_forwards_client
                        .take()
                        .expect("Socket should exist."),
                ),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            ))),
            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))),
            (true, _) => None,
        };

        let vote_connection_cache = if vote_use_quic {
            let vote_connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_vote_quic",
                tpu_connection_pool_size,
                Some(node.sockets.quic_vote_client),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu_vote(Protocol::QUIC)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(vote_connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                tpu_connection_pool_size,
            ))
        };

        let current_runtime_handle = tokio::runtime::Handle::try_current();
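        // tpu-client-next needs a tokio runtime to run in. Reuse the ambient
        // runtime when the validator is started inside one (e.g. by the
        // test-validator crate); otherwise build a small dedicated runtime.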
        let tpu_client_next_runtime =
            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .worker_threads(2)
                    .thread_name("solTpuClientRt")
                    .build()
                    .unwrap()
            });

        let rpc_override_health_check =
            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
        let (
            json_rpc_service,
            rpc_subscriptions,
            pubsub_service,
            completed_data_sets_sender,
            completed_data_sets_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            bank_notification_sender,
        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
            assert_eq!(
                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
                node.info
                    .rpc_pubsub()
                    .map(|addr| socket_addr_space.check(&addr))
            );
            let (bank_notification_sender, bank_notification_receiver) = unbounded();
            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
                Some(Arc::new(RwLock::new(bank_notification_senders)))
            } else {
                None
            };

            let client_option = if config.use_tpu_client_next {
                let runtime_handle = tpu_client_next_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
                ClientOption::TpuClientNext(
                    Arc::as_ref(&identity_keypair),
                    node.sockets.rpc_sts_client,
                    runtime_handle.clone(),
                    cancel_tpu_client_next.clone(),
                )
            } else {
                let Some(connection_cache) = &connection_cache else {
                    panic!("ConnectionCache should exist by construction.");
                };
                ClientOption::ConnectionCache(connection_cache.clone())
            };
            let rpc_svc_config = JsonRpcServiceConfig {
                rpc_addr,
                rpc_config: config.rpc_config.clone(),
                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
                bank_forks: bank_forks.clone(),
                block_commitment_cache: block_commitment_cache.clone(),
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                poh_recorder: Some(poh_recorder.clone()),
                genesis_hash: genesis_config.hash(),
                ledger_path: ledger_path.to_path_buf(),
                validator_exit: config.validator_exit.clone(),
                exit: exit.clone(),
                override_health_check: rpc_override_health_check.clone(),
                startup_verification_complete,
                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
                send_transaction_service_config: config.send_transaction_service_config.clone(),
                max_slots: max_slots.clone(),
                leader_schedule_cache: leader_schedule_cache.clone(),
                max_complete_transaction_status_slot: max_complete_transaction_status_slot.clone(),
                prioritization_fee_cache: prioritization_fee_cache.clone(),
                client_option,
            };
            let json_rpc_service =
                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
            let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
                exit.clone(),
                max_complete_transaction_status_slot,
                blockstore.clone(),
                bank_forks.clone(),
                block_commitment_cache.clone(),
                optimistically_confirmed_bank.clone(),
                &config.pubsub_config,
                None,
            ));
            let pubsub_service = if !config.rpc_config.full_api {
                None
            } else {
                let (trigger, pubsub_service) = PubSubService::new(
                    config.pubsub_config.clone(),
                    &rpc_subscriptions,
                    rpc_pubsub_addr,
                );
                config
                    .validator_exit
                    .write()
                    .unwrap()
                    .register_exit(Box::new(move || trigger.cancel()));

                Some(pubsub_service)
            };

            let (completed_data_sets_sender, completed_data_sets_service) =
                if !config.rpc_config.full_api {
                    (None, None)
                } else {
                    let (completed_data_sets_sender, completed_data_sets_receiver) =
                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
                    let completed_data_sets_service = CompletedDataSetsService::new(
                        completed_data_sets_receiver,
                        blockstore.clone(),
                        rpc_subscriptions.clone(),
                        exit.clone(),
                        max_slots.clone(),
                    );
                    (
                        Some(completed_data_sets_sender),
                        Some(completed_data_sets_service),
                    )
                };

            let rpc_completed_slots_service =
                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
                    let (completed_slots_sender, completed_slots_receiver) =
                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
                    blockstore.add_completed_slots_signal(completed_slots_sender);

                    Some(RpcCompletedSlotsService::spawn(
                        completed_slots_receiver,
                        rpc_subscriptions.clone(),
                        slot_status_notifier.clone(),
                        exit.clone(),
                    ))
                } else {
                    None
                };

            let dependency_tracker = transaction_status_sender
                .is_some()
                .then_some(dependency_tracker);
            let optimistically_confirmed_bank_tracker =
                Some(OptimisticallyConfirmedBankTracker::new(
                    bank_notification_receiver,
                    exit.clone(),
                    bank_forks.clone(),
                    optimistically_confirmed_bank,
                    rpc_subscriptions.clone(),
                    confirmed_bank_subscribers,
                    prioritization_fee_cache.clone(),
                    dependency_tracker.clone(),
                ));
            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
                sender: bank_notification_sender,
                should_send_parents: geyser_plugin_service.is_some(),
                dependency_tracker,
            });
            (
                Some(json_rpc_service),
                Some(rpc_subscriptions),
                pubsub_service,
                completed_data_sets_sender,
                completed_data_sets_service,
                rpc_completed_slots_service,
                optimistically_confirmed_bank_tracker,
                bank_notification_sender_config,
            )
        } else {
            (None, None, None, None, None, None, None, None)
        };

        if config.halt_at_slot.is_some() {
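            // Simulate a confirmed root to avoid RPC errors with
            // CommitmentConfig::finalized() and to ensure RPC endpoints that
            // require a confirmed root continue to work.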
            block_commitment_cache
                .write()
                .unwrap()
                .set_highest_super_majority_root(bank_forks.read().unwrap().root());

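            // Park with the RPC service running, ready for inspection!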
1285 warn!("Validator halted");
1287 *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
1288 std::thread::park();
1289 }
1290 let ip_echo_server = match node.sockets.ip_echo {
1291 None => None,
1292 Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
1293 tcp_listener,
1294 config.ip_echo_server_threads,
1295 Some(node.info.shred_version()),
1296 )),
1297 };
1298
1299 let (stats_reporter_sender, stats_reporter_receiver) = unbounded();
1300
1301 let stats_reporter_service =
1302 StatsReporterService::new(stats_reporter_receiver, exit.clone());
1303
1304 let gossip_service = GossipService::new(
1305 &cluster_info,
1306 Some(bank_forks.clone()),
1307 node.sockets.gossip.clone(),
1308 config.gossip_validators.clone(),
1309 should_check_duplicate_instance,
1310 Some(stats_reporter_sender.clone()),
1311 exit.clone(),
1312 );
1313 let serve_repair = config.repair_handler_type.create_serve_repair(
1314 blockstore.clone(),
1315 cluster_info.clone(),
1316 bank_forks.read().unwrap().sharable_root_bank(),
1317 config.repair_whitelist.clone(),
1318 );
1319 let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
1320 let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
1321 let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
1322 unbounded();
1323
1324 let waited_for_supermajority = wait_for_supermajority(
1325 config,
1326 Some(&mut process_blockstore),
1327 &bank_forks,
1328 &cluster_info,
1329 rpc_override_health_check,
1330 &start_progress,
1331 )?;
1332
1333 let blockstore_metric_report_service =
1334 BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
1335
1336 let wait_for_vote_to_start_leader =
1337 !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
1338
1339 let poh_service = PohService::new(
1340 poh_recorder.clone(),
1341 &genesis_config.poh_config,
1342 exit.clone(),
1343 bank_forks.read().unwrap().root_bank().ticks_per_slot(),
1344 config.poh_pinned_cpu_core,
1345 config.poh_hashes_per_batch,
1346 record_receiver,
1347 );
1348 assert_eq!(
1349 blockstore.get_new_shred_signals_len(),
1350 1,
1351 "New shred signal for the TVU should be the same as the clear bank signal."
1352 );
1353
1354 let vote_tracker = Arc::<VoteTracker>::default();
1355
1356 let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
1357 let (verified_vote_sender, verified_vote_receiver) = unbounded();
1358 let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
1359 let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1360
1361 let entry_notification_sender = entry_notifier_service
1362 .as_ref()
1363 .map(|service| service.sender_cloned());
1364
1365 let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1366 && genesis_config.cluster_type != ClusterType::MainnetBeta)
1367 .then(|| {
1368 tokio::runtime::Builder::new_multi_thread()
1369 .enable_all()
1370 .thread_name("solTurbineQuic")
1371 .build()
1372 .unwrap()
1373 });
1374 let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1375 let (
1376 turbine_quic_endpoint,
1377 turbine_quic_endpoint_sender,
1378 turbine_quic_endpoint_join_handle,
1379 ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1380 let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1381 (None, sender, None)
1382 } else {
1383 solana_turbine::quic_endpoint::new_quic_endpoint(
1384 turbine_quic_endpoint_runtime
1385 .as_ref()
1386 .map(TokioRuntime::handle)
1387 .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1388 &identity_keypair,
1389 node.sockets.tvu_quic,
1390 turbine_quic_endpoint_sender,
1391 bank_forks.clone(),
1392 )
1393 .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1394 .unwrap()
1395 };
1396
1397 let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1399 && genesis_config.cluster_type != ClusterType::MainnetBeta)
1400 .then(|| {
1401 tokio::runtime::Builder::new_multi_thread()
1402 .enable_all()
1403 .thread_name("solRepairQuic")
1404 .build()
1405 .unwrap()
1406 });
1407 let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1408 if genesis_config.cluster_type == ClusterType::MainnetBeta {
1409 (None, RepairQuicAsyncSenders::new_dummy(), None)
1410 } else {
1411 let repair_quic_sockets = RepairQuicSockets {
1412 repair_server_quic_socket: node.sockets.serve_repair_quic,
1413 repair_client_quic_socket: node.sockets.repair_quic,
1414 ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1415 };
1416 let repair_quic_senders = RepairQuicSenders {
1417 repair_request_quic_sender: repair_request_quic_sender.clone(),
1418 repair_response_quic_sender,
1419 ancestor_hashes_response_quic_sender,
1420 };
1421 repair::quic_endpoint::new_quic_endpoints(
1422 repair_quic_endpoints_runtime
1423 .as_ref()
1424 .map(TokioRuntime::handle)
1425 .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1426 &identity_keypair,
1427 repair_quic_sockets,
1428 repair_quic_senders,
1429 bank_forks.clone(),
1430 )
1431 .map(|(endpoints, senders, join_handle)| {
1432 (Some(endpoints), senders, Some(join_handle))
1433 })
1434 .unwrap()
1435 };
1436 let serve_repair_service = ServeRepairService::new(
1437 serve_repair,
1438 repair_request_quic_sender,
1441 repair_request_quic_receiver,
1442 repair_quic_async_senders.repair_response_quic_sender,
1443 node.sockets.serve_repair,
1444 socket_addr_space,
1445 stats_reporter_sender,
1446 exit.clone(),
1447 );
1448
1449 let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1450 let wen_restart_repair_slots = if in_wen_restart {
1451 Some(Arc::new(RwLock::new(Vec::new())))
1452 } else {
1453 None
1454 };
1455 let tower = match process_blockstore.process_to_create_tower() {
1456 Ok(tower) => {
1457 info!("Tower state: {tower:?}");
1458 tower
1459 }
1460 Err(e) => {
1461 warn!("Unable to retrieve tower: {e:?} creating default tower....");
1462 Tower::default()
1463 }
1464 };
1465 let last_vote = tower.last_vote();
1466
1467 let outstanding_repair_requests =
1468 Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1469 let cluster_slots =
1470 Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
1471
1472 let connection_cache_for_warmup =
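            // Only hand the ConnectionCache over for warmup when an RPC
            // service is running and the cache is actually in use.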
            if json_rpc_service.is_some() && connection_cache.is_some() {
                connection_cache.as_ref()
            } else {
                None
            };
        let (xdp_retransmitter, xdp_sender) =
            if let Some(xdp_config) = config.retransmit_xdp.clone() {
                let src_port = node.sockets.retransmit_sockets[0]
                    .local_addr()
                    .expect("failed to get local address")
                    .port();
                let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port)
                    .expect("failed to create xdp retransmitter");
                (Some(rtx), Some(sender))
            } else {
                (None, None)
            };

        let alpenglow_socket = if genesis_config.cluster_type == ClusterType::Testnet
            || genesis_config.cluster_type == ClusterType::Development
        {
            node.sockets.alpenglow
        } else {
            None
        };

        let tvu = Tvu::new(
            vote_account,
            authorized_voter_keypairs,
            &bank_forks,
            &cluster_info,
            TvuSockets {
                repair: node.sockets.repair.try_clone().unwrap(),
                retransmit: node.sockets.retransmit_sockets,
                fetch: node.sockets.tvu,
                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
                alpenglow: alpenglow_socket,
            },
            blockstore.clone(),
            ledger_signal_receiver,
            rpc_subscriptions.clone(),
            &poh_recorder,
            tower,
            config.tower_storage.clone(),
            &leader_schedule_cache,
            exit.clone(),
            block_commitment_cache,
            config.turbine_disabled.clone(),
            transaction_status_sender.clone(),
            entry_notification_sender.clone(),
            vote_tracker.clone(),
            retransmit_slots_sender,
            gossip_verified_vote_hash_receiver,
            verified_vote_receiver,
            replay_vote_sender.clone(),
            completed_data_sets_sender,
            bank_notification_sender.clone(),
            duplicate_confirmed_slots_receiver,
            TvuConfig {
                max_ledger_shreds: config.max_ledger_shreds,
                shred_version: node.info.shred_version(),
                repair_validators: config.repair_validators.clone(),
                repair_whitelist: config.repair_whitelist.clone(),
                wait_for_vote_to_start_leader,
                replay_forks_threads: config.replay_forks_threads,
                replay_transactions_threads: config.replay_transactions_threads,
                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
                xdp_sender: xdp_sender.clone(),
            },
            &max_slots,
            block_metadata_notifier,
            config.wait_to_vote_slot,
            Some(snapshot_controller.clone()),
            config.runtime_config.log_messages_bytes_limit,
            connection_cache_for_warmup,
            &prioritization_fee_cache,
            banking_tracer.clone(),
            turbine_quic_endpoint_sender.clone(),
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_quic_async_senders.repair_request_quic_sender,
            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            outstanding_repair_requests.clone(),
            cluster_slots.clone(),
            wen_restart_repair_slots.clone(),
            slot_status_notifier,
            vote_connection_cache,
        )
        .map_err(ValidatorError::Other)?;

        if in_wen_restart {
            info!("Waiting for wen_restart to finish");
            wait_for_wen_restart(WenRestartConfig {
                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
                last_vote,
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                bank_forks: bank_forks.clone(),
                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
                wait_for_supermajority_threshold_percent:
                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
                snapshot_controller: Some(snapshot_controller.clone()),
                abs_status: accounts_background_service.status().clone(),
                genesis_config_hash: genesis_config.hash(),
                exit: exit.clone(),
            })?;
            return Err(ValidatorError::WenRestartFinished.into());
        }

        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
            ForwardingClientOption::ConnectionCache(connection_cache.clone())
        } else {
            let runtime_handle = tpu_client_next_runtime
                .as_ref()
                .map(TokioRuntime::handle)
                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
            ForwardingClientOption::TpuClientNext((
                Arc::as_ref(&identity_keypair),
                tpu_transactions_forwards_client
                    .take()
                    .expect("Socket should exist."),
                runtime_handle.clone(),
                cancel_tpu_client_next,
            ))
        };
        let tpu = Tpu::new_with_client(
            &cluster_info,
            &poh_recorder,
            transaction_recorder,
            entry_receiver,
            retransmit_slots_receiver,
            TpuSockets {
                transactions: node.sockets.tpu,
                transaction_forwards: node.sockets.tpu_forwards,
                vote: node.sockets.tpu_vote,
                broadcast: node.sockets.broadcast,
                transactions_quic: node.sockets.tpu_quic,
                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
                vote_quic: node.sockets.tpu_vote_quic,
                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
                vortexor_receivers: node.sockets.vortexor_receivers,
            },
            rpc_subscriptions.clone(),
            transaction_status_sender,
            entry_notification_sender,
            blockstore.clone(),
            &config.broadcast_stage_type,
            xdp_sender,
            exit,
            node.info.shred_version(),
            vote_tracker,
            bank_forks.clone(),
            verified_vote_sender,
            gossip_verified_vote_hash_sender,
            replay_vote_receiver,
            replay_vote_sender,
            bank_notification_sender,
            config.tpu_coalesce,
            duplicate_confirmed_slot_sender,
            forwarding_tpu_client,
            turbine_quic_endpoint_sender,
            &identity_keypair,
            config.runtime_config.log_messages_bytes_limit,
            &staked_nodes,
            config.staked_nodes_overrides.clone(),
            banking_tracer_channels,
            tracer_thread,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
            &prioritization_fee_cache,
            config.block_production_method.clone(),
            config.block_production_num_workers,
            config.transaction_struct.clone(),
            config.enable_block_production_forwarding,
            config.generator_config.clone(),
            key_notifiers.clone(),
        );

        datapoint_info!(
            "validator-new",
            ("id", id.to_string(), String),
            ("version", solana_version::version!(), String),
            ("cluster_type", genesis_config.cluster_type as u32, i64),
            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
            ("waited_for_supermajority", waited_for_supermajority, bool),
            ("shred_version", shred_version as i64, i64),
        );

        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
        if config.use_tpu_client_next {
            if let Some(json_rpc_service) = &json_rpc_service {
                key_notifiers.write().unwrap().add(
                    KeyUpdaterType::RpcService,
                    json_rpc_service.get_client_key_updater(),
                );
            }
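            // Note that the forwarding client does not need to be added to
            // key_notifiers here; it is registered inside Tpu.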
1676 }
1679
1680 *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1681 bank_forks: bank_forks.clone(),
1682 cluster_info: cluster_info.clone(),
1683 vote_account: *vote_account,
1684 repair_whitelist: config.repair_whitelist.clone(),
1685 notifies: key_notifiers,
1686 repair_socket: Arc::new(node.sockets.repair),
1687 outstanding_repair_requests,
1688 cluster_slots,
1689 node: Some(Arc::new(node_multihoming)),
1690 });
1691
1692 Ok(Self {
1693 stats_reporter_service,
1694 gossip_service,
1695 serve_repair_service,
1696 json_rpc_service,
1697 pubsub_service,
1698 rpc_completed_slots_service,
1699 optimistically_confirmed_bank_tracker,
1700 transaction_status_service,
1701 entry_notifier_service,
1702 system_monitor_service,
1703 sample_performance_service,
1704 snapshot_packager_service,
1705 completed_data_sets_service,
1706 tpu,
1707 tvu,
1708 poh_service,
1709 poh_recorder,
1710 ip_echo_server,
1711 validator_exit: config.validator_exit.clone(),
1712 cluster_info,
1713 bank_forks,
1714 blockstore,
1715 geyser_plugin_service,
1716 blockstore_metric_report_service,
1717 accounts_background_service,
1718 turbine_quic_endpoint,
1719 turbine_quic_endpoint_runtime,
1720 turbine_quic_endpoint_join_handle,
1721 repair_quic_endpoints,
1722 repair_quic_endpoints_runtime,
1723 repair_quic_endpoints_join_handle,
1724 xdp_retransmitter,
1725 _tpu_client_next_runtime: tpu_client_next_runtime,
1726 })
1727 }
1728
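    /// Signal all services to begin shutting down. Returns immediately; call
    /// `join` (or use `close`) to block until everything has stopped.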
    pub fn exit(&mut self) {
        self.validator_exit.write().unwrap().exit();

        self.blockstore.drop_signal();
    }

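    /// Signal shutdown and block until all services have exited.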
    pub fn close(mut self) {
        self.exit();
        self.join();
    }

    fn print_node_info(node: &Node) {
        info!("{:?}", node.info);
        info!(
            "local gossip address: {}",
            node.sockets.gossip[0].local_addr().unwrap()
        );
        info!(
            "local broadcast address: {}",
            node.sockets
                .broadcast
                .first()
                .unwrap()
                .local_addr()
                .unwrap()
        );
        info!(
            "local repair address: {}",
            node.sockets.repair.local_addr().unwrap()
        );
        info!(
            "local retransmit address: {}",
            node.sockets.retransmit_sockets[0].local_addr().unwrap()
        );
    }

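    /// Block until every service owned by the validator has shut down. PoH is
    /// joined first so no new banks are produced while the remaining services
    /// drain; QUIC endpoints are closed before their runtimes are joined.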
    pub fn join(self) {
        drop(self.bank_forks);
        drop(self.cluster_info);

        self.poh_service.join().expect("poh_service");
        drop(self.poh_recorder);

        if let Some(json_rpc_service) = self.json_rpc_service {
            json_rpc_service.join().expect("rpc_service");
        }

        if let Some(pubsub_service) = self.pubsub_service {
            pubsub_service.join().expect("pubsub_service");
        }

        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
            rpc_completed_slots_service
                .join()
                .expect("rpc_completed_slots_service");
        }

        if let Some(optimistically_confirmed_bank_tracker) =
            self.optimistically_confirmed_bank_tracker
        {
            optimistically_confirmed_bank_tracker
                .join()
                .expect("optimistically_confirmed_bank_tracker");
        }

        if let Some(transaction_status_service) = self.transaction_status_service {
            transaction_status_service
                .join()
                .expect("transaction_status_service");
        }

        if let Some(system_monitor_service) = self.system_monitor_service {
            system_monitor_service
                .join()
                .expect("system_monitor_service");
        }

        if let Some(sample_performance_service) = self.sample_performance_service {
            sample_performance_service
                .join()
                .expect("sample_performance_service");
        }

        if let Some(entry_notifier_service) = self.entry_notifier_service {
            entry_notifier_service
                .join()
                .expect("entry_notifier_service");
        }

        if let Some(s) = self.snapshot_packager_service {
            s.join().expect("snapshot_packager_service");
        }

        self.gossip_service.join().expect("gossip_service");
        self.repair_quic_endpoints
            .iter()
            .flatten()
            .for_each(repair::quic_endpoint::close_quic_endpoint);
        self.serve_repair_service
            .join()
            .expect("serve_repair_service");
        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
            self.repair_quic_endpoints_runtime
                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
                .transpose()
                .unwrap();
        }
        self.stats_reporter_service
            .join()
            .expect("stats_reporter_service");
        self.blockstore_metric_report_service
            .join()
            .expect("ledger_metric_report_service");
        self.accounts_background_service
            .join()
            .expect("accounts_background_service");
        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
        }
        if let Some(xdp_retransmitter) = self.xdp_retransmitter {
            xdp_retransmitter.join().expect("xdp_retransmitter");
        }
        self.tpu.join().expect("tpu");
        self.tvu.join().expect("tvu");
        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
            self.turbine_quic_endpoint_runtime
                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
                .transpose()
                .unwrap();
        }
        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
            completed_data_sets_service
                .join()
                .expect("completed_data_sets_service");
        }
        if let Some(ip_echo_server) = self.ip_echo_server {
            ip_echo_server.shutdown_background();
        }

        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
            geyser_plugin_service.join().expect("geyser_plugin_service");
        }
    }
}

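/// Returns true if `vote_account` exists in `bank` and its vote state has
/// recorded at least one vote.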
fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
    if let Some(account) = &bank.get_account(vote_account) {
        if let Some(vote_state) = vote_state::from(account) {
            return !vote_state.votes.is_empty();
        }
    }
    false
}

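/// Measure the local PoH hash rate over `maybe_hash_samples` hashes (default:
/// one slot's worth) and error if it is below the rate the cluster requires,
/// i.e. `hashes_per_tick * ticks_per_slot` within the target slot duration.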
fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
        return Ok(());
    };

    let ticks_per_slot = bank.ticks_per_slot();
    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);

    let hash_time = compute_hash_time(hash_samples);
    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;

    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
    let target_hashes_per_second =
        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;

    info!(
        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
         second {target_hashes_per_second}"
    );
    if my_hashes_per_second < target_hashes_per_second {
        return Err(ValidatorError::PohTooSlow {
            mine: my_hashes_per_second,
            target: target_hashes_per_second,
        });
    }

    Ok(())
}

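/// Treat this startup as a cluster restart (typically after a hard fork) when
/// --wait-for-supermajority points exactly at the local root slot, returning
/// the restart slot in that case.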
fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
        if wait_slot_for_supermajority == root_slot {
            return Some(wait_slot_for_supermajority);
        }
    }

    None
}

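/// Vet a tower restored from storage against the replayed bank state. A hard
/// fork restart or a --warp-slot invalidates the saved lockouts, in which case
/// (as for most restore failures) the tower is rebuilt from the vote account.
/// Errors only when --require-tower is in effect and the vote account shows
/// prior voting activity, since proceeding could create duplicate votes.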
fn post_process_restored_tower(
    restored_tower: crate::consensus::Result<Tower>,
    validator_identity: &Pubkey,
    vote_account: &Pubkey,
    config: &ValidatorConfig,
    bank_forks: &BankForks,
) -> Result<Tower, String> {
    let mut should_require_tower = config.require_tower;

    let restored_tower = restored_tower.and_then(|tower| {
        let root_bank = bank_forks.root_bank();
        let slot_history = root_bank.get_slot_history();
        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);

        if let Some(hard_fork_restart_slot) =
            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
        {
            let message =
                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
            datapoint_error!("tower_error", ("error", message, String),);
            error!("{message}");

            should_require_tower = false;
            return Err(crate::consensus::TowerError::HardFork(
                hard_fork_restart_slot,
            ));
        }

        if let Some(warp_slot) = config.warp_slot {
            should_require_tower = false;
            return Err(crate::consensus::TowerError::HardFork(warp_slot));
        }

        tower
    });

    let restored_tower = match restored_tower {
        Ok(tower) => tower,
        Err(err) => {
            let voting_has_been_active =
                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
            if !err.is_file_missing() {
                datapoint_error!(
                    "tower_error",
                    ("error", format!("Unable to restore tower: {err}"), String),
                );
            }
            if should_require_tower && voting_has_been_active {
                return Err(format!(
                    "Requested mandatory tower restore failed: {err}. And there is an existing \
                     vote_account containing actual votes. Aborting due to possible conflicting \
                     duplicate votes"
                ));
            }
            if err.is_file_missing() && !voting_has_been_active {
                info!(
                    "Ignoring expected failed tower restore because this is the initial validator \
                     start with the vote account..."
                );
            } else {
                error!(
                    "Rebuilding a new tower from the latest vote account due to failed tower \
                     restore: {err}"
                );
            }

            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
        }
    };

    Ok(restored_tower)
}

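/// Open the genesis config from the ledger directory, check that its leader
/// schedule epoch offset is supported, and verify the genesis hash against
/// --expected-genesis-hash when one was provided.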
fn load_genesis(
    config: &ValidatorConfig,
    ledger_path: &Path,
) -> Result<GenesisConfig, ValidatorError> {
    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
        .map_err(ValidatorError::OpenGenesisConfig)?;

    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);

    let genesis_hash = genesis_config.hash();
    info!("genesis hash: {genesis_hash}");

    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
        if genesis_hash != expected_genesis_hash {
            return Err(ValidatorError::GenesisHashMismatch(
                genesis_hash,
                expected_genesis_hash,
            ));
        }
    }

    Ok(genesis_config)
}

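/// Open the blockstore and load bank forks from the newest snapshot (or
/// genesis), wiring up the new-shred signal, the optional transaction/entry
/// notification services, and the bank-drop callback consumed by
/// AccountsBackgroundService.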
#[allow(clippy::type_complexity)]
fn load_blockstore(
    config: &ValidatorConfig,
    ledger_path: &Path,
    genesis_config: &GenesisConfig,
    exit: Arc<AtomicBool>,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
    accounts_update_notifier: Option<AccountsUpdateNotifier>,
    transaction_notifier: Option<TransactionNotifierArc>,
    entry_notifier: Option<EntryNotifierArc>,
    dependency_tracker: Option<Arc<DependencyTracker>>,
) -> Result<
    (
        Arc<RwLock<BankForks>>,
        Arc<Blockstore>,
        Slot,
        Receiver<bool>,
        LeaderScheduleCache,
        Option<StartingSnapshotHashes>,
        TransactionHistoryServices,
        blockstore_processor::ProcessOptions,
        BlockstoreRootScan,
        DroppedSlotsReceiver,
        Option<EntryNotifierService>,
    ),
    String,
> {
    info!("loading ledger from {ledger_path:?}...");
    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;

    let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
        .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;

    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
    blockstore.add_new_shred_signal(ledger_signal_sender);

    let original_blockstore_root = blockstore.max_root();

    let blockstore = Arc::new(blockstore);
    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
    let halt_at_slot = config
        .halt_at_slot
        .or_else(|| blockstore.highest_slot().unwrap_or(None));

    let process_options = blockstore_processor::ProcessOptions {
        run_verification: config.run_verification,
        halt_at_slot,
        new_hard_forks: config.new_hard_forks.clone(),
        debug_keys: config.debug_keys.clone(),
        accounts_db_config: config.accounts_db_config.clone(),
        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
        runtime_config: config.runtime_config.clone(),
        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
        ..blockstore_processor::ProcessOptions::default()
    };

    let enable_rpc_transaction_history =
        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
    let is_plugin_transaction_history_required = transaction_notifier.as_ref().is_some();
    let transaction_history_services =
        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
            initialize_rpc_transaction_history_services(
                blockstore.clone(),
                exit.clone(),
                enable_rpc_transaction_history,
                config.rpc_config.enable_extended_tx_metadata_storage,
                transaction_notifier,
                dependency_tracker,
            )
        } else {
            TransactionHistoryServices::default()
        };

    let entry_notifier_service = entry_notifier
        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));

    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
        bank_forks_utils::load_bank_forks(
            genesis_config,
            &blockstore,
            config.account_paths.clone(),
            &config.snapshot_config,
            &process_options,
            transaction_history_services
                .transaction_status_sender
                .as_ref(),
            entry_notifier_service
                .as_ref()
                .map(|service| service.sender()),
            accounts_update_notifier,
            exit,
        )
        .map_err(|err| err.to_string())?;

    let pruned_banks_receiver =
        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());

    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());

    Ok((
        bank_forks,
        blockstore,
        original_blockstore_root,
        ledger_signal_receiver,
        leader_schedule_cache,
        starting_snapshot_hashes,
        transaction_history_services,
        process_options,
        blockstore_root_scan,
        pruned_banks_receiver,
        entry_notifier_service,
    ))
}

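/// Deferred blockstore replay: construction is cheap, and `process` performs
/// the actual replay (at most once) plus tower restoration when a consumer
/// first needs them.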
pub struct ProcessBlockStore<'a> {
    id: &'a Pubkey,
    vote_account: &'a Pubkey,
    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
    blockstore: &'a Blockstore,
    original_blockstore_root: Slot,
    bank_forks: &'a Arc<RwLock<BankForks>>,
    leader_schedule_cache: &'a LeaderScheduleCache,
    process_options: &'a blockstore_processor::ProcessOptions,
    transaction_status_sender: Option<&'a TransactionStatusSender>,
    entry_notification_sender: Option<&'a EntryNotifierSender>,
    blockstore_root_scan: Option<BlockstoreRootScan>,
    snapshot_controller: &'a SnapshotController,
    config: &'a ValidatorConfig,
    tower: Option<Tower>,
}

impl<'a> ProcessBlockStore<'a> {
    #[allow(clippy::too_many_arguments)]
    fn new(
        id: &'a Pubkey,
        vote_account: &'a Pubkey,
        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
        blockstore: &'a Blockstore,
        original_blockstore_root: Slot,
        bank_forks: &'a Arc<RwLock<BankForks>>,
        leader_schedule_cache: &'a LeaderScheduleCache,
        process_options: &'a blockstore_processor::ProcessOptions,
        transaction_status_sender: Option<&'a TransactionStatusSender>,
        entry_notification_sender: Option<&'a EntryNotifierSender>,
        blockstore_root_scan: BlockstoreRootScan,
        snapshot_controller: &'a SnapshotController,
        config: &'a ValidatorConfig,
    ) -> Self {
        Self {
            id,
            vote_account,
            start_progress,
            blockstore,
            original_blockstore_root,
            bank_forks,
            leader_schedule_cache,
            process_options,
            transaction_status_sender,
            entry_notification_sender,
            blockstore_root_scan: Some(blockstore_root_scan),
            snapshot_controller,
            config,
            tower: None,
        }
    }

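    /// Replay the blockstore from the snapshot root, reporting progress
    /// through `start_progress`, then restore the tower and reconcile
    /// blockstore roots with the tower root and any hard-fork restart slot.
    /// Subsequent calls return immediately once `tower` has been populated.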
    pub(crate) fn process(&mut self) -> Result<(), String> {
        if self.tower.is_none() {
            let previous_start_process = *self.start_progress.read().unwrap();
            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;

            let exit = Arc::new(AtomicBool::new(false));
            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
                let bank_forks = self.bank_forks.clone();
                let exit = exit.clone();
                let start_progress = self.start_progress.clone();

                let _ = Builder::new()
                    .name("solRptLdgrStat".to_string())
                    .spawn(move || {
                        while !exit.load(Ordering::Relaxed) {
                            let slot = bank_forks.read().unwrap().working_bank().slot();
                            *start_progress.write().unwrap() =
                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
                            sleep(Duration::from_secs(2));
                        }
                    })
                    .unwrap();
            }
            blockstore_processor::process_blockstore_from_root(
                self.blockstore,
                self.bank_forks,
                self.leader_schedule_cache,
                self.process_options,
                self.transaction_status_sender,
                self.entry_notification_sender,
                Some(self.snapshot_controller),
            )
            .map_err(|err| {
                exit.store(true, Ordering::Relaxed);
                format!("Failed to load ledger: {err:?}")
            })?;
            exit.store(true, Ordering::Relaxed);

            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
                blockstore_root_scan.join();
            }

            self.tower = Some({
                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
                if let Ok(tower) = &restored_tower {
                    reconcile_blockstore_roots_with_external_source(
                        ExternalRootSource::Tower(tower.root()),
                        self.blockstore,
                        &mut self.original_blockstore_root,
                    )
                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
                }

                post_process_restored_tower(
                    restored_tower,
                    self.id,
                    self.vote_account,
                    self.config,
                    &self.bank_forks.read().unwrap(),
                )?
            });

            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
                self.config,
                self.bank_forks.read().unwrap().root(),
            ) {
                reconcile_blockstore_roots_with_external_source(
                    ExternalRootSource::HardFork(hard_fork_restart_slot),
                    self.blockstore,
                    &mut self.original_blockstore_root,
                )
                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
            }

            *self.start_progress.write().unwrap() = previous_start_process;
        }
        Ok(())
    }

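    /// Run `process` and return the restored tower.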
    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
        self.process()?;
        Ok(self.tower.unwrap())
    }
}

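/// When --warp-slot is set: warp the root bank forward to the requested slot,
/// root it, write a full snapshot archive at the warped slot so the cluster
/// can restart from it, and then replay the blockstore so the tower and bank
/// forks stay in sync.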
fn maybe_warp_slot(
    config: &ValidatorConfig,
    process_blockstore: &mut ProcessBlockStore,
    ledger_path: &Path,
    bank_forks: &RwLock<BankForks>,
    leader_schedule_cache: &LeaderScheduleCache,
    snapshot_controller: &SnapshotController,
) -> Result<(), String> {
    if let Some(warp_slot) = config.warp_slot {
        let mut bank_forks = bank_forks.write().unwrap();

        let working_bank = bank_forks.working_bank();

        if warp_slot <= working_bank.slot() {
            return Err(format!(
                "warp slot ({}) cannot be less than the working bank slot ({})",
                warp_slot,
                working_bank.slot()
            ));
        }
        info!("warping to slot {warp_slot}");

        let root_bank = bank_forks.root_bank();

        root_bank.squash();
        root_bank.force_flush_accounts_cache();

        bank_forks.insert(Bank::warp_from_parent(
            root_bank,
            &Pubkey::default(),
            warp_slot,
        ));
        bank_forks
            .set_root(warp_slot, Some(snapshot_controller), Some(warp_slot))
            .map_err(|err| err.to_string())?;
        leader_schedule_cache.set_root(&bank_forks.root_bank());

        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
            ledger_path,
            &bank_forks.root_bank(),
            None,
            &config.snapshot_config.full_snapshot_archives_dir,
            &config.snapshot_config.incremental_snapshot_archives_dir,
            config.snapshot_config.archive_format,
        ) {
            Ok(archive_info) => archive_info,
            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
        };
        info!(
            "created snapshot: {}",
            full_snapshot_archive_info.path().display()
        );

        drop(bank_forks);
        // Process the blockstore after warping so the tower and bank forks
        // remain consistent with the new root.
        process_blockstore.process()?;
    }
    Ok(())
}

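/// Determine whether the blockstore may contain shreds with an incorrect
/// shred version, returning the first slot to inspect: the slot after the
/// restart slot during a cluster restart, or the slot after the latest hard
/// fork when blockstore data extends past it. None means no cleanup needed.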
fn should_cleanup_blockstore_incorrect_shred_versions(
    config: &ValidatorConfig,
    blockstore: &Blockstore,
    root_slot: Slot,
    hard_forks: &HardForks,
) -> Result<Option<Slot>, BlockstoreError> {
    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
    if maybe_cluster_restart_slot.is_some() {
        return Ok(Some(root_slot + 1));
    }

    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
        return Ok(None);
    };

    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
        return Ok(None);
    };
    let blockstore_min_slot = blockstore.lowest_slot();
    info!(
        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
         latest hard fork is {latest_hard_fork}"
    );

    if latest_hard_fork < blockstore_min_slot {
        // The latest hard fork predates all blockstore data; nothing to clean.
        Ok(None)
    } else if latest_hard_fork < blockstore_max_slot {
        // Blockstore data extends past the hard fork; slots after the fork may
        // hold shreds with the pre-fork shred version.
        Ok(Some(latest_hard_fork + 1))
    } else {
        // latest_hard_fork >= blockstore_max_slot: no data beyond the fork.
        Ok(None)
    }
}

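/// Scan data shreds from `start_slot` onward and return the first shred
/// version that differs from `expected_shred_version`, giving up after a
/// fixed 60 second budget so startup is not stalled indefinitely.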
fn scan_blockstore_for_incorrect_shred_version(
    blockstore: &Blockstore,
    start_slot: Slot,
    expected_shred_version: u16,
) -> Result<Option<u16>, BlockstoreError> {
    const TIMEOUT: Duration = Duration::from_secs(60);
    let timer = Instant::now();
    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;

    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
    for (slot, _meta) in slot_meta_iterator {
        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
        for shred in &shreds {
            if shred.version() != expected_shred_version {
                return Ok(Some(shred.version()));
            }
        }
        if timer.elapsed() > TIMEOUT {
            info!("Didn't find incorrect shreds after 60 seconds, aborting");
            break;
        }
    }
    Ok(None)
}

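/// If any shred from `start_slot` onward has an unexpected shred version,
/// copy the affected slots into a backup blockstore directory (best effort)
/// and purge them so they can be re-fetched with the correct version.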
fn cleanup_blockstore_incorrect_shred_versions(
    blockstore: &Blockstore,
    config: &ValidatorConfig,
    start_slot: Slot,
    expected_shred_version: u16,
) -> Result<(), BlockstoreError> {
    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
        blockstore,
        start_slot,
        expected_shred_version,
    )?;
    let Some(incorrect_shred_version) = incorrect_shred_version else {
        info!("Only shreds with the correct version were found in the blockstore");
        return Ok(());
    };

    let end_slot = blockstore.highest_slot()?.unwrap();

    let backup_folder = format!(
        "{BLOCKSTORE_DIRECTORY_ROCKS_LEVEL}_backup_{incorrect_shred_version}_{start_slot}_{end_slot}"
    );
    match Blockstore::open_with_options(
        &blockstore.ledger_path().join(backup_folder),
        config.blockstore_options.clone(),
    ) {
        Ok(backup_blockstore) => {
            info!("Backing up slots from {start_slot} to {end_slot}");
            let mut timer = Measure::start("blockstore backup");

            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
            let mut print_timer = Instant::now();
            let mut num_slots_copied = 0;
            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
            for (slot, _meta) in slot_meta_iterator {
                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
                let shreds = shreds.into_iter().map(Cow::Owned);
                let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
                num_slots_copied += 1;

                if print_timer.elapsed() > PRINT_INTERVAL {
                    info!("Backed up {num_slots_copied} slots thus far");
                    print_timer = Instant::now();
                }
            }

            timer.stop();
            info!("Backing up slots done. {timer}");
        }
        Err(err) => {
            warn!("Unable to backup shreds with incorrect shred version: {err}");
        }
    }

    info!("Purging slots {start_slot} to {end_slot} from blockstore");
    let mut timer = Measure::start("blockstore purge");
    blockstore.purge_from_next_slots(start_slot, end_slot);
    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
    timer.stop();
    info!("Purging slots done. {timer}");

    Ok(())
}

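/// Build the transaction-status sender/service pair used for RPC transaction
/// history and geyser transaction notifications, seeding the max complete
/// transaction-status slot from the blockstore's current max root.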
fn initialize_rpc_transaction_history_services(
    blockstore: Arc<Blockstore>,
    exit: Arc<AtomicBool>,
    enable_rpc_transaction_history: bool,
    enable_extended_tx_metadata_storage: bool,
    transaction_notifier: Option<TransactionNotifierArc>,
    dependency_tracker: Option<Arc<DependencyTracker>>,
) -> TransactionHistoryServices {
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
    let (transaction_status_sender, transaction_status_receiver) = unbounded();
    let transaction_status_sender = Some(TransactionStatusSender {
        sender: transaction_status_sender,
        dependency_tracker: dependency_tracker.clone(),
    });
    let transaction_status_service = Some(TransactionStatusService::new(
        transaction_status_receiver,
        max_complete_transaction_status_slot.clone(),
        enable_rpc_transaction_history,
        transaction_notifier,
        blockstore.clone(),
        enable_extended_tx_metadata_storage,
        dependency_tracker,
        exit.clone(),
    ));

    TransactionHistoryServices {
        transaction_status_sender,
        transaction_status_service,
        max_complete_transaction_status_slot,
    }
}

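/// Errors that can abort validator startup.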
#[derive(Error, Debug)]
pub enum ValidatorError {
    #[error("bank hash mismatch: actual={0}, expected={1}")]
    BankHashMismatch(Hash, Hash),

    #[error("blockstore error: {0}")]
    Blockstore(#[source] BlockstoreError),

    #[error("genesis hash mismatch: actual={0}, expected={1}")]
    GenesisHashMismatch(Hash, Hash),

    #[error(
        "ledger does not have enough data to wait for supermajority: current slot={0}, needed \
         slot={1}"
    )]
    NotEnoughLedgerData(Slot, Slot),

    #[error("failed to open genesis: {0}")]
    OpenGenesisConfig(#[source] OpenGenesisConfigError),

    #[error("{0}")]
    Other(String),

    #[error(
        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
    )]
    PohTooSlow { mine: u64, target: u64 },

    #[error("shred version mismatch: actual {actual}, expected {expected}")]
    ShredVersionMismatch { actual: u16, expected: u16 },

    #[error(transparent)]
    TraceError(#[from] TraceError),

    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
    WenRestartFinished,
}

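/// Implements --wait-for-supermajority: after replaying to the restart slot
/// (and checking --expected-bank-hash when set), block until at least
/// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT of activated stake is visible in
/// gossip. Returns Ok(true) if the node waited, Ok(false) when no wait was
/// configured or the restart slot is already in the past.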
fn wait_for_supermajority(
    config: &ValidatorConfig,
    process_blockstore: Option<&mut ProcessBlockStore>,
    bank_forks: &RwLock<BankForks>,
    cluster_info: &ClusterInfo,
    rpc_override_health_check: Arc<AtomicBool>,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
) -> Result<bool, ValidatorError> {
    match config.wait_for_supermajority {
        None => Ok(false),
        Some(wait_for_supermajority_slot) => {
            if let Some(process_blockstore) = process_blockstore {
                process_blockstore
                    .process()
                    .map_err(ValidatorError::Other)?;
            }

            let bank = bank_forks.read().unwrap().working_bank();
            match wait_for_supermajority_slot.cmp(&bank.slot()) {
                std::cmp::Ordering::Less => return Ok(false),
                std::cmp::Ordering::Greater => {
                    return Err(ValidatorError::NotEnoughLedgerData(
                        bank.slot(),
                        wait_for_supermajority_slot,
                    ));
                }
                _ => {}
            }

            if let Some(expected_bank_hash) = config.expected_bank_hash {
                if bank.hash() != expected_bank_hash {
                    return Err(ValidatorError::BankHashMismatch(
                        bank.hash(),
                        expected_bank_hash,
                    ));
                }
            }

            for i in 1.. {
                let logging = i % 10 == 1;
                if logging {
                    info!(
                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
                        bank.slot()
                    );
                }

                let gossip_stake_percent =
                    get_stake_percent_in_gossip(&bank, cluster_info, logging);

                *start_progress.write().unwrap() =
                    ValidatorStartProgress::WaitingForSupermajority {
                        slot: wait_for_supermajority_slot,
                        gossip_stake_percent,
                    };

                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
                    info!(
                        "Supermajority reached, {gossip_stake_percent}% active stake detected, \
                         starting up now.",
                    );
                    break;
                }
                // The node is deliberately stalled here waiting for peers, so
                // override the RPC health check while it waits.
                rpc_override_health_check.store(true, Ordering::Relaxed);
                sleep(Duration::new(1, 0));
            }
            rpc_override_health_check.store(false, Ordering::Relaxed);
            Ok(true)
        }
    }
}

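/// Estimate the percentage of activated stake whose nodes are currently
/// visible in gossip with a matching shred version, optionally logging the
/// wrong-shred-version and offline holdouts sorted by stake.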
fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
    let mut online_stake = 0;
    let mut wrong_shred_stake = 0;
    let mut wrong_shred_nodes = vec![];
    let mut offline_stake = 0;
    let mut offline_nodes = vec![];

    let mut total_activated_stake = 0;
    let now = timestamp();
    // Ignore contact infos whose wallclock is stale; gossip state may be
    // restored from disk on startup and can contain outdated peers.
    let peers: HashMap<_, _> = cluster_info
        .tvu_peers(ContactInfo::clone)
        .into_iter()
        .filter(|node| {
            let age = now.saturating_sub(node.wallclock());
            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
        })
        .map(|node| (*node.pubkey(), node))
        .collect();
    let my_shred_version = cluster_info.my_shred_version();
    let my_id = cluster_info.id();

    for (activated_stake, vote_account) in bank.vote_accounts().values() {
        let activated_stake = *activated_stake;
        total_activated_stake += activated_stake;

        if activated_stake == 0 {
            continue;
        }
        let vote_state_node_pubkey = *vote_account.node_pubkey();

        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
            if peer.shred_version() == my_shred_version {
                trace!(
                    "observed {vote_state_node_pubkey} in gossip, \
                     (activated_stake={activated_stake})"
                );
                online_stake += activated_stake;
            } else {
                wrong_shred_stake += activated_stake;
                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
            }
        } else if vote_state_node_pubkey == my_id {
            // This node is online.
            online_stake += activated_stake;
        } else {
            offline_stake += activated_stake;
            offline_nodes.push((activated_stake, vote_state_node_pubkey));
        }
    }

    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
    if log {
        info!("{online_stake_percentage:.3}% of active stake visible in gossip");

        if !wrong_shred_nodes.is_empty() {
            info!(
                "{:.3}% of active stake has the wrong shred version in gossip",
                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
            );
            // Sort by descending stake so the largest holdouts are listed first.
            wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0));
            for (stake, identity) in wrong_shred_nodes {
                info!(
                    "    {:.3}% - {}",
                    (stake as f64 / total_activated_stake as f64) * 100.,
                    identity
                );
            }
        }

        if !offline_nodes.is_empty() {
            info!(
                "{:.3}% of active stake is not visible in gossip",
                (offline_stake as f64 / total_activated_stake as f64) * 100.
            );
            // Sort by descending stake so the largest holdouts are listed first.
            offline_nodes.sort_by(|b, a| a.0.cmp(&b.0));
            for (stake, identity) in offline_nodes {
                info!(
                    "    {:.3}% - {}",
                    (stake as f64 / total_activated_stake as f64) * 100.,
                    identity
                );
            }
        }
    }

    online_stake_percentage as u64
}

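/// Clear stale accounts-db contents from the configured account and shrink
/// paths; contents are moved aside and deleted asynchronously so startup is
/// not blocked on filesystem cleanup.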
fn cleanup_accounts_paths(config: &ValidatorConfig) {
    for account_path in &config.account_paths {
        move_and_async_delete_path_contents(account_path);
    }
    if let Some(shrink_paths) = config
        .accounts_db_config
        .as_ref()
        .and_then(|config| config.shrink_paths.as_ref())
    {
        for shrink_path in shrink_paths {
            move_and_async_delete_path_contents(shrink_path);
        }
    }
}

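/// A snapshot config is valid when snapshot generation is disabled, or when
/// a full snapshot interval is configured and any enabled incremental
/// interval is strictly smaller than it.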
pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
    if !snapshot_config.should_generate_snapshots() {
        return true;
    }

    let SnapshotInterval::Slots(full_snapshot_interval_slots) =
        snapshot_config.full_snapshot_archive_interval
    else {
        return false;
    };

    match snapshot_config.incremental_snapshot_archive_interval {
        SnapshotInterval::Disabled => true,
        SnapshotInterval::Slots(incremental_snapshot_interval_slots) => {
            full_snapshot_interval_slots > incremental_snapshot_interval_slots
        }
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crossbeam_channel::{bounded, RecvTimeoutError},
        solana_entry::entry,
        solana_genesis_config::create_genesis_config,
        solana_gossip::contact_info::ContactInfo,
        solana_ledger::{
            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
            get_tmp_ledger_path_auto_delete,
        },
        solana_poh_config::PohConfig,
        solana_sha256_hasher::hash,
        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
        std::{fs::remove_dir_all, num::NonZeroU64, thread, time::Duration},
    };

    #[test]
    fn validator_exit() {
        solana_logger::setup();
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());

        let validator_keypair = Keypair::new();
        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;
        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);

        let voting_keypair = Arc::new(Keypair::new());
        let config = ValidatorConfig {
            rpc_addrs: Some((
                validator_node.info.rpc().unwrap(),
                validator_node.info.rpc_pubsub().unwrap(),
            )),
            ..ValidatorConfig::default_for_test()
        };
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
        let validator = Validator::new(
            validator_node,
            Arc::new(validator_keypair),
            &validator_ledger_path,
            &voting_keypair.pubkey(),
            Arc::new(RwLock::new(vec![voting_keypair])),
            vec![leader_node.info],
            &config,
            true, // should_check_duplicate_instance
            None, // rpc_to_plugin_manager_receiver
            start_progress.clone(),
            SocketAddrSpace::Unspecified,
            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
            Arc::new(RwLock::new(None)),
        )
        .expect("assume successful validator start");
        assert_eq!(
            *start_progress.read().unwrap(),
            ValidatorStartProgress::Running
        );
        validator.close();
        remove_dir_all(validator_ledger_path).unwrap();
    }

    #[test]
    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
        solana_logger::setup();

        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Blockstore::open(ledger_path.path()).unwrap();

        let mut validator_config = ValidatorConfig::default_for_test();
        let mut hard_forks = HardForks::default();
        let mut root_slot;

        // Cluster restart at the root slot: cleanup from the next slot.
        root_slot = 10;
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1)
        );

        // Not a cluster restart (wait slot != root) and no hard forks: no cleanup.
        root_slot = 15;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // A hard fork is registered but the blockstore is empty: no cleanup.
        hard_forks.register(10);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Insert shreds for slots [20, 35).
        let entries = entry::create_ticks(1, 0, Hash::default());
        for i in 20..35 {
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                i,     // slot
                i - 1, // parent_slot
                true,  // is_full_slot
                1,     // version
            );
            blockstore.insert_shreds(shreds, None, true).unwrap();
        }

        // The latest hard fork (10) predates the blockstore data: no cleanup.
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Cluster restart at a hard fork inside the blockstore range: cleanup
        // from the next slot, with or without --wait-for-supermajority.
        root_slot = 25;
        hard_forks.register(root_slot);
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );
        validator_config.wait_for_supermajority = None;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );

        // The root has moved past the latest hard fork, which is still within
        // the blockstore range: cleanup from the slot after the fork.
        root_slot = 30;
        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(latest_hard_fork + 1),
        );

        // Purge through the hard fork so it predates the remaining blockstore
        // data: no cleanup.
        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );
    }

    #[test]
    fn test_cleanup_blockstore_incorrect_shred_versions() {
        solana_logger::setup();

        let validator_config = ValidatorConfig::default_for_test();
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Blockstore::open(ledger_path.path()).unwrap();

        let entries = entry::create_ticks(1, 0, Hash::default());
        for i in 1..10 {
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                i,     // slot
                i - 1, // parent_slot
                true,  // is_full_slot
                1,     // version
            );
            blockstore.insert_shreds(shreds, None, true).unwrap();
        }

        // Slots [1, 10) hold version-1 shreds; expecting version 2 should
        // purge slots [5, 10) and unlink slot 5 from slot 4.
        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
        for i in 5..10 {
            assert!(blockstore
                .get_data_shreds_for_slot(i, 0)
                .unwrap()
                .is_empty());
        }
    }

    #[test]
    fn validator_parallel_exit() {
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;

        let mut ledger_paths = vec![];
        let mut validators: Vec<Validator> = (0..2)
            .map(|_| {
                let validator_keypair = Keypair::new();
                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
                ledger_paths.push(validator_ledger_path.clone());
                let vote_account_keypair = Keypair::new();
                let config = ValidatorConfig {
                    rpc_addrs: Some((
                        validator_node.info.rpc().unwrap(),
                        validator_node.info.rpc_pubsub().unwrap(),
                    )),
                    ..ValidatorConfig::default_for_test()
                };
                Validator::new(
                    validator_node,
                    Arc::new(validator_keypair),
                    &validator_ledger_path,
                    &vote_account_keypair.pubkey(),
                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
                    vec![leader_node.info.clone()],
                    &config,
                    true, // should_check_duplicate_instance
                    None, // rpc_to_plugin_manager_receiver
                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
                    SocketAddrSpace::Unspecified,
                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
                    Arc::new(RwLock::new(None)),
                )
                .expect("assume successful validator start")
            })
            .collect();

        // Signal all validators to exit before any of them are joined.
        validators.iter_mut().for_each(|v| v.exit());

        // Join on a separate thread so the test can bound total shutdown time.
        let (sender, receiver) = bounded(0);
        let _ = thread::spawn(move || {
            validators.into_iter().for_each(|validator| {
                validator.join();
            });
            sender.send(()).unwrap();
        });

        let timeout = Duration::from_secs(60);
        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
            panic!("timeout for shutting down validators");
        }

        for path in ledger_paths {
            remove_dir_all(path).unwrap();
        }
    }

    #[test]
    fn test_wait_for_supermajority() {
        solana_logger::setup();
        let node_keypair = Arc::new(Keypair::new());
        let cluster_info = ClusterInfo::new(
            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
            node_keypair,
            SocketAddrSpace::Unspecified,
        );

        let (genesis_config, _mint_keypair) = create_genesis_config(1);
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let mut config = ValidatorConfig::default_for_test();
        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));

        assert!(!wait_for_supermajority(
            &config,
            None,
            &bank_forks,
            &cluster_info,
            rpc_override_health_check.clone(),
            &start_progress,
        )
        .unwrap());

        // Fails because the ledger has not caught up to the restart slot yet.
        config.wait_for_supermajority = Some(1);
        assert!(matches!(
            wait_for_supermajority(
                &config,
                None,
                &bank_forks,
                &cluster_info,
                rpc_override_health_check.clone(),
                &start_progress,
            ),
            Err(ValidatorError::NotEnoughLedgerData(_, _)),
        ));

        // A restart slot in the past returns without waiting.
        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
            bank_forks.read().unwrap().root_bank(),
            &Pubkey::default(),
            1,
        ));
        config.wait_for_supermajority = Some(0);
        assert!(!wait_for_supermajority(
            &config,
            None,
            &bank_forks,
            &cluster_info,
            rpc_override_health_check.clone(),
            &start_progress,
        )
        .unwrap());

        // Fails when the bank hash does not match the expected one.
        config.wait_for_supermajority = Some(1);
        config.expected_bank_hash = Some(hash(&[1]));
        assert!(matches!(
            wait_for_supermajority(
                &config,
                None,
                &bank_forks,
                &cluster_info,
                rpc_override_health_check,
                &start_progress,
            ),
            Err(ValidatorError::BankHashMismatch(_, _)),
        ));
    }

    #[test]
    fn test_is_snapshot_config_valid() {
        fn new_snapshot_config(
            full_snapshot_archive_interval_slots: Slot,
            incremental_snapshot_archive_interval_slots: Slot,
        ) -> SnapshotConfig {
            SnapshotConfig {
                full_snapshot_archive_interval: SnapshotInterval::Slots(
                    NonZeroU64::new(full_snapshot_archive_interval_slots).unwrap(),
                ),
                incremental_snapshot_archive_interval: SnapshotInterval::Slots(
                    NonZeroU64::new(incremental_snapshot_archive_interval_slots).unwrap(),
                ),
                ..SnapshotConfig::default()
            }
        }

        // The default config is valid.
        assert!(is_snapshot_config_valid(&SnapshotConfig::default()));

        // Incremental snapshots may be disabled.
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
            ..SnapshotConfig::default()
        }));

        // Full snapshots must not be disabled while generating snapshots.
        assert!(!is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Disabled,
            ..SnapshotConfig::default()
        }));

        // The full interval must be strictly larger than the incremental one.
        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 200)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));

        assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));

        // Any config is valid when snapshot generation is disabled.
        assert!(is_snapshot_config_valid(&SnapshotConfig::new_disabled()));
        assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(37).unwrap()),
            incremental_snapshot_archive_interval: SnapshotInterval::Slots(
                NonZeroU64::new(41).unwrap()
            ),
            ..SnapshotConfig::new_load_only()
        }));
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Disabled,
            incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
            ..SnapshotConfig::new_load_only()
        }));
    }

    fn target_tick_duration() -> Duration {
        // One tick is DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT, i.e.
        // 6250 us with the current defaults (asserted below).
        let target_tick_duration_us =
            solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
        assert_eq!(target_tick_duration_us, 6250);
        Duration::from_micros(target_tick_duration_us)
    }

    #[test]
    fn test_poh_speed() {
        solana_logger::setup();
        let poh_config = PohConfig {
            target_tick_duration: target_tick_duration(),
            // Demand 100x the default hash rate so the check reliably fails.
            hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
            ..PohConfig::default()
        };
        let genesis_config = GenesisConfig {
            poh_config,
            ..GenesisConfig::default()
        };
        let bank = Bank::new_for_tests(&genesis_config);
        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
    }

    #[test]
    fn test_poh_speed_no_hashes_per_tick() {
        solana_logger::setup();
        let poh_config = PohConfig {
            target_tick_duration: target_tick_duration(),
            hashes_per_tick: None,
            ..PohConfig::default()
        };
        let genesis_config = GenesisConfig {
            poh_config,
            ..GenesisConfig::default()
        };
        let bank = Bank::new_for_tests(&genesis_config);
        check_poh_speed(&bank, Some(10_000)).unwrap();
    }
}