pub use solana_perf::report_target_features;
use {
    crate::{
        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
        banking_stage::BankingStage,
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            repair_handler::RepairHandlerType,
            serve_repair_service::ServeRepairService,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::SnapshotPackagerService,
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{ForwardingClientOption, Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        hardened_unpack::{
            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
        },
        utils::move_and_async_delete_path_contents,
    },
    solana_client::connection_cache::{ConnectionCache, Protocol},
    solana_clock::Slot,
    solana_cluster_type::ClusterType,
    solana_entry::poh::compute_hash_time,
    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
    solana_genesis_config::GenesisConfig,
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
        node::{Node, NodeMultihoming},
    },
    solana_hard_forks::HardForks,
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
    solana_poh::{
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
        transaction_recorder::TransactionRecorder,
    },
    solana_pubkey::Pubkey,
    solana_rayon_threadlimit::get_thread_count,
    solana_rpc::{
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::{ClientOption, JsonRpcService, JsonRpcServiceConfig},
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
            PendingSnapshotPackages, PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        dependency_tracker::DependencyTracker,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_archive_info::SnapshotArchiveInfoGetter,
        snapshot_bank_utils,
        snapshot_config::SnapshotConfig,
        snapshot_controller::SnapshotController,
        snapshot_hash::StartingSnapshotHashes,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs, SnapshotInterval},
    },
    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
    solana_shred_version::compute_shred_version,
    solana_signer::Signer,
    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
    solana_time_utils::timestamp,
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{
        self,
        broadcast_stage::BroadcastStageType,
        xdp::{master_ip_if_bonded, XdpConfig, XdpRetransmitter},
    },
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_validator_exit::Exit,
    solana_vote_program::vote_state,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::{IpAddr, SocketAddr},
        num::NonZeroUsize,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
    tokio_util::sync::CancellationToken,
};

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for verifying ledger entries"
    }
}
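
// Illustrative sketch, not from the original source: the strum derives above
// give each variant a kebab-case name used both for CLI parsing (`EnumString`)
// and display (`Display`); `cli_names()` exposes all of them, e.g. for clap.
#[cfg(test)]
mod block_verification_method_cli_round_trip {
    use {super::*, std::str::FromStr};

    #[test]
    fn kebab_case_round_trip() {
        // "unified-scheduler" is the kebab-case form of `UnifiedScheduler`.
        let method = BlockVerificationMethod::from_str("unified-scheduler").unwrap();
        assert_eq!(method.to_string(), "unified-scheduler");
        assert!(BlockVerificationMethod::cli_names().contains(&"unified-scheduler"));
    }
}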

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockProductionMethod {
    CentralScheduler,
    #[default]
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for producing ledger entries"
    }
}

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum TransactionStructure {
    Sdk,
    #[default]
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch internal transaction structure/representation"
    }
}

#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>,
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>,
    pub repair_validators: Option<HashSet<Pubkey>>,
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    pub gossip_validators: Option<HashSet<Pubkey>>,
    pub max_genesis_archive_unpacked_size: u64,
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: SendTransactionServiceConfig,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: Option<AccountsDbConfig>,
    pub warp_slot: Option<Slot>,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub tpu_coalesce: Duration,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub block_production_num_workers: NonZeroUsize,
    pub transaction_struct: TransactionStructure,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub delay_leader_block_for_pending_fork: bool,
    pub use_tpu_client_next: bool,
    pub retransmit_xdp: Option<XdpConfig>,
    pub repair_handler_type: RepairHandlerType,
}

impl ValidatorConfig {
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");

        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default_for_tests(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default_for_test(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: SendTransactionServiceConfig::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            tpu_coalesce: DEFAULT_TPU_COALESCE,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            validator_exit_backpressure: HashMap::default(),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            block_production_num_workers: BankingStage::default_num_workers(),
            transaction_struct: TransactionStructure::default(),
            enable_block_production_forwarding: true,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            delay_leader_block_for_pending_fork: false,
            use_tpu_client_next: true,
            retransmit_xdp: None,
            repair_handler_type: RepairHandlerType::default(),
        }
    }

    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}
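
// Illustrative sketch, not from the original source: how a test harness might
// take the test defaults and opt into the block-subscription RPC surface via
// the helper above.
#[cfg(test)]
mod validator_config_sketch_tests {
    use super::*;

    #[test]
    fn default_for_test_can_enable_block_subscribe() {
        let mut config = ValidatorConfig::default_for_test();
        config.enable_default_rpc_block_subscribe();
        assert!(config.pubsub_config.enable_block_subscription);
        assert!(config.rpc_config.enable_rpc_transaction_history);
    }
}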

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidatorStartProgress {
    Initializing,
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted,
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },
    Running,
}

impl Default for ValidatorStartProgress {
    fn default() -> Self {
        Self::Initializing
    }
}

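/// Optional background thread that scans the blockstore and fixes roots,
/// spawned only when RPC transaction history and `rpc_scan_and_fix_roots`
/// are both enabled.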
struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join {err:?}");
            }
        }
    }
}

#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
}

pub struct ValidatorTpuConfig {
    pub use_quic: bool,
    pub vote_use_quic: bool,
    pub tpu_connection_pool_size: usize,
    pub tpu_enable_udp: bool,
    pub tpu_quic_server_config: QuicServerParams,
    pub tpu_fwd_quic_server_config: QuicServerParams,
    pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
        let tpu_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            coalesce_channel_size: 100_000,
            ..Default::default()
        };

        let tpu_fwd_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            max_unstaked_connections: 0,
            coalesce_channel_size: 100_000,
            ..Default::default()
        };

        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}
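
// Illustrative sketch, not from the original source: the test constructor
// keeps the QUIC defaults but tightens per-IP connection limits, and the vote
// server config mirrors the forwarding config.
#[cfg(test)]
mod validator_tpu_config_sketch_tests {
    use super::*;

    #[test]
    fn new_for_tests_mirrors_fwd_config_for_votes() {
        let config = ValidatorTpuConfig::new_for_tests(/*tpu_enable_udp:*/ true);
        assert!(config.tpu_enable_udp);
        assert_eq!(
            config.tpu_quic_server_config.max_connections_per_ipaddr_per_min,
            32
        );
        // The vote config is a clone of the forwarding config, so unstaked
        // connections stay disabled.
        assert_eq!(config.vote_quic_server_config.max_unstaked_connections, 0);
    }
}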

pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    xdp_retransmitter: Option<XdpRetransmitter>,
    _tpu_client_next_runtime: Option<TokioRuntime>,
}

impl Validator {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        #[cfg(debug_assertions)]
        const DEBUG_ASSERTION_STATUS: &str = "enabled";
        #[cfg(not(debug_assertions))]
        const DEBUG_ASSERTION_STATUS: &str = "disabled";
        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");

        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {cluster_entrypoint:?}");
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

        let cancel_tpu_client_next = CancellationToken::new();
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
            let cancel_tpu_client_next = cancel_tpu_client_next.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
        }

        let (
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            block_metadata_notifier,
            slot_status_notifier,
        ) = if let Some(service) = &geyser_plugin_service {
            (
                service.get_accounts_update_notifier(),
                service.get_transaction_notifier(),
                service.get_entry_notifier(),
                service.get_block_metadata_notifier(),
                service.get_slot_status_notifier(),
            )
        } else {
            (None, None, None, None, None)
        };

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

        let dependency_tracker = Arc::new(DependencyTracker::default());

        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            config
                .rpc_addrs
                .is_some()
                .then(|| dependency_tracker.clone()),
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!("shred version: {shred_version}, hard forks: {hard_forks:?}");

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        cluster_info.set_bind_ip_addrs(node.bind_ip_addrs.clone());
        let cluster_info = Arc::new(cluster_info);
        let node_multihoming = NodeMultihoming::from(&node);

        assert!(is_snapshot_config_valid(&config.snapshot_config));

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            config.snapshot_config.clone(),
            bank_forks.read().unwrap().root(),
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if snapshot_controller
            .snapshot_config()
            .should_generate_snapshots()
        {
            let exit_backpressure = config
                .validator_exit_backpressure
                .get(SnapshotPackagerService::NAME)
                .cloned();
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                exit_backpressure,
                cluster_info.clone(),
                snapshot_controller.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller: snapshot_controller.clone(),
            snapshot_request_receiver,
            pending_snapshot_packages,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}, \
             transaction-structure: {}",
            config.block_verification_method,
            config.block_production_method,
            config.transaction_struct
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let startup_verification_complete;
        let (mut poh_recorder, entry_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                exit.clone(),
            )
        };
        if transaction_status_sender.is_some() {
            poh_recorder.track_transaction_indexes();
        }
        let (record_sender, record_receiver) = unbounded();
        let transaction_recorder =
            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            entry_notification_sender,
            blockstore_root_scan,
            &snapshot_controller,
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &snapshot_controller,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let mut tpu_transactions_forwards_client =
            Some(node.sockets.tpu_transaction_forwarding_client);

        let connection_cache = match (config.use_tpu_client_next, use_quic) {
            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                Some(
                    tpu_transactions_forwards_client
                        .take()
                        .expect("Socket should exist."),
                ),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            ))),
            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))),
            (true, _) => None,
        };

        let vote_connection_cache = if vote_use_quic {
            let vote_connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_vote_quic",
                tpu_connection_pool_size,
                Some(node.sockets.quic_vote_client),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu_vote(Protocol::QUIC)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(vote_connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                tpu_connection_pool_size,
            ))
        };

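        // Reuse the handle of an ambient tokio runtime when one exists (e.g.
        // when the validator is created inside tests); otherwise, if
        // tpu-client-next is in use, build a small dedicated runtime below.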
        let current_runtime_handle = tokio::runtime::Handle::try_current();
        let tpu_client_next_runtime =
            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .worker_threads(2)
                    .thread_name("solTpuClientRt")
                    .build()
                    .unwrap()
            });

        let rpc_override_health_check =
            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
        let (
            json_rpc_service,
            rpc_subscriptions,
            pubsub_service,
            completed_data_sets_sender,
            completed_data_sets_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            bank_notification_sender,
        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
            assert_eq!(
                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
                node.info
                    .rpc_pubsub()
                    .map(|addr| socket_addr_space.check(&addr))
            );
            let (bank_notification_sender, bank_notification_receiver) = unbounded();
            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
                Some(Arc::new(RwLock::new(bank_notification_senders)))
            } else {
                None
            };

            let client_option = if config.use_tpu_client_next {
                let runtime_handle = tpu_client_next_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
                ClientOption::TpuClientNext(
                    Arc::as_ref(&identity_keypair),
                    node.sockets.rpc_sts_client,
                    runtime_handle.clone(),
                    cancel_tpu_client_next.clone(),
                )
            } else {
                let Some(connection_cache) = &connection_cache else {
                    panic!("ConnectionCache should exist by construction.");
                };
                ClientOption::ConnectionCache(connection_cache.clone())
            };
            let rpc_svc_config = JsonRpcServiceConfig {
                rpc_addr,
                rpc_config: config.rpc_config.clone(),
                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
                bank_forks: bank_forks.clone(),
                block_commitment_cache: block_commitment_cache.clone(),
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                poh_recorder: Some(poh_recorder.clone()),
                genesis_hash: genesis_config.hash(),
                ledger_path: ledger_path.to_path_buf(),
                validator_exit: config.validator_exit.clone(),
                exit: exit.clone(),
                override_health_check: rpc_override_health_check.clone(),
                startup_verification_complete,
                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
                send_transaction_service_config: config.send_transaction_service_config.clone(),
                max_slots: max_slots.clone(),
                leader_schedule_cache: leader_schedule_cache.clone(),
                max_complete_transaction_status_slot: max_complete_transaction_status_slot.clone(),
                prioritization_fee_cache: prioritization_fee_cache.clone(),
                client_option,
            };
            let json_rpc_service =
                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
            let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
                exit.clone(),
                max_complete_transaction_status_slot,
                blockstore.clone(),
                bank_forks.clone(),
                block_commitment_cache.clone(),
                optimistically_confirmed_bank.clone(),
                &config.pubsub_config,
                None,
            ));
            let pubsub_service = if !config.rpc_config.full_api {
                None
            } else {
                let (trigger, pubsub_service) = PubSubService::new(
                    config.pubsub_config.clone(),
                    &rpc_subscriptions,
                    rpc_pubsub_addr,
                );
                config
                    .validator_exit
                    .write()
                    .unwrap()
                    .register_exit(Box::new(move || trigger.cancel()));

                Some(pubsub_service)
            };

            let (completed_data_sets_sender, completed_data_sets_service) =
                if !config.rpc_config.full_api {
                    (None, None)
                } else {
                    let (completed_data_sets_sender, completed_data_sets_receiver) =
                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
                    let completed_data_sets_service = CompletedDataSetsService::new(
                        completed_data_sets_receiver,
                        blockstore.clone(),
                        rpc_subscriptions.clone(),
                        exit.clone(),
                        max_slots.clone(),
                    );
                    (
                        Some(completed_data_sets_sender),
                        Some(completed_data_sets_service),
                    )
                };

            let rpc_completed_slots_service =
                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
                    let (completed_slots_sender, completed_slots_receiver) =
                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
                    blockstore.add_completed_slots_signal(completed_slots_sender);

                    Some(RpcCompletedSlotsService::spawn(
                        completed_slots_receiver,
                        rpc_subscriptions.clone(),
                        slot_status_notifier.clone(),
                        exit.clone(),
                    ))
                } else {
                    None
                };

            let dependency_tracker = transaction_status_sender
                .is_some()
                .then_some(dependency_tracker);
            let optimistically_confirmed_bank_tracker =
                Some(OptimisticallyConfirmedBankTracker::new(
                    bank_notification_receiver,
                    exit.clone(),
                    bank_forks.clone(),
                    optimistically_confirmed_bank,
                    rpc_subscriptions.clone(),
                    confirmed_bank_subscribers,
                    prioritization_fee_cache.clone(),
                    dependency_tracker.clone(),
                ));
            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
                sender: bank_notification_sender,
                should_send_parents: geyser_plugin_service.is_some(),
                dependency_tracker,
            });
            (
                Some(json_rpc_service),
                Some(rpc_subscriptions),
                pubsub_service,
                completed_data_sets_sender,
                completed_data_sets_service,
                rpc_completed_slots_service,
                optimistically_confirmed_bank_tracker,
                bank_notification_sender_config,
            )
        } else {
            (None, None, None, None, None, None, None, None)
        };

        if config.halt_at_slot.is_some() {
            block_commitment_cache
                .write()
                .unwrap()
                .set_highest_super_majority_root(bank_forks.read().unwrap().root());

            warn!("Validator halted");
            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
            std::thread::park();
        }
        let ip_echo_server = match node.sockets.ip_echo {
            None => None,
            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
                tcp_listener,
                config.ip_echo_server_threads,
                Some(node.info.shred_version()),
            )),
        };

        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();

        let stats_reporter_service =
            StatsReporterService::new(stats_reporter_receiver, exit.clone());

        let gossip_service = GossipService::new(
            &cluster_info,
            Some(bank_forks.clone()),
            node.sockets.gossip.clone(),
            config.gossip_validators.clone(),
            should_check_duplicate_instance,
            Some(stats_reporter_sender.clone()),
            exit.clone(),
        );
        let serve_repair = config.repair_handler_type.create_serve_repair(
            blockstore.clone(),
            cluster_info.clone(),
            bank_forks.read().unwrap().sharable_root_bank(),
            config.repair_whitelist.clone(),
        );
        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
            unbounded();

        let waited_for_supermajority = wait_for_supermajority(
            config,
            Some(&mut process_blockstore),
            &bank_forks,
            &cluster_info,
            rpc_override_health_check,
            &start_progress,
        )?;

        let blockstore_metric_report_service =
            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());

        let wait_for_vote_to_start_leader =
            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;

        let poh_service = PohService::new(
            poh_recorder.clone(),
            &genesis_config.poh_config,
            exit.clone(),
            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
            config.poh_pinned_cpu_core,
            config.poh_hashes_per_batch,
            record_receiver,
        );
        assert_eq!(
            blockstore.get_new_shred_signals_len(),
            1,
            "New shred signal for the TVU should be the same as the clear bank signal."
        );

        let vote_tracker = Arc::<VoteTracker>::default();

        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
        let (verified_vote_sender, verified_vote_receiver) = unbounded();
        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender_cloned());

        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
            && genesis_config.cluster_type != ClusterType::MainnetBeta)
            .then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .thread_name("solTurbineQuic")
                    .build()
                    .unwrap()
            });
        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
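        // Turbine QUIC endpoints are not stood up on MainnetBeta; a dummy
        // channel keeps the sender wired up so downstream code is unaffected.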
        let (
            turbine_quic_endpoint,
            turbine_quic_endpoint_sender,
            turbine_quic_endpoint_join_handle,
        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
            (None, sender, None)
        } else {
            solana_turbine::quic_endpoint::new_quic_endpoint(
                turbine_quic_endpoint_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
                &identity_keypair,
                node.sockets.tvu_quic,
                turbine_quic_endpoint_sender,
                bank_forks.clone(),
            )
            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
            .unwrap()
        };

        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
            && genesis_config.cluster_type != ClusterType::MainnetBeta)
            .then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .thread_name("solRepairQuic")
                    .build()
                    .unwrap()
            });
        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
            if genesis_config.cluster_type == ClusterType::MainnetBeta {
                (None, RepairQuicAsyncSenders::new_dummy(), None)
            } else {
                let repair_quic_sockets = RepairQuicSockets {
                    repair_server_quic_socket: node.sockets.serve_repair_quic,
                    repair_client_quic_socket: node.sockets.repair_quic,
                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
                };
                let repair_quic_senders = RepairQuicSenders {
                    repair_request_quic_sender: repair_request_quic_sender.clone(),
                    repair_response_quic_sender,
                    ancestor_hashes_response_quic_sender,
                };
                repair::quic_endpoint::new_quic_endpoints(
                    repair_quic_endpoints_runtime
                        .as_ref()
                        .map(TokioRuntime::handle)
                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
                    &identity_keypair,
                    repair_quic_sockets,
                    repair_quic_senders,
                    bank_forks.clone(),
                )
                .map(|(endpoints, senders, join_handle)| {
                    (Some(endpoints), senders, Some(join_handle))
                })
                .unwrap()
            };
        let serve_repair_service = ServeRepairService::new(
            serve_repair,
            repair_request_quic_sender,
            repair_request_quic_receiver,
            repair_quic_async_senders.repair_response_quic_sender,
            node.sockets.serve_repair,
            socket_addr_space,
            stats_reporter_sender,
            exit.clone(),
        );

        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
        let wen_restart_repair_slots = if in_wen_restart {
            Some(Arc::new(RwLock::new(Vec::new())))
        } else {
            None
        };
        let tower = match process_blockstore.process_to_create_tower() {
            Ok(tower) => {
                info!("Tower state: {tower:?}");
                tower
            }
            Err(e) => {
                warn!("Unable to retrieve tower: {e:?}; creating default tower...");
                Tower::default()
            }
        };
        let last_vote = tower.last_vote();

        let outstanding_repair_requests =
            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
        let cluster_slots =
            Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());

        let connection_cache_for_warmup =
            if json_rpc_service.is_some() && connection_cache.is_some() {
                connection_cache.as_ref()
            } else {
                None
            };
        let (xdp_retransmitter, xdp_sender) =
            if let Some(xdp_config) = config.retransmit_xdp.clone() {
                let src_port = node.sockets.retransmit_sockets[0]
                    .local_addr()
                    .expect("failed to get local address")
                    .port();
                let src_ip = match node.bind_ip_addrs.active() {
                    IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
                    IpAddr::V4(_unspecified) => xdp_config
                        .interface
                        .as_ref()
                        .and_then(|iface| master_ip_if_bonded(iface)),
                    _ => panic!("IPv6 not supported"),
                };
                let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port, src_ip)
                    .expect("failed to create xdp retransmitter");
                (Some(rtx), Some(sender))
            } else {
                (None, None)
            };

        let alpenglow_socket = if genesis_config.cluster_type == ClusterType::Testnet
            || genesis_config.cluster_type == ClusterType::Development
        {
            node.sockets.alpenglow
        } else {
            None
        };

        let tvu = Tvu::new(
            vote_account,
            authorized_voter_keypairs,
            &bank_forks,
            &cluster_info,
            TvuSockets {
                repair: node.sockets.repair.try_clone().unwrap(),
                retransmit: node.sockets.retransmit_sockets,
                fetch: node.sockets.tvu,
                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
                alpenglow: alpenglow_socket,
            },
            blockstore.clone(),
            ledger_signal_receiver,
            rpc_subscriptions.clone(),
            &poh_recorder,
            tower,
            config.tower_storage.clone(),
            &leader_schedule_cache,
            exit.clone(),
            block_commitment_cache,
            config.turbine_disabled.clone(),
            transaction_status_sender.clone(),
            entry_notification_sender.clone(),
            vote_tracker.clone(),
            retransmit_slots_sender,
            gossip_verified_vote_hash_receiver,
            verified_vote_receiver,
            replay_vote_sender.clone(),
            completed_data_sets_sender,
            bank_notification_sender.clone(),
            duplicate_confirmed_slots_receiver,
            TvuConfig {
                max_ledger_shreds: config.max_ledger_shreds,
                shred_version: node.info.shred_version(),
                repair_validators: config.repair_validators.clone(),
                repair_whitelist: config.repair_whitelist.clone(),
                wait_for_vote_to_start_leader,
                replay_forks_threads: config.replay_forks_threads,
                replay_transactions_threads: config.replay_transactions_threads,
                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
                xdp_sender: xdp_sender.clone(),
            },
            &max_slots,
            block_metadata_notifier,
            config.wait_to_vote_slot,
            Some(snapshot_controller.clone()),
            config.runtime_config.log_messages_bytes_limit,
            connection_cache_for_warmup,
            &prioritization_fee_cache,
            banking_tracer.clone(),
            turbine_quic_endpoint_sender.clone(),
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_quic_async_senders.repair_request_quic_sender,
            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            outstanding_repair_requests.clone(),
            cluster_slots.clone(),
            wen_restart_repair_slots.clone(),
            slot_status_notifier,
            vote_connection_cache,
        )
        .map_err(ValidatorError::Other)?;

        if in_wen_restart {
            info!("Waiting for wen_restart to finish");
            wait_for_wen_restart(WenRestartConfig {
                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
                last_vote,
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                bank_forks: bank_forks.clone(),
                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
                wait_for_supermajority_threshold_percent:
                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
                snapshot_controller: Some(snapshot_controller.clone()),
                abs_status: accounts_background_service.status().clone(),
                genesis_config_hash: genesis_config.hash(),
                exit: exit.clone(),
            })?;
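            // wen_restart has finished; surface `WenRestartFinished` as an
            // error so the caller tears this validator instance down.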
            return Err(ValidatorError::WenRestartFinished.into());
        }

        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
            ForwardingClientOption::ConnectionCache(connection_cache.clone())
        } else {
            let runtime_handle = tpu_client_next_runtime
                .as_ref()
                .map(TokioRuntime::handle)
                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
            ForwardingClientOption::TpuClientNext((
                Arc::as_ref(&identity_keypair),
                tpu_transactions_forwards_client
                    .take()
                    .expect("Socket should exist."),
                runtime_handle.clone(),
                cancel_tpu_client_next,
            ))
        };
        let tpu = Tpu::new_with_client(
            &cluster_info,
            &poh_recorder,
            transaction_recorder,
            entry_receiver,
            retransmit_slots_receiver,
            TpuSockets {
                transactions: node.sockets.tpu,
                transaction_forwards: node.sockets.tpu_forwards,
                vote: node.sockets.tpu_vote,
                broadcast: node.sockets.broadcast,
                transactions_quic: node.sockets.tpu_quic,
                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
                vote_quic: node.sockets.tpu_vote_quic,
                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
                vortexor_receivers: node.sockets.vortexor_receivers,
            },
            rpc_subscriptions.clone(),
            transaction_status_sender,
            entry_notification_sender,
            blockstore.clone(),
            &config.broadcast_stage_type,
            xdp_sender,
            exit,
            node.info.shred_version(),
            vote_tracker,
            bank_forks.clone(),
            verified_vote_sender,
            gossip_verified_vote_hash_sender,
            replay_vote_receiver,
            replay_vote_sender,
            bank_notification_sender,
            config.tpu_coalesce,
            duplicate_confirmed_slot_sender,
            forwarding_tpu_client,
            turbine_quic_endpoint_sender,
            &identity_keypair,
            config.runtime_config.log_messages_bytes_limit,
            &staked_nodes,
            config.staked_nodes_overrides.clone(),
            banking_tracer_channels,
            tracer_thread,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
            &prioritization_fee_cache,
            config.block_production_method.clone(),
            config.block_production_num_workers,
            config.transaction_struct.clone(),
            config.enable_block_production_forwarding,
            config.generator_config.clone(),
            key_notifiers.clone(),
        );

        datapoint_info!(
            "validator-new",
            ("id", id.to_string(), String),
            ("version", solana_version::version!(), String),
            ("cluster_type", genesis_config.cluster_type as u32, i64),
            ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
            ("waited_for_supermajority", waited_for_supermajority, bool),
            ("shred_version", shred_version as i64, i64),
        );

        *start_progress.write().unwrap() = ValidatorStartProgress::Running;
        if config.use_tpu_client_next {
            if let Some(json_rpc_service) = &json_rpc_service {
                key_notifiers.write().unwrap().add(
                    KeyUpdaterType::RpcService,
                    json_rpc_service.get_client_key_updater(),
                );
            }
        }

        *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
            bank_forks: bank_forks.clone(),
            cluster_info: cluster_info.clone(),
            vote_account: *vote_account,
            repair_whitelist: config.repair_whitelist.clone(),
            notifies: key_notifiers,
            repair_socket: Arc::new(node.sockets.repair),
            outstanding_repair_requests,
            cluster_slots,
            node: Some(Arc::new(node_multihoming)),
        });

        Ok(Self {
            stats_reporter_service,
            gossip_service,
            serve_repair_service,
            json_rpc_service,
            pubsub_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            transaction_status_service,
            entry_notifier_service,
            system_monitor_service,
            sample_performance_service,
            snapshot_packager_service,
            completed_data_sets_service,
            tpu,
            tvu,
            poh_service,
            poh_recorder,
            ip_echo_server,
            validator_exit: config.validator_exit.clone(),
            cluster_info,
            bank_forks,
            blockstore,
            geyser_plugin_service,
            blockstore_metric_report_service,
            accounts_background_service,
            turbine_quic_endpoint,
            turbine_quic_endpoint_runtime,
            turbine_quic_endpoint_join_handle,
            repair_quic_endpoints,
            repair_quic_endpoints_runtime,
            repair_quic_endpoints_join_handle,
            xdp_retransmitter,
            _tpu_client_next_runtime: tpu_client_next_runtime,
        })
    }
1736
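/// Signals all validator services to begin shutting down and drops the
/// blockstore signal senders so threads blocked on them observe the shutdown.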
1737 pub fn exit(&mut self) {
1739 self.validator_exit.write().unwrap().exit();
1740
1741 self.blockstore.drop_signal();
1743 }
1744
1745 pub fn close(mut self) {
1746 self.exit();
1747 self.join();
1748 }
1749
1750 fn print_node_info(node: &Node) {
1751 info!("{:?}", node.info);
1752 info!(
1753 "local gossip address: {}",
1754 node.sockets.gossip[0].local_addr().unwrap()
1755 );
1756 info!(
1757 "local broadcast address: {}",
1758 node.sockets
1759 .broadcast
1760 .first()
1761 .unwrap()
1762 .local_addr()
1763 .unwrap()
1764 );
1765 info!(
1766 "local repair address: {}",
1767 node.sockets.repair.local_addr().unwrap()
1768 );
1769 info!(
1770 "local retransmit address: {}",
1771 node.sockets.retransmit_sockets[0].local_addr().unwrap()
1772 );
1773 }
1774
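/// Blocks until all validator services have shut down; callers that want a
/// full teardown should call `exit` first (or use `close`, which does both).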
1775 pub fn join(self) {
1776 drop(self.bank_forks);
1777 drop(self.cluster_info);
1778
1779 self.poh_service.join().expect("poh_service");
1780 drop(self.poh_recorder);
1781
1782 if let Some(json_rpc_service) = self.json_rpc_service {
1783 json_rpc_service.join().expect("rpc_service");
1784 }
1785
1786 if let Some(pubsub_service) = self.pubsub_service {
1787 pubsub_service.join().expect("pubsub_service");
1788 }
1789
1790 if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1791 rpc_completed_slots_service
1792 .join()
1793 .expect("rpc_completed_slots_service");
1794 }
1795
1796 if let Some(optimistically_confirmed_bank_tracker) =
1797 self.optimistically_confirmed_bank_tracker
1798 {
1799 optimistically_confirmed_bank_tracker
1800 .join()
1801 .expect("optimistically_confirmed_bank_tracker");
1802 }
1803
1804 if let Some(transaction_status_service) = self.transaction_status_service {
1805 transaction_status_service
1806 .join()
1807 .expect("transaction_status_service");
1808 }
1809
1810 if let Some(system_monitor_service) = self.system_monitor_service {
1811 system_monitor_service
1812 .join()
1813 .expect("system_monitor_service");
1814 }
1815
1816 if let Some(sample_performance_service) = self.sample_performance_service {
1817 sample_performance_service
1818 .join()
1819 .expect("sample_performance_service");
1820 }
1821
1822 if let Some(entry_notifier_service) = self.entry_notifier_service {
1823 entry_notifier_service
1824 .join()
1825 .expect("entry_notifier_service");
1826 }
1827
1828 if let Some(s) = self.snapshot_packager_service {
1829 s.join().expect("snapshot_packager_service");
1830 }
1831
1832 self.gossip_service.join().expect("gossip_service");
1833 self.repair_quic_endpoints
1834 .iter()
1835 .flatten()
1836 .for_each(repair::quic_endpoint::close_quic_endpoint);
1837 self.serve_repair_service
1838 .join()
1839 .expect("serve_repair_service");
1840 if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1841 self.repair_quic_endpoints_runtime
1842 .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1843 .transpose()
1844 .unwrap();
1845 }
1846 self.stats_reporter_service
1847 .join()
1848 .expect("stats_reporter_service");
1849 self.blockstore_metric_report_service
1850 .join()
1851 .expect("ledger_metric_report_service");
1852 self.accounts_background_service
1853 .join()
1854 .expect("accounts_background_service");
1855 if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1856 solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1857 }
1858 if let Some(xdp_retransmitter) = self.xdp_retransmitter {
1859 xdp_retransmitter.join().expect("xdp_retransmitter");
1860 }
1861 self.tpu.join().expect("tpu");
1862 self.tvu.join().expect("tvu");
1863 if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1864 self.turbine_quic_endpoint_runtime
1865 .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1866 .transpose()
1867 .unwrap();
1868 }
1869 if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1870 completed_data_sets_service
1871 .join()
1872 .expect("completed_data_sets_service");
1873 }
1874 if let Some(ip_echo_server) = self.ip_echo_server {
1875 ip_echo_server.shutdown_background();
1876 }
1877
1878 if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1879 geyser_plugin_service.join().expect("geyser_plugin_service");
1880 }
1881 }
1882}
1883
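/// Returns true if the given vote account exists in `bank` and its vote state
/// contains at least one vote.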
1884fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1885 if let Some(account) = &bank.get_account(vote_account) {
1886 if let Some(vote_state) = vote_state::from(account) {
1887 return !vote_state.votes.is_empty();
1888 }
1889 }
1890 false
1891}
1892
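/// Benchmarks the local PoH hashing rate and compares it against the rate the
/// cluster requires, derived from the bank's hashes per tick and target slot
/// duration. Errors with `ValidatorError::PohTooSlow` if this node is slower.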
1893fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1894 let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1895 warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1896 return Ok(());
1897 };
1898
1899 let ticks_per_slot = bank.ticks_per_slot();
1900 let hashes_per_slot = hashes_per_tick * ticks_per_slot;
1901 let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
1902
1903 let hash_time = compute_hash_time(hash_samples);
1904 let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
1905
1906 let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
1907 let target_hashes_per_second =
1908 (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
1909
1910 info!(
1911 "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
1912 second {target_hashes_per_second}"
1913 );
1914 if my_hashes_per_second < target_hashes_per_second {
1915 return Err(ValidatorError::PohTooSlow {
1916 mine: my_hashes_per_second,
1917 target: target_hashes_per_second,
1918 });
1919 }
1920
1921 Ok(())
1922}
1923
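/// Detects a cluster restart (indirectly) by checking whether the
/// `--wait-for-supermajority` slot matches the given root slot.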
1924fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
1925 if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
1927 if wait_slot_for_supermajority == root_slot {
1928 return Some(wait_slot_for_supermajority);
1929 }
1930 }
1931
1932 None
1933}
1934
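/// Post-processes a tower restored from storage: the tower is discarded after
/// a hard fork or warp slot, rebuilt from bank forks if restoration failed,
/// and a mandatory restore failure is fatal if the vote account has already
/// been used for voting.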
1935fn post_process_restored_tower(
1936 restored_tower: crate::consensus::Result<Tower>,
1937 validator_identity: &Pubkey,
1938 vote_account: &Pubkey,
1939 config: &ValidatorConfig,
1940 bank_forks: &BankForks,
1941) -> Result<Tower, String> {
1942 let mut should_require_tower = config.require_tower;
1943
1944 let restored_tower = restored_tower.and_then(|tower| {
1945 let root_bank = bank_forks.root_bank();
1946 let slot_history = root_bank.get_slot_history();
1947 let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
1949
1950 if let Some(hard_fork_restart_slot) =
1951 maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
1952 {
1953 let message =
1957 format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
1958 datapoint_error!("tower_error", ("error", message, String),);
1959 error!("{message}");
1960
1961 should_require_tower = false;
1964 return Err(crate::consensus::TowerError::HardFork(
1965 hard_fork_restart_slot,
1966 ));
1967 }
1968
1969 if let Some(warp_slot) = config.warp_slot {
1970 should_require_tower = false;
1973 return Err(crate::consensus::TowerError::HardFork(warp_slot));
1974 }
1975
1976 tower
1977 });
1978
1979 let restored_tower = match restored_tower {
1980 Ok(tower) => tower,
1981 Err(err) => {
1982 let voting_has_been_active =
1983 active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
1984 if !err.is_file_missing() {
1985 datapoint_error!(
1986 "tower_error",
1987 ("error", format!("Unable to restore tower: {err}"), String),
1988 );
1989 }
1990 if should_require_tower && voting_has_been_active {
1991 return Err(format!(
1992 "Requested mandatory tower restore failed: {err}. And there is an existing \
1993 vote_account containing actual votes. Aborting due to possible conflicting \
1994 duplicate votes"
1995 ));
1996 }
1997 if err.is_file_missing() && !voting_has_been_active {
1998 info!(
2000 "Ignoring expected failed tower restore because this is the initial validator \
2001 start with the vote account..."
2002 );
2003 } else {
2004 error!(
2005 "Rebuilding a new tower from the latest vote account due to failed tower \
2006 restore: {err}"
2007 );
2008 }
2009
2010 Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
2011 }
2012 };
2013
2014 Ok(restored_tower)
2015}
2016
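/// Opens the genesis config from the ledger path, sanity-checks the leader
/// schedule epoch offset, and verifies the genesis hash against the expected
/// genesis hash when one is configured.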
2017fn load_genesis(
2018 config: &ValidatorConfig,
2019 ledger_path: &Path,
2020) -> Result<GenesisConfig, ValidatorError> {
2021 let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
2022 .map_err(ValidatorError::OpenGenesisConfig)?;
2023
2024 let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
2027 let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
2028 let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
2029 assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
2030
2031 let genesis_hash = genesis_config.hash();
2032 info!("genesis hash: {genesis_hash}");
2033
2034 if let Some(expected_genesis_hash) = config.expected_genesis_hash {
2035 if genesis_hash != expected_genesis_hash {
2036 return Err(ValidatorError::GenesisHashMismatch(
2037 genesis_hash,
2038 expected_genesis_hash,
2039 ));
2040 }
2041 }
2042
2043 Ok(genesis_config)
2044}
2045
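/// Opens the blockstore and loads bank forks from the latest snapshot and/or
/// the ledger, wiring up the new-shred signal plus the optional transaction
/// status and entry notifier services along the way.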
2046#[allow(clippy::type_complexity)]
2047fn load_blockstore(
2048 config: &ValidatorConfig,
2049 ledger_path: &Path,
2050 genesis_config: &GenesisConfig,
2051 exit: Arc<AtomicBool>,
2052 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2053 accounts_update_notifier: Option<AccountsUpdateNotifier>,
2054 transaction_notifier: Option<TransactionNotifierArc>,
2055 entry_notifier: Option<EntryNotifierArc>,
2056 dependency_tracker: Option<Arc<DependencyTracker>>,
2057) -> Result<
2058 (
2059 Arc<RwLock<BankForks>>,
2060 Arc<Blockstore>,
2061 Slot,
2062 Receiver<bool>,
2063 LeaderScheduleCache,
2064 Option<StartingSnapshotHashes>,
2065 TransactionHistoryServices,
2066 blockstore_processor::ProcessOptions,
2067 BlockstoreRootScan,
2068 DroppedSlotsReceiver,
2069 Option<EntryNotifierService>,
2070 ),
2071 String,
2072> {
2073 info!("loading ledger from {ledger_path:?}...");
2074 *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2075
2076 let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2077 .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2078
2079 let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2080 blockstore.add_new_shred_signal(ledger_signal_sender);
2081
2082 let original_blockstore_root = blockstore.max_root();
2085
2086 let blockstore = Arc::new(blockstore);
2087 let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2088 let halt_at_slot = config
2089 .halt_at_slot
2090 .or_else(|| blockstore.highest_slot().unwrap_or(None));
2091
2092 let process_options = blockstore_processor::ProcessOptions {
2093 run_verification: config.run_verification,
2094 halt_at_slot,
2095 new_hard_forks: config.new_hard_forks.clone(),
2096 debug_keys: config.debug_keys.clone(),
2097 accounts_db_config: config.accounts_db_config.clone(),
2098 accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2099 accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2100 runtime_config: config.runtime_config.clone(),
2101 use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2102 ..blockstore_processor::ProcessOptions::default()
2103 };
2104
2105 let enable_rpc_transaction_history =
2106 config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2107 let is_plugin_transaction_history_required = transaction_notifier.is_some();
2108 let transaction_history_services =
2109 if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2110 initialize_rpc_transaction_history_services(
2111 blockstore.clone(),
2112 exit.clone(),
2113 enable_rpc_transaction_history,
2114 config.rpc_config.enable_extended_tx_metadata_storage,
2115 transaction_notifier,
2116 dependency_tracker,
2117 )
2118 } else {
2119 TransactionHistoryServices::default()
2120 };
2121
2122 let entry_notifier_service = entry_notifier
2123 .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2124
2125 let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2126 bank_forks_utils::load_bank_forks(
2127 genesis_config,
2128 &blockstore,
2129 config.account_paths.clone(),
2130 &config.snapshot_config,
2131 &process_options,
2132 transaction_history_services
2133 .transaction_status_sender
2134 .as_ref(),
2135 entry_notifier_service
2136 .as_ref()
2137 .map(|service| service.sender()),
2138 accounts_update_notifier,
2139 exit,
2140 )
2141 .map_err(|err| err.to_string())?;
2142
2143 let pruned_banks_receiver =
2149 AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2150
2151 leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2152
2153 Ok((
2154 bank_forks,
2155 blockstore,
2156 original_blockstore_root,
2157 ledger_signal_receiver,
2158 leader_schedule_cache,
2159 starting_snapshot_hashes,
2160 transaction_history_services,
2161 process_options,
2162 blockstore_root_scan,
2163 pruned_banks_receiver,
2164 entry_notifier_service,
2165 ))
2166}
2167
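/// Bundles everything needed to replay the blockstore from the root slot and
/// restore the tower. Processing is deferred until `process` (or
/// `process_to_create_tower`) is called, and runs at most once.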
2168pub struct ProcessBlockStore<'a> {
2169 id: &'a Pubkey,
2170 vote_account: &'a Pubkey,
2171 start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2172 blockstore: &'a Blockstore,
2173 original_blockstore_root: Slot,
2174 bank_forks: &'a Arc<RwLock<BankForks>>,
2175 leader_schedule_cache: &'a LeaderScheduleCache,
2176 process_options: &'a blockstore_processor::ProcessOptions,
2177 transaction_status_sender: Option<&'a TransactionStatusSender>,
2178 entry_notification_sender: Option<&'a EntryNotifierSender>,
2179 blockstore_root_scan: Option<BlockstoreRootScan>,
2180 snapshot_controller: &'a SnapshotController,
2181 config: &'a ValidatorConfig,
2182 tower: Option<Tower>,
2183}
2184
2185impl<'a> ProcessBlockStore<'a> {
2186 #[allow(clippy::too_many_arguments)]
2187 fn new(
2188 id: &'a Pubkey,
2189 vote_account: &'a Pubkey,
2190 start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2191 blockstore: &'a Blockstore,
2192 original_blockstore_root: Slot,
2193 bank_forks: &'a Arc<RwLock<BankForks>>,
2194 leader_schedule_cache: &'a LeaderScheduleCache,
2195 process_options: &'a blockstore_processor::ProcessOptions,
2196 transaction_status_sender: Option<&'a TransactionStatusSender>,
2197 entry_notification_sender: Option<&'a EntryNotifierSender>,
2198 blockstore_root_scan: BlockstoreRootScan,
2199 snapshot_controller: &'a SnapshotController,
2200 config: &'a ValidatorConfig,
2201 ) -> Self {
2202 Self {
2203 id,
2204 vote_account,
2205 start_progress,
2206 blockstore,
2207 original_blockstore_root,
2208 bank_forks,
2209 leader_schedule_cache,
2210 process_options,
2211 transaction_status_sender,
2212 entry_notification_sender,
2213 blockstore_root_scan: Some(blockstore_root_scan),
2214 snapshot_controller,
2215 config,
2216 tower: None,
2217 }
2218 }
2219
2220 pub(crate) fn process(&mut self) -> Result<(), String> {
2221 if self.tower.is_none() {
2222 let previous_start_progress = *self.start_progress.read().unwrap();
2223 *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2224
2225 let exit = Arc::new(AtomicBool::new(false));
2226 if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2227 let bank_forks = self.bank_forks.clone();
2228 let exit = exit.clone();
2229 let start_progress = self.start_progress.clone();
2230
2231 let _ = Builder::new()
2232 .name("solRptLdgrStat".to_string())
2233 .spawn(move || {
2234 while !exit.load(Ordering::Relaxed) {
2235 let slot = bank_forks.read().unwrap().working_bank().slot();
2236 *start_progress.write().unwrap() =
2237 ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2238 sleep(Duration::from_secs(2));
2239 }
2240 })
2241 .unwrap();
2242 }
2243 blockstore_processor::process_blockstore_from_root(
2244 self.blockstore,
2245 self.bank_forks,
2246 self.leader_schedule_cache,
2247 self.process_options,
2248 self.transaction_status_sender,
2249 self.entry_notification_sender,
2250 Some(self.snapshot_controller),
2251 )
2252 .map_err(|err| {
2253 exit.store(true, Ordering::Relaxed);
2254 format!("Failed to load ledger: {err:?}")
2255 })?;
2256 exit.store(true, Ordering::Relaxed);
2257
2258 if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2259 blockstore_root_scan.join();
2260 }
2261
2262 self.tower = Some({
2263 let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2264 if let Ok(tower) = &restored_tower {
2265 reconcile_blockstore_roots_with_external_source(
2267 ExternalRootSource::Tower(tower.root()),
2268 self.blockstore,
2269 &mut self.original_blockstore_root,
2270 )
2271 .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2272 }
2273
2274 post_process_restored_tower(
2275 restored_tower,
2276 self.id,
2277 self.vote_account,
2278 self.config,
2279 &self.bank_forks.read().unwrap(),
2280 )?
2281 });
2282
2283 if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2284 self.config,
2285 self.bank_forks.read().unwrap().root(),
2286 ) {
2287 reconcile_blockstore_roots_with_external_source(
2290 ExternalRootSource::HardFork(hard_fork_restart_slot),
2291 self.blockstore,
2292 &mut self.original_blockstore_root,
2293 )
2294 .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2295 }
2296
2297 *self.start_progress.write().unwrap() = previous_start_progress;
2298 }
2299 Ok(())
2300 }
2301
2302 pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2303 self.process()?;
2304 Ok(self.tower.unwrap())
2305 }
2306}
2307
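/// If a warp slot is configured, warps the root bank forward to the requested
/// slot, roots it, and writes a full snapshot archive of the warped bank.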
2308fn maybe_warp_slot(
2309 config: &ValidatorConfig,
2310 process_blockstore: &mut ProcessBlockStore,
2311 ledger_path: &Path,
2312 bank_forks: &RwLock<BankForks>,
2313 leader_schedule_cache: &LeaderScheduleCache,
2314 snapshot_controller: &SnapshotController,
2315) -> Result<(), String> {
2316 if let Some(warp_slot) = config.warp_slot {
2317 let mut bank_forks = bank_forks.write().unwrap();
2318
2319 let working_bank = bank_forks.working_bank();
2320
2321 if warp_slot <= working_bank.slot() {
2322 return Err(format!(
2323 "warp slot ({}) cannot be less than the working bank slot ({})",
2324 warp_slot,
2325 working_bank.slot()
2326 ));
2327 }
2328 info!("warping to slot {warp_slot}");
2329
2330 let root_bank = bank_forks.root_bank();
2331
2332 root_bank.squash();
2336 root_bank.force_flush_accounts_cache();
2337
2338 bank_forks.insert(Bank::warp_from_parent(
2339 root_bank,
2340 &Pubkey::default(),
2341 warp_slot,
2342 ));
2343 bank_forks
2344 .set_root(warp_slot, Some(snapshot_controller), Some(warp_slot))
2345 .map_err(|err| err.to_string())?;
2346 leader_schedule_cache.set_root(&bank_forks.root_bank());
2347
2348 let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2349 ledger_path,
2350 &bank_forks.root_bank(),
2351 None,
2352 &config.snapshot_config.full_snapshot_archives_dir,
2353 &config.snapshot_config.incremental_snapshot_archives_dir,
2354 config.snapshot_config.archive_format,
2355 ) {
2356 Ok(archive_info) => archive_info,
2357 Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2358 };
2359 info!(
2360 "created snapshot: {}",
2361 full_snapshot_archive_info.path().display()
2362 );
2363
2364 drop(bank_forks);
2365 process_blockstore.process()?;
2368 }
2369 Ok(())
2370}
2371
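/// Decides whether the blockstore should be scanned and cleaned for shreds
/// with an incorrect shred version, returning the slot the scan should start
/// from. This applies after a cluster restart with a hard fork, or when the
/// latest hard fork slot falls inside the range of slots in the blockstore.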
2372fn should_cleanup_blockstore_incorrect_shred_versions(
2375 config: &ValidatorConfig,
2376 blockstore: &Blockstore,
2377 root_slot: Slot,
2378 hard_forks: &HardForks,
2379) -> Result<Option<Slot>, BlockstoreError> {
2380 let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2382 if maybe_cluster_restart_slot.is_some() {
2383 return Ok(Some(root_slot + 1));
2384 }
2385
2386 let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2388 return Ok(None);
2389 };
2390
2391 let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2393 return Ok(None);
2394 };
2395 let blockstore_min_slot = blockstore.lowest_slot();
2396 info!(
2397 "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2398 latest hard fork is {latest_hard_fork}"
2399 );
2400
2401 if latest_hard_fork < blockstore_min_slot {
2402 Ok(None)
2410 } else if latest_hard_fork < blockstore_max_slot {
2411 Ok(Some(latest_hard_fork + 1))
2422 } else {
2423 Ok(None)
2431 }
2432}
2433
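/// Scans data shreds from `start_slot` onward and returns the first shred
/// version that differs from `expected_shred_version`, giving up after a
/// fixed timeout.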
2434fn scan_blockstore_for_incorrect_shred_version(
2437 blockstore: &Blockstore,
2438 start_slot: Slot,
2439 expected_shred_version: u16,
2440) -> Result<Option<u16>, BlockstoreError> {
2441 const TIMEOUT: Duration = Duration::from_secs(60);
2442 let timer = Instant::now();
2443 let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2445
2446 info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2447 for (slot, _meta) in slot_meta_iterator {
2448 let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2449 for shred in &shreds {
2450 if shred.version() != expected_shred_version {
2451 return Ok(Some(shred.version()));
2452 }
2453 }
2454 if timer.elapsed() > TIMEOUT {
2455 info!("Didn't find incorrect shreds after 60 seconds, aborting");
2456 break;
2457 }
2458 }
2459 Ok(None)
2460}
2461
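/// If any shred with an incorrect version is found from `start_slot` onward,
/// backs the affected slots up into a separate directory (best effort) and
/// then purges them from the primary blockstore.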
2462fn cleanup_blockstore_incorrect_shred_versions(
2465 blockstore: &Blockstore,
2466 config: &ValidatorConfig,
2467 start_slot: Slot,
2468 expected_shred_version: u16,
2469) -> Result<(), BlockstoreError> {
2470 let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2471 blockstore,
2472 start_slot,
2473 expected_shred_version,
2474 )?;
2475 let Some(incorrect_shred_version) = incorrect_shred_version else {
2476 info!("Only shreds with the correct version were found in the blockstore");
2477 return Ok(());
2478 };
2479
2480 let end_slot = blockstore.highest_slot()?.unwrap();
2482
2483 let backup_folder = format!(
2486 "{BLOCKSTORE_DIRECTORY_ROCKS_LEVEL}_backup_{incorrect_shred_version}_{start_slot}_{end_slot}"
2487 );
2488 match Blockstore::open_with_options(
2489 &blockstore.ledger_path().join(backup_folder),
2490 config.blockstore_options.clone(),
2491 ) {
2492 Ok(backup_blockstore) => {
2493 info!("Backing up slots from {start_slot} to {end_slot}");
2494 let mut timer = Measure::start("blockstore backup");
2495
2496 const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2497 let mut print_timer = Instant::now();
2498 let mut num_slots_copied = 0;
2499 let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2500 for (slot, _meta) in slot_meta_iterator {
2501 let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2502 let shreds = shreds.into_iter().map(Cow::Owned);
2503 let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
2504 num_slots_copied += 1;
2505
2506 if print_timer.elapsed() > PRINT_INTERVAL {
2507 info!("Backed up {num_slots_copied} slots thus far");
2508 print_timer = Instant::now();
2509 }
2510 }
2511
2512 timer.stop();
2513 info!("Backing up slots done. {timer}");
2514 }
2515 Err(err) => {
2516 warn!("Unable to backup shreds with incorrect shred version: {err}");
2517 }
2518 }
2519
2520 info!("Purging slots {start_slot} to {end_slot} from blockstore");
2521 let mut timer = Measure::start("blockstore purge");
2522 blockstore.purge_from_next_slots(start_slot, end_slot);
2523 blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2524 timer.stop();
2525 info!("Purging slots done. {timer}");
2526
2527 Ok(())
2528}
2529
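/// Creates the transaction status channel and service used to record
/// transaction history for RPC and/or geyser plugin consumers.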
2530fn initialize_rpc_transaction_history_services(
2531 blockstore: Arc<Blockstore>,
2532 exit: Arc<AtomicBool>,
2533 enable_rpc_transaction_history: bool,
2534 enable_extended_tx_metadata_storage: bool,
2535 transaction_notifier: Option<TransactionNotifierArc>,
2536 dependency_tracker: Option<Arc<DependencyTracker>>,
2537) -> TransactionHistoryServices {
2538 let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2539 let (transaction_status_sender, transaction_status_receiver) = unbounded();
2540 let transaction_status_sender = Some(TransactionStatusSender {
2541 sender: transaction_status_sender,
2542 dependency_tracker: dependency_tracker.clone(),
2543 });
2544 let transaction_status_service = Some(TransactionStatusService::new(
2545 transaction_status_receiver,
2546 max_complete_transaction_status_slot.clone(),
2547 enable_rpc_transaction_history,
2548 transaction_notifier,
2549 blockstore.clone(),
2550 enable_extended_tx_metadata_storage,
2551 dependency_tracker,
2552 exit.clone(),
2553 ));
2554
2555 TransactionHistoryServices {
2556 transaction_status_sender,
2557 transaction_status_service,
2558 max_complete_transaction_status_slot,
2559 }
2560}
2561
2562#[derive(Error, Debug)]
2563pub enum ValidatorError {
2564 #[error("bank hash mismatch: actual={0}, expected={1}")]
2565 BankHashMismatch(Hash, Hash),
2566
2567 #[error("blockstore error: {0}")]
2568 Blockstore(#[source] BlockstoreError),
2569
2570 #[error("genesis hash mismatch: actual={0}, expected={1}")]
2571 GenesisHashMismatch(Hash, Hash),
2572
2573 #[error(
2574 "ledger does not have enough data to wait for supermajority: current slot={0}, needed \
2575 slot={1}"
2576 )]
2577 NotEnoughLedgerData(Slot, Slot),
2578
2579 #[error("failed to open genesis: {0}")]
2580 OpenGenesisConfig(#[source] OpenGenesisConfigError),
2581
2582 #[error("{0}")]
2583 Other(String),
2584
2585 #[error(
2586 "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2587 )]
2588 PohTooSlow { mine: u64, target: u64 },
2589
2590 #[error("shred version mismatch: actual {actual}, expected {expected}")]
2591 ShredVersionMismatch { actual: u16, expected: u16 },
2592
2593 #[error(transparent)]
2594 TraceError(#[from] TraceError),
2595
2596 #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2597 WenRestartFinished,
2598}
2599
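/// If `--wait-for-supermajority` is configured, replays the blockstore,
/// verifies the expected bank hash, and then blocks until at least
/// `WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT` of activated stake is visible
/// in gossip. Returns whether the validator actually waited.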
2600fn wait_for_supermajority(
2607 config: &ValidatorConfig,
2608 process_blockstore: Option<&mut ProcessBlockStore>,
2609 bank_forks: &RwLock<BankForks>,
2610 cluster_info: &ClusterInfo,
2611 rpc_override_health_check: Arc<AtomicBool>,
2612 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2613) -> Result<bool, ValidatorError> {
2614 match config.wait_for_supermajority {
2615 None => Ok(false),
2616 Some(wait_for_supermajority_slot) => {
2617 if let Some(process_blockstore) = process_blockstore {
2618 process_blockstore
2619 .process()
2620 .map_err(ValidatorError::Other)?;
2621 }
2622
2623 let bank = bank_forks.read().unwrap().working_bank();
2624 match wait_for_supermajority_slot.cmp(&bank.slot()) {
2625 std::cmp::Ordering::Less => return Ok(false),
2626 std::cmp::Ordering::Greater => {
2627 return Err(ValidatorError::NotEnoughLedgerData(
2628 bank.slot(),
2629 wait_for_supermajority_slot,
2630 ));
2631 }
2632 _ => {}
2633 }
2634
2635 if let Some(expected_bank_hash) = config.expected_bank_hash {
2636 if bank.hash() != expected_bank_hash {
2637 return Err(ValidatorError::BankHashMismatch(
2638 bank.hash(),
2639 expected_bank_hash,
2640 ));
2641 }
2642 }
2643
2644 for i in 1.. {
2645 let logging = i % 10 == 1;
2646 if logging {
2647 info!(
2648 "Waiting for {}% of activated stake at slot {} to be in gossip...",
2649 WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
2650 bank.slot()
2651 );
2652 }
2653
2654 let gossip_stake_percent =
2655 get_stake_percent_in_gossip(&bank, cluster_info, logging);
2656
2657 *start_progress.write().unwrap() =
2658 ValidatorStartProgress::WaitingForSupermajority {
2659 slot: wait_for_supermajority_slot,
2660 gossip_stake_percent,
2661 };
2662
2663 if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
2664 info!(
2665 "Supermajority reached, {gossip_stake_percent}% active stake detected, \
2666 starting up now.",
2667 );
2668 break;
2669 }
2670 rpc_override_health_check.store(true, Ordering::Relaxed);
2674 sleep(Duration::from_secs(1));
2675 }
2676 rpc_override_health_check.store(false, Ordering::Relaxed);
2677 Ok(true)
2678 }
2679 }
2680}
2681
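/// Returns the percentage of activated stake whose validators are visible in
/// gossip with a matching shred version, optionally logging the offline and
/// wrong-shred-version nodes by stake weight.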
2682fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2684 let mut online_stake = 0;
2685 let mut wrong_shred_stake = 0;
2686 let mut wrong_shred_nodes = vec![];
2687 let mut offline_stake = 0;
2688 let mut offline_nodes = vec![];
2689
2690 let mut total_activated_stake = 0;
2691 let now = timestamp();
2692 let peers: HashMap<_, _> = cluster_info
2696 .tvu_peers(ContactInfo::clone)
2697 .into_iter()
2698 .filter(|node| {
2699 let age = now.saturating_sub(node.wallclock());
2700 age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2702 })
2703 .map(|node| (*node.pubkey(), node))
2704 .collect();
2705 let my_shred_version = cluster_info.my_shred_version();
2706 let my_id = cluster_info.id();
2707
2708 for (activated_stake, vote_account) in bank.vote_accounts().values() {
2709 let activated_stake = *activated_stake;
2710 total_activated_stake += activated_stake;
2711
2712 if activated_stake == 0 {
2713 continue;
2714 }
2715 let vote_state_node_pubkey = *vote_account.node_pubkey();
2716
2717 if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2718 if peer.shred_version() == my_shred_version {
2719 trace!(
2720 "observed {vote_state_node_pubkey} in gossip, \
2721 (activated_stake={activated_stake})"
2722 );
2723 online_stake += activated_stake;
2724 } else {
2725 wrong_shred_stake += activated_stake;
2726 wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2727 }
2728 } else if vote_state_node_pubkey == my_id {
2729 online_stake += activated_stake; // this validator is online
2730 } else {
2731 offline_stake += activated_stake;
2732 offline_nodes.push((activated_stake, vote_state_node_pubkey));
2733 }
2734 }
2735
2736 let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2737 if log {
2738 info!("{online_stake_percentage:.3}% of active stake visible in gossip");
2739
2740 if !wrong_shred_nodes.is_empty() {
2741 info!(
2742 "{:.3}% of active stake has the wrong shred version in gossip",
2743 (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2744 );
2745 wrong_shred_nodes.sort_by(|a, b| b.0.cmp(&a.0)); // sort by descending stake
2746 for (stake, identity) in wrong_shred_nodes {
2747 info!(
2748 " {:.3}% - {}",
2749 (stake as f64 / total_activated_stake as f64) * 100.,
2750 identity
2751 );
2752 }
2753 }
2754
2755 if !offline_nodes.is_empty() {
2756 info!(
2757 "{:.3}% of active stake is not visible in gossip",
2758 (offline_stake as f64 / total_activated_stake as f64) * 100.
2759 );
2760 offline_nodes.sort_by(|a, b| b.0.cmp(&a.0)); // sort by descending stake
2761 for (stake, identity) in offline_nodes {
2762 info!(
2763 " {:.3}% - {}",
2764 (stake as f64 / total_activated_stake as f64) * 100.,
2765 identity
2766 );
2767 }
2768 }
2769 }
2770
2771 online_stake_percentage as u64
2772}
2773
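/// Moves the contents of the configured account (and shrink) paths aside and
/// deletes them asynchronously, so startup does not block on filesystem
/// cleanup.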
2774fn cleanup_accounts_paths(config: &ValidatorConfig) {
2775 for account_path in &config.account_paths {
2776 move_and_async_delete_path_contents(account_path);
2777 }
2778 if let Some(shrink_paths) = config
2779 .accounts_db_config
2780 .as_ref()
2781 .and_then(|config| config.shrink_paths.as_ref())
2782 {
2783 for shrink_path in shrink_paths {
2784 move_and_async_delete_path_contents(shrink_path);
2785 }
2786 }
2787}
2788
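/// Returns true if the snapshot config is valid: when snapshot generation is
/// enabled, the full snapshot interval must be set and strictly greater than
/// the incremental snapshot interval (which may be disabled).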
2789pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
2790 if !snapshot_config.should_generate_snapshots() {
2792 return true;
2793 }
2794
2795 let SnapshotInterval::Slots(full_snapshot_interval_slots) =
2796 snapshot_config.full_snapshot_archive_interval
2797 else {
2798 return false;
2800 };
2801
2802 match snapshot_config.incremental_snapshot_archive_interval {
2803 SnapshotInterval::Disabled => true,
2804 SnapshotInterval::Slots(incremental_snapshot_interval_slots) => {
2805 full_snapshot_interval_slots > incremental_snapshot_interval_slots
2806 }
2807 }
2808}
2809
2810#[cfg(test)]
2811mod tests {
2812 use {
2813 super::*,
2814 crossbeam_channel::{bounded, RecvTimeoutError},
2815 solana_entry::entry,
2816 solana_genesis_config::create_genesis_config,
2817 solana_gossip::contact_info::ContactInfo,
2818 solana_ledger::{
2819 blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2820 get_tmp_ledger_path_auto_delete,
2821 },
2822 solana_poh_config::PohConfig,
2823 solana_sha256_hasher::hash,
2824 solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2825 std::{fs::remove_dir_all, num::NonZeroU64, thread, time::Duration},
2826 };
2827
2828 #[test]
2829 fn validator_exit() {
2830 solana_logger::setup();
2831 let leader_keypair = Keypair::new();
2832 let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
2833
2834 let validator_keypair = Keypair::new();
2835 let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
2836 let genesis_config =
2837 create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
2838 .genesis_config;
2839 let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
2840
2841 let voting_keypair = Arc::new(Keypair::new());
2842 let config = ValidatorConfig {
2843 rpc_addrs: Some((
2844 validator_node.info.rpc().unwrap(),
2845 validator_node.info.rpc_pubsub().unwrap(),
2846 )),
2847 ..ValidatorConfig::default_for_test()
2848 };
2849 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
2850 let validator = Validator::new(
2851 validator_node,
2852 Arc::new(validator_keypair),
2853 &validator_ledger_path,
2854 &voting_keypair.pubkey(),
2855 Arc::new(RwLock::new(vec![voting_keypair])),
2856 vec![leader_node.info],
2857 &config,
2858 true, // should_check_duplicate_instance
2859 None, // rpc_to_plugin_manager_receiver
2860 start_progress.clone(),
2861 SocketAddrSpace::Unspecified,
2862 ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
2863 Arc::new(RwLock::new(None)),
2864 )
2865 .expect("assume successful validator start");
2866 assert_eq!(
2867 *start_progress.read().unwrap(),
2868 ValidatorStartProgress::Running
2869 );
2870 validator.close();
2871 remove_dir_all(validator_ledger_path).unwrap();
2872 }
2873
2874 #[test]
2875 fn test_should_cleanup_blockstore_incorrect_shred_versions() {
2876 solana_logger::setup();
2877
2878 let ledger_path = get_tmp_ledger_path_auto_delete!();
2879 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2880
2881 let mut validator_config = ValidatorConfig::default_for_test();
2882 let mut hard_forks = HardForks::default();
2883 let mut root_slot;
2884
2885 root_slot = 10;
2887 validator_config.wait_for_supermajority = Some(root_slot);
2888 assert_eq!(
2889 should_cleanup_blockstore_incorrect_shred_versions(
2890 &validator_config,
2891 &blockstore,
2892 root_slot,
2893 &hard_forks
2894 )
2895 .unwrap(),
2896 Some(root_slot + 1)
2897 );
2898
2899 root_slot = 15;
2902 assert_eq!(
2903 should_cleanup_blockstore_incorrect_shred_versions(
2904 &validator_config,
2905 &blockstore,
2906 root_slot,
2907 &hard_forks
2908 )
2909 .unwrap(),
2910 None,
2911 );
2912
2913 hard_forks.register(10);
2916 assert_eq!(
2917 should_cleanup_blockstore_incorrect_shred_versions(
2918 &validator_config,
2919 &blockstore,
2920 root_slot,
2921 &hard_forks
2922 )
2923 .unwrap(),
2924 None,
2925 );
2926
2927 let entries = entry::create_ticks(1, 0, Hash::default());
2929 for i in 20..35 {
2930 let shreds = blockstore::entries_to_test_shreds(
2931 &entries,
2932 i, // slot
2933 i - 1, // parent_slot
2934 true, // is_full_slot
2935 1, // version
2936 );
2937 blockstore.insert_shreds(shreds, None, true).unwrap();
2938 }
2939
2940 assert_eq!(
2942 should_cleanup_blockstore_incorrect_shred_versions(
2943 &validator_config,
2944 &blockstore,
2945 root_slot,
2946 &hard_forks
2947 )
2948 .unwrap(),
2949 None,
2950 );
2951
2952 root_slot = 25;
2955 hard_forks.register(root_slot);
2956 validator_config.wait_for_supermajority = Some(root_slot);
2957 assert_eq!(
2958 should_cleanup_blockstore_incorrect_shred_versions(
2959 &validator_config,
2960 &blockstore,
2961 root_slot,
2962 &hard_forks
2963 )
2964 .unwrap(),
2965 Some(root_slot + 1),
2966 );
2967 validator_config.wait_for_supermajority = None;
2968 assert_eq!(
2969 should_cleanup_blockstore_incorrect_shred_versions(
2970 &validator_config,
2971 &blockstore,
2972 root_slot,
2973 &hard_forks
2974 )
2975 .unwrap(),
2976 Some(root_slot + 1),
2977 );
2978
2979 root_slot = 30;
2982 let latest_hard_fork = hard_forks.iter().last().unwrap().0;
2983 assert_eq!(
2984 should_cleanup_blockstore_incorrect_shred_versions(
2985 &validator_config,
2986 &blockstore,
2987 root_slot,
2988 &hard_forks
2989 )
2990 .unwrap(),
2991 Some(latest_hard_fork + 1),
2992 );
2993
2994 blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
2997 assert_eq!(
2998 should_cleanup_blockstore_incorrect_shred_versions(
2999 &validator_config,
3000 &blockstore,
3001 root_slot,
3002 &hard_forks
3003 )
3004 .unwrap(),
3005 None,
3006 );
3007 }
3008
3009 #[test]
3010 fn test_cleanup_blockstore_incorrect_shred_versions() {
3011 solana_logger::setup();
3012
3013 let validator_config = ValidatorConfig::default_for_test();
3014 let ledger_path = get_tmp_ledger_path_auto_delete!();
3015 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
3016
3017 let entries = entry::create_ticks(1, 0, Hash::default());
3018 for i in 1..10 {
3019 let shreds = blockstore::entries_to_test_shreds(
3020 &entries,
3021 i, // slot
3022 i - 1, // parent_slot
3023 true, // is_full_slot
3024 1, // version
3025 );
3026 blockstore.insert_shreds(shreds, None, true).unwrap();
3027 }
3028
3029 cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
3031 assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3033 for i in 5..10 {
3034 assert!(blockstore
3035 .get_data_shreds_for_slot(i, 0)
3036 .unwrap()
3037 .is_empty());
3038 }
3039 }
3040
3041 #[test]
3042 fn validator_parallel_exit() {
3043 let leader_keypair = Keypair::new();
3044 let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
3045 let genesis_config =
3046 create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
3047 .genesis_config;
3048
3049 let mut ledger_paths = vec![];
3050 let mut validators: Vec<Validator> = (0..2)
3051 .map(|_| {
3052 let validator_keypair = Keypair::new();
3053 let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
3054 let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
3055 ledger_paths.push(validator_ledger_path.clone());
3056 let vote_account_keypair = Keypair::new();
3057 let config = ValidatorConfig {
3058 rpc_addrs: Some((
3059 validator_node.info.rpc().unwrap(),
3060 validator_node.info.rpc_pubsub().unwrap(),
3061 )),
3062 ..ValidatorConfig::default_for_test()
3063 };
3064 Validator::new(
3065 validator_node,
3066 Arc::new(validator_keypair),
3067 &validator_ledger_path,
3068 &vote_account_keypair.pubkey(),
3069 Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
3070 vec![leader_node.info.clone()],
3071 &config,
3072 true, // should_check_duplicate_instance
3073 None, // rpc_to_plugin_manager_receiver
3074 Arc::new(RwLock::new(ValidatorStartProgress::default())),
3075 SocketAddrSpace::Unspecified,
3076 ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
3077 Arc::new(RwLock::new(None)),
3078 )
3079 .expect("assume successful validator start")
3080 })
3081 .collect();
3082
3083 validators.iter_mut().for_each(|v| v.exit());
3085
3086 let (sender, receiver) = bounded(0);
3088 let _ = thread::spawn(move || {
3089 validators.into_iter().for_each(|validator| {
3090 validator.join();
3091 });
3092 sender.send(()).unwrap();
3093 });
3094
3095 let timeout = Duration::from_secs(60);
3096 if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
3097 panic!("timeout for shutting down validators",);
3098 }
3099
3100 for path in ledger_paths {
3101 remove_dir_all(path).unwrap();
3102 }
3103 }
3104
3105 #[test]
3106 fn test_wait_for_supermajority() {
3107 solana_logger::setup();
3108 let node_keypair = Arc::new(Keypair::new());
3109 let cluster_info = ClusterInfo::new(
3110 ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3111 node_keypair,
3112 SocketAddrSpace::Unspecified,
3113 );
3114
3115 let (genesis_config, _mint_keypair) = create_genesis_config(1);
3116 let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3117 let mut config = ValidatorConfig::default_for_test();
3118 let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3119 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3120
3121 assert!(!wait_for_supermajority(
3122 &config,
3123 None,
3124 &bank_forks,
3125 &cluster_info,
3126 rpc_override_health_check.clone(),
3127 &start_progress,
3128 )
3129 .unwrap());
3130
3131 config.wait_for_supermajority = Some(1);
3133 assert!(matches!(
3134 wait_for_supermajority(
3135 &config,
3136 None,
3137 &bank_forks,
3138 &cluster_info,
3139 rpc_override_health_check.clone(),
3140 &start_progress,
3141 ),
3142 Err(ValidatorError::NotEnoughLedgerData(_, _)),
3143 ));
3144
3145 let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3147 bank_forks.read().unwrap().root_bank(),
3148 &Pubkey::default(),
3149 1,
3150 ));
3151 config.wait_for_supermajority = Some(0);
3152 assert!(!wait_for_supermajority(
3153 &config,
3154 None,
3155 &bank_forks,
3156 &cluster_info,
3157 rpc_override_health_check.clone(),
3158 &start_progress,
3159 )
3160 .unwrap());
3161
3162 config.wait_for_supermajority = Some(1);
3164 config.expected_bank_hash = Some(hash(&[1]));
3165 assert!(matches!(
3166 wait_for_supermajority(
3167 &config,
3168 None,
3169 &bank_forks,
3170 &cluster_info,
3171 rpc_override_health_check,
3172 &start_progress,
3173 ),
3174 Err(ValidatorError::BankHashMismatch(_, _)),
3175 ));
3176 }
3177
3178 #[test]
3179 fn test_is_snapshot_config_valid() {
3180 fn new_snapshot_config(
3181 full_snapshot_archive_interval_slots: Slot,
3182 incremental_snapshot_archive_interval_slots: Slot,
3183 ) -> SnapshotConfig {
3184 SnapshotConfig {
3185 full_snapshot_archive_interval: SnapshotInterval::Slots(
3186 NonZeroU64::new(full_snapshot_archive_interval_slots).unwrap(),
3187 ),
3188 incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3189 NonZeroU64::new(incremental_snapshot_archive_interval_slots).unwrap(),
3190 ),
3191 ..SnapshotConfig::default()
3192 }
3193 }
3194
3195 assert!(is_snapshot_config_valid(&SnapshotConfig::default()));
3197
3198 assert!(is_snapshot_config_valid(&SnapshotConfig {
3200 incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3201 ..SnapshotConfig::default()
3202 }));
3203
3204 assert!(!is_snapshot_config_valid(&SnapshotConfig {
3206 full_snapshot_archive_interval: SnapshotInterval::Disabled,
3207 ..SnapshotConfig::default()
3208 }));
3209
3210 assert!(is_snapshot_config_valid(&new_snapshot_config(400, 200)));
3212 assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
3213 assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
3214 assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));
3215
3216 assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
3218 assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
3219 assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));
3220
3221 assert!(is_snapshot_config_valid(&SnapshotConfig::new_disabled()));
3223 assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
3224 assert!(is_snapshot_config_valid(&SnapshotConfig {
3225 full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(37).unwrap()),
3226 incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3227 NonZeroU64::new(41).unwrap()
3228 ),
3229 ..SnapshotConfig::new_load_only()
3230 }));
3231 assert!(is_snapshot_config_valid(&SnapshotConfig {
3232 full_snapshot_archive_interval: SnapshotInterval::Disabled,
3233 incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3234 ..SnapshotConfig::new_load_only()
3235 }));
3236 }
3237
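/// Target tick duration implied by the default slot timing constants
/// (400ms per slot / 64 ticks per slot = 6250us per tick).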
3238 fn target_tick_duration() -> Duration {
3239 let target_tick_duration_us =
3247 solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
3248 assert_eq!(target_tick_duration_us, 6250);
3249 Duration::from_micros(target_tick_duration_us)
3250 }
3251
3252 #[test]
3253 fn test_poh_speed() {
3254 solana_logger::setup();
3255 let poh_config = PohConfig {
3256 target_tick_duration: target_tick_duration(),
3257 hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
3259 ..PohConfig::default()
3260 };
3261 let genesis_config = GenesisConfig {
3262 poh_config,
3263 ..GenesisConfig::default()
3264 };
3265 let bank = Bank::new_for_tests(&genesis_config);
3266 assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3267 }
3268
3269 #[test]
3270 fn test_poh_speed_no_hashes_per_tick() {
3271 solana_logger::setup();
3272 let poh_config = PohConfig {
3273 target_tick_duration: target_tick_duration(),
3274 hashes_per_tick: None,
3275 ..PohConfig::default()
3276 };
3277 let genesis_config = GenesisConfig {
3278 poh_config,
3279 ..GenesisConfig::default()
3280 };
3281 let bank = Bank::new_for_tests(&genesis_config);
3282 check_poh_speed(&bank, Some(10_000)).unwrap();
3283 }
3284}