pub use solana_perf::report_target_features;
use {
    crate::{
        accounts_hash_verifier::AccountsHashVerifier,
        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            serve_repair::ServeRepair,
            serve_repair_service::ServeRepairService,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService},
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{ForwardingClientOption, Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    log::{info, warn},
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        hardened_unpack::{
            open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
        },
        utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
    },
    solana_client::connection_cache::{ConnectionCache, Protocol},
    solana_clock::Slot,
    solana_entry::poh::compute_hash_time,
    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
    solana_genesis_config::{ClusterType, GenesisConfig},
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
    },
    solana_hard_forks::HardForks,
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
    solana_poh::{
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
        transaction_recorder::TransactionRecorder,
    },
    solana_pubkey::Pubkey,
    solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
    solana_rpc::{
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::{ClientOption, JsonRpcService, JsonRpcServiceConfig},
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
            PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_archive_info::SnapshotArchiveInfoGetter,
        snapshot_bank_utils::{self, DISABLED_SNAPSHOT_ARCHIVE_INTERVAL},
        snapshot_config::SnapshotConfig,
        snapshot_controller::SnapshotController,
        snapshot_hash::StartingSnapshotHashes,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
    },
    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
    solana_shred_version::compute_shred_version,
    solana_signer::Signer,
    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
    solana_time_utils::timestamp,
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{self, broadcast_stage::BroadcastStageType, xdp::XdpConfig},
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_validator_exit::Exit,
    solana_vote_program::vote_state,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::SocketAddr,
        num::NonZeroUsize,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
    tokio_util::sync::CancellationToken,
};

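/// Maximum number of completed-data-set notifications that may be buffered on
/// the channel feeding the `CompletedDataSetsService`.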
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
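/// Percentage of stake that must be visible in gossip before a validator
/// configured with `wait_for_supermajority` proceeds with start-up.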
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

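/// Scheduling method used when verifying (replaying) ledger entries,
/// selectable on the CLI via `cli_names()`.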
#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for verifying ledger entries"
    }
}

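/// Scheduling method used when producing ledger entries, selectable on the CLI.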
#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockProductionMethod {
    CentralScheduler,
    #[default]
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for producing ledger entries"
    }
}

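/// Internal transaction structure/representation, selectable on the CLI.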
#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum TransactionStructure {
    #[default]
    Sdk,
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch internal transaction structure/representation"
    }
}

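/// Configuration for the TPU's transaction generator, used by test and
/// benchmarking setups (see `ValidatorConfig::generator_config`).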
#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

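/// Full runtime configuration for a [`Validator`]; `default()` yields a
/// load-only snapshot profile, `default_for_test()` a profile for local tests.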
pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>,
    pub repair_validators: Option<HashSet<Pubkey>>,
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    pub gossip_validators: Option<HashSet<Pubkey>>,
    pub max_genesis_archive_unpacked_size: u64,
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: SendTransactionServiceConfig,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: Option<AccountsDbConfig>,
    pub warp_slot: Option<Slot>,
    pub accounts_db_test_hash_calculation: bool,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub tpu_coalesce: Duration,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub transaction_struct: TransactionStructure,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub delay_leader_block_for_pending_fork: bool,
    pub use_tpu_client_next: bool,
    pub retransmit_xdp: Option<XdpConfig>,
}

impl Default for ValidatorConfig {
    fn default() -> Self {
        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: SendTransactionServiceConfig::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_test_hash_calculation: false,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            tpu_coalesce: DEFAULT_TPU_COALESCE,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            validator_exit_backpressure: HashMap::default(),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: None,
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            transaction_struct: TransactionStructure::default(),
            enable_block_production_forwarding: false,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            delay_leader_block_for_pending_fork: false,
            use_tpu_client_next: true,
            retransmit_xdp: None,
        }
    }
}

impl ValidatorConfig {
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");

        Self {
            accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
            blockstore_options: BlockstoreOptions::default_for_tests(),
            rpc_config: JsonRpcConfig::default_for_test(),
            block_production_method: BlockProductionMethod::default(),
            enable_block_production_forwarding: true, // enable forwarding by default for tests
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            ..Self::default()
        }
    }

    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}

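/// Coarse stages of validator start-up; serializable so progress can be
/// reported externally (e.g. over the admin RPC).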
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidatorStartProgress {
    Initializing, // default state
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted, // halted after reaching `halt_at_slot`
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },

    Running,
}

impl Default for ValidatorStartProgress {
    fn default() -> Self {
        Self::Initializing
    }
}

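/// Optional background thread that scans the blockstore and fixes roots when
/// RPC transaction history and `rpc_scan_and_fix_roots` are enabled.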
struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join {:?}", err);
            }
        }
    }
}

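/// Channel, service, and high-water mark used to record per-transaction status
/// history for RPC.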
#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
}

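/// TPU networking configuration passed into [`Validator::new`].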
pub struct ValidatorTpuConfig {
    /// Use QUIC for sending regular TPU transactions.
    pub use_quic: bool,
    /// Use QUIC for sending TPU votes.
    pub vote_use_quic: bool,
    /// Connection pool size for the TPU connection caches.
    pub tpu_connection_pool_size: usize,
    /// Also accept TPU transactions over UDP.
    pub tpu_enable_udp: bool,
    /// QUIC server config for the TPU port.
    pub tpu_quic_server_config: QuicServerParams,
    /// QUIC server config for the TPU-forwards port.
    pub tpu_fwd_quic_server_config: QuicServerParams,
    /// QUIC server config for the vote port.
    pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
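        // Test-tuned QUIC server params: a fixed per-IP connection rate and
        // large coalesce channels.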
        let tpu_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            coalesce_channel_size: 100_000,
            ..Default::default()
        };

        let tpu_fwd_quic_server_config = QuicServerParams {
            max_connections_per_ipaddr_per_min: 32,
            max_unstaked_connections: 0,
            coalesce_channel_size: 100_000,
            ..Default::default()
        };

        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}

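/// A fully constructed validator: owns every long-lived service (gossip, RPC,
/// TPU, TVU, PoH, snapshots, ...) plus the shared state they operate on.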
pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    accounts_hash_verifier: AccountsHashVerifier,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    _tpu_client_next_runtime: Option<TokioRuntime>,
}

impl Validator {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        #[cfg(debug_assertions)]
        const DEBUG_ASSERTION_STATUS: &str = "enabled";
        #[cfg(not(debug_assertions))]
        const DEBUG_ASSERTION_STATUS: &str = "disabled";
        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");

        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

        // Initialize the global rayon pool with the configured thread count;
        // if a pool was already installed, just warn.
        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
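        // If any Geyser plugin configs were given (or plugins are force
        // enabled), start the plugin service and register a confirmed-bank
        // notification channel for it.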
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {:?}", cluster_entrypoint);
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

        let accounts_hash_cache_path = config
            .accounts_db_config
            .as_ref()
            .and_then(|config| config.accounts_hash_cache_path.as_ref())
            .map(PathBuf::as_path)
            .unwrap_or(ledger_path);
        let old_accounts_hash_cache_dirs = [
            ledger_path.join("calculate_accounts_hash_cache"),
            accounts_hash_cache_path.join("full"),
            accounts_hash_cache_path.join("incremental"),
            accounts_hash_cache_path.join("transient"),
        ];
        for old_accounts_hash_cache_dir in old_accounts_hash_cache_dirs {
            if old_accounts_hash_cache_dir.exists() {
                move_and_async_delete_path(old_accounts_hash_cache_dir);
            }
        }

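        // Wire `validator_exit` so that triggering it trips the shared `exit`
        // flag and cancels any tpu-client-next tasks.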
        let cancel_tpu_client_next = CancellationToken::new();
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
            let cancel_tpu_client_next = cancel_tpu_client_next.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
        }

        let accounts_update_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_accounts_update_notifier());

        let transaction_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier());

        let entry_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier());

        let block_metadata_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

        let slot_status_notifier = geyser_plugin_service
            .as_ref()
            .and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier());

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

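        // Open the blockstore and load bank forks from the latest snapshot (or
        // genesis), along with the optional transaction-history services.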
        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!(
            "shred version: {shred_version}, hard forks: {:?}",
            hard_forks
        );

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        let cluster_info = Arc::new(cluster_info);

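        // Snapshot pipeline: snapshot requests flow through the controller to
        // the accounts background service, and finished accounts packages are
        // verified before being handed to the packager service.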
        assert!(is_snapshot_config_valid(&config.snapshot_config));

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            config.snapshot_config.clone(),
            bank_forks.read().unwrap().root(),
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if snapshot_controller
            .snapshot_config()
            .should_generate_snapshots()
        {
            let exit_backpressure = config
                .validator_exit_backpressure
                .get(SnapshotPackagerService::NAME)
                .cloned();
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                exit_backpressure,
                cluster_info.clone(),
                snapshot_controller.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
        let accounts_hash_verifier = AccountsHashVerifier::new(
            accounts_package_sender.clone(),
            accounts_package_receiver,
            pending_snapshot_packages,
            exit.clone(),
            snapshot_controller.clone(),
        );
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller: snapshot_controller.clone(),
            snapshot_request_receiver,
            accounts_package_sender,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
            config.accounts_db_test_hash_calculation,
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}, \
             transaction-structure: {}",
            config.block_verification_method, config.block_production_method, config.transaction_struct
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let startup_verification_complete;
        let (poh_recorder, entry_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                exit.clone(),
            )
        };
        let (record_sender, record_receiver) = unbounded();
        let transaction_recorder =
            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

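        // Install the block-verification scheduler: either the legacy
        // blockstore processor or the (default) unified scheduler pool.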
        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

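        // Set up deferred ledger replay; the heavy processing happens in
        // `process()` below or later in `process_to_create_tower()`.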
        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            entry_notification_sender,
            blockstore_root_scan,
            &snapshot_controller,
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &snapshot_controller,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
            exit.clone(),
            max_complete_transaction_status_slot.clone(),
            blockstore.clone(),
            bank_forks.clone(),
            block_commitment_cache.clone(),
            optimistically_confirmed_bank.clone(),
            &config.pubsub_config,
            None,
        ));

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

        let mut tpu_transactions_forwards_client =
            Some(node.sockets.tpu_transaction_forwarding_client);

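        // tpu-client-next needs no ConnectionCache; otherwise build a QUIC or
        // UDP cache for forwarding transactions.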
        let connection_cache = match (config.use_tpu_client_next, use_quic) {
            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                Some(
                    tpu_transactions_forwards_client
                        .take()
                        .expect("Socket should exist."),
                ),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            ))),
            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))),
            (true, _) => None,
        };

        let vote_connection_cache = if vote_use_quic {
            let vote_connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_vote_quic",
                tpu_connection_pool_size,
                Some(node.sockets.quic_vote_client),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu_vote(Protocol::QUIC)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(vote_connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                tpu_connection_pool_size,
            ))
        };

        let current_runtime_handle = tokio::runtime::Handle::try_current();
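        // tpu-client-next requires a tokio runtime: create a small dedicated
        // one only when not already running inside a runtime.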
        let tpu_client_next_runtime =
            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .worker_threads(2)
                    .thread_name("solTpuClientRt")
                    .build()
                    .unwrap()
            });

        let rpc_override_health_check =
            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
        let (
            json_rpc_service,
            pubsub_service,
            completed_data_sets_sender,
            completed_data_sets_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            bank_notification_sender,
        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
            assert_eq!(
                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
                node.info
                    .rpc_pubsub()
                    .map(|addr| socket_addr_space.check(&addr))
            );
            let (bank_notification_sender, bank_notification_receiver) = unbounded();
            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
                Some(Arc::new(RwLock::new(bank_notification_senders)))
            } else {
                None
            };

            let client_option = if config.use_tpu_client_next {
                let runtime_handle = tpu_client_next_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
                ClientOption::TpuClientNext(
                    Arc::as_ref(&identity_keypair),
                    node.sockets.rpc_sts_client,
                    runtime_handle.clone(),
                    cancel_tpu_client_next.clone(),
                )
            } else {
                let Some(connection_cache) = &connection_cache else {
                    panic!("ConnectionCache should exist by construction.");
                };
                ClientOption::ConnectionCache(connection_cache.clone())
            };
            let rpc_svc_config = JsonRpcServiceConfig {
                rpc_addr,
                rpc_config: config.rpc_config.clone(),
                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
                bank_forks: bank_forks.clone(),
                block_commitment_cache: block_commitment_cache.clone(),
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                poh_recorder: Some(poh_recorder.clone()),
                genesis_hash: genesis_config.hash(),
                ledger_path: ledger_path.to_path_buf(),
                validator_exit: config.validator_exit.clone(),
                exit: exit.clone(),
                override_health_check: rpc_override_health_check.clone(),
                startup_verification_complete,
                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
                send_transaction_service_config: config.send_transaction_service_config.clone(),
                max_slots: max_slots.clone(),
                leader_schedule_cache: leader_schedule_cache.clone(),
                max_complete_transaction_status_slot,
                prioritization_fee_cache: prioritization_fee_cache.clone(),
                client_option,
            };
            let json_rpc_service =
                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;

            let pubsub_service = if !config.rpc_config.full_api {
                None
            } else {
                let (trigger, pubsub_service) = PubSubService::new(
                    config.pubsub_config.clone(),
                    &rpc_subscriptions,
                    rpc_pubsub_addr,
                );
                config
                    .validator_exit
                    .write()
                    .unwrap()
                    .register_exit(Box::new(move || trigger.cancel()));

                Some(pubsub_service)
            };

            let (completed_data_sets_sender, completed_data_sets_service) =
                if !config.rpc_config.full_api {
                    (None, None)
                } else {
                    let (completed_data_sets_sender, completed_data_sets_receiver) =
                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
                    let completed_data_sets_service = CompletedDataSetsService::new(
                        completed_data_sets_receiver,
                        blockstore.clone(),
                        rpc_subscriptions.clone(),
                        exit.clone(),
                        max_slots.clone(),
                    );
                    (
                        Some(completed_data_sets_sender),
                        Some(completed_data_sets_service),
                    )
                };

            let rpc_completed_slots_service =
                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
                    let (completed_slots_sender, completed_slots_receiver) =
                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
                    blockstore.add_completed_slots_signal(completed_slots_sender);

                    Some(RpcCompletedSlotsService::spawn(
                        completed_slots_receiver,
                        rpc_subscriptions.clone(),
                        slot_status_notifier.clone(),
                        exit.clone(),
                    ))
                } else {
                    None
                };

            let optimistically_confirmed_bank_tracker =
                Some(OptimisticallyConfirmedBankTracker::new(
                    bank_notification_receiver,
                    exit.clone(),
                    bank_forks.clone(),
                    optimistically_confirmed_bank,
                    rpc_subscriptions.clone(),
                    confirmed_bank_subscribers,
                    prioritization_fee_cache.clone(),
                ));
            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
                sender: bank_notification_sender,
                should_send_parents: geyser_plugin_service.is_some(),
            });
            (
                Some(json_rpc_service),
                pubsub_service,
                completed_data_sets_sender,
                completed_data_sets_service,
                rpc_completed_slots_service,
                optimistically_confirmed_bank_tracker,
                bank_notification_sender_config,
            )
        } else {
            (None, None, None, None, None, None, None)
        };

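        // With `halt_at_slot` set, ledger processing stopped at the requested
        // slot: publish the final root and park this thread indefinitely.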
        if config.halt_at_slot.is_some() {
            block_commitment_cache
                .write()
                .unwrap()
                .set_highest_super_majority_root(bank_forks.read().unwrap().root());

            warn!("Validator halted");
            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
            std::thread::park();
        }
        let ip_echo_server = match node.sockets.ip_echo {
            None => None,
            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
                tcp_listener,
                config.ip_echo_server_threads,
                Some(node.info.shred_version()),
            )),
        };

        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();

        let stats_reporter_service =
            StatsReporterService::new(stats_reporter_receiver, exit.clone());

        let gossip_service = GossipService::new(
            &cluster_info,
            Some(bank_forks.clone()),
            node.sockets.gossip,
            config.gossip_validators.clone(),
            should_check_duplicate_instance,
            Some(stats_reporter_sender.clone()),
            exit.clone(),
        );
        let serve_repair = ServeRepair::new(
            cluster_info.clone(),
            bank_forks.clone(),
            config.repair_whitelist.clone(),
        );
        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
            unbounded();

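        // If configured, block here until enough stake (see
        // WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT) is visible in gossip.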
        let waited_for_supermajority = wait_for_supermajority(
            config,
            Some(&mut process_blockstore),
            &bank_forks,
            &cluster_info,
            rpc_override_health_check,
            &start_progress,
        )?;

        let blockstore_metric_report_service =
            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());

        let wait_for_vote_to_start_leader =
            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;

        let poh_service = PohService::new(
            poh_recorder.clone(),
            &genesis_config.poh_config,
            exit.clone(),
            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
            config.poh_pinned_cpu_core,
            config.poh_hashes_per_batch,
            record_receiver,
        );
        assert_eq!(
            blockstore.get_new_shred_signals_len(),
            1,
            "New shred signal for the TVU should be the same as the clear bank signal."
        );

        let vote_tracker = Arc::<VoteTracker>::default();

        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
        let (verified_vote_sender, verified_vote_receiver) = unbounded();
        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender_cloned());

        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
            && genesis_config.cluster_type != ClusterType::MainnetBeta)
            .then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .thread_name("solTurbineQuic")
                    .build()
                    .unwrap()
            });
        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
        let (
            turbine_quic_endpoint,
            turbine_quic_endpoint_sender,
            turbine_quic_endpoint_join_handle,
        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
            (None, sender, None)
        } else {
            solana_turbine::quic_endpoint::new_quic_endpoint(
                turbine_quic_endpoint_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
                &identity_keypair,
                node.sockets.tvu_quic,
                turbine_quic_endpoint_sender,
                bank_forks.clone(),
            )
            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
            .unwrap()
        };

        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
            && genesis_config.cluster_type != ClusterType::MainnetBeta)
            .then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .thread_name("solRepairQuic")
                    .build()
                    .unwrap()
            });
        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
            if genesis_config.cluster_type == ClusterType::MainnetBeta {
                (None, RepairQuicAsyncSenders::new_dummy(), None)
            } else {
                let repair_quic_sockets = RepairQuicSockets {
                    repair_server_quic_socket: node.sockets.serve_repair_quic,
                    repair_client_quic_socket: node.sockets.repair_quic,
                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
                };
                let repair_quic_senders = RepairQuicSenders {
                    repair_request_quic_sender: repair_request_quic_sender.clone(),
                    repair_response_quic_sender,
                    ancestor_hashes_response_quic_sender,
                };
                repair::quic_endpoint::new_quic_endpoints(
                    repair_quic_endpoints_runtime
                        .as_ref()
                        .map(TokioRuntime::handle)
                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
                    &identity_keypair,
                    repair_quic_sockets,
                    repair_quic_senders,
                    bank_forks.clone(),
                )
                .map(|(endpoints, senders, join_handle)| {
                    (Some(endpoints), senders, Some(join_handle))
                })
                .unwrap()
            };
        let serve_repair_service = ServeRepairService::new(
            serve_repair,
            repair_request_quic_sender,
            repair_request_quic_receiver,
            repair_quic_async_senders.repair_response_quic_sender,
            blockstore.clone(),
            node.sockets.serve_repair,
            socket_addr_space,
            stats_reporter_sender,
            exit.clone(),
        );

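        // Wen restart: when configured (and supermajority wasn't waited for),
        // run the restart protocol below instead of normal operation.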
        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
        let wen_restart_repair_slots = if in_wen_restart {
            Some(Arc::new(RwLock::new(Vec::new())))
        } else {
            None
        };
        let tower = match process_blockstore.process_to_create_tower() {
            Ok(tower) => {
                info!("Tower state: {:?}", tower);
                tower
            }
            Err(e) => {
                warn!("Unable to retrieve tower: {:?}; creating default tower...", e);
                Tower::default()
            }
        };
        let last_vote = tower.last_vote();

        let outstanding_repair_requests =
            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
        let cluster_slots =
            Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());

        let connection_cache_for_warmup =
            if json_rpc_service.is_some() && connection_cache.is_some() {
                connection_cache.as_ref()
            } else {
                None
            };

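        // Start the TVU: shred fetch/retransmit, repair, replay, and voting.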
        let tvu = Tvu::new(
            vote_account,
            authorized_voter_keypairs,
            &bank_forks,
            &cluster_info,
            TvuSockets {
                repair: node.sockets.repair.try_clone().unwrap(),
                retransmit: node.sockets.retransmit_sockets,
                fetch: node.sockets.tvu,
                ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
            },
            blockstore.clone(),
            ledger_signal_receiver,
            &rpc_subscriptions,
            &poh_recorder,
            tower,
            config.tower_storage.clone(),
            &leader_schedule_cache,
            exit.clone(),
            block_commitment_cache,
            config.turbine_disabled.clone(),
            transaction_status_sender.clone(),
            entry_notification_sender.clone(),
            vote_tracker.clone(),
            retransmit_slots_sender,
            gossip_verified_vote_hash_receiver,
            verified_vote_receiver,
            replay_vote_sender.clone(),
            completed_data_sets_sender,
            bank_notification_sender.clone(),
            duplicate_confirmed_slots_receiver,
            TvuConfig {
                max_ledger_shreds: config.max_ledger_shreds,
                shred_version: node.info.shred_version(),
                repair_validators: config.repair_validators.clone(),
                repair_whitelist: config.repair_whitelist.clone(),
                wait_for_vote_to_start_leader,
                replay_forks_threads: config.replay_forks_threads,
                replay_transactions_threads: config.replay_transactions_threads,
                shred_sigverify_threads: config.tvu_shred_sigverify_threads,
                retransmit_xdp: config.retransmit_xdp.clone(),
            },
            &max_slots,
            block_metadata_notifier,
            config.wait_to_vote_slot,
            Some(snapshot_controller.clone()),
            config.runtime_config.log_messages_bytes_limit,
            connection_cache_for_warmup,
            &prioritization_fee_cache,
            banking_tracer.clone(),
            turbine_quic_endpoint_sender.clone(),
            turbine_quic_endpoint_receiver,
            repair_response_quic_receiver,
            repair_quic_async_senders.repair_request_quic_sender,
            repair_quic_async_senders.ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            outstanding_repair_requests.clone(),
            cluster_slots.clone(),
            wen_restart_repair_slots.clone(),
            slot_status_notifier,
            vote_connection_cache,
        )
        .map_err(ValidatorError::Other)?;

        if in_wen_restart {
            info!("Waiting for wen_restart to finish");
            wait_for_wen_restart(WenRestartConfig {
                wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
                wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
                last_vote,
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                bank_forks: bank_forks.clone(),
                wen_restart_repair_slots: wen_restart_repair_slots.clone(),
                wait_for_supermajority_threshold_percent:
                    WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
                snapshot_controller: Some(snapshot_controller.clone()),
                abs_status: accounts_background_service.status().clone(),
                genesis_config_hash: genesis_config.hash(),
                exit: exit.clone(),
            })?;
            return Err(ValidatorError::WenRestartFinished.into());
        }

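        // Choose the client used to forward transactions when this node is not
        // the leader, then bring up the TPU.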
        let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
            ForwardingClientOption::ConnectionCache(connection_cache.clone())
        } else {
            let runtime_handle = tpu_client_next_runtime
                .as_ref()
                .map(TokioRuntime::handle)
                .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
            ForwardingClientOption::TpuClientNext((
                Arc::as_ref(&identity_keypair),
                tpu_transactions_forwards_client
                    .take()
                    .expect("Socket should exist."),
                runtime_handle.clone(),
                cancel_tpu_client_next,
            ))
        };
        let tpu = Tpu::new_with_client(
            &cluster_info,
            &poh_recorder,
            transaction_recorder,
            entry_receiver,
            retransmit_slots_receiver,
            TpuSockets {
                transactions: node.sockets.tpu,
                transaction_forwards: node.sockets.tpu_forwards,
                vote: node.sockets.tpu_vote,
                broadcast: node.sockets.broadcast,
                transactions_quic: node.sockets.tpu_quic,
                transactions_forwards_quic: node.sockets.tpu_forwards_quic,
                vote_quic: node.sockets.tpu_vote_quic,
                vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
            },
            &rpc_subscriptions,
            transaction_status_sender,
            entry_notification_sender,
            blockstore.clone(),
            &config.broadcast_stage_type,
            exit,
            node.info.shred_version(),
            vote_tracker,
            bank_forks.clone(),
            verified_vote_sender,
            gossip_verified_vote_hash_sender,
            replay_vote_receiver,
            replay_vote_sender,
            bank_notification_sender.map(|sender| sender.sender),
            config.tpu_coalesce,
            duplicate_confirmed_slot_sender,
            forwarding_tpu_client,
            turbine_quic_endpoint_sender,
            &identity_keypair,
            config.runtime_config.log_messages_bytes_limit,
            &staked_nodes,
            config.staked_nodes_overrides.clone(),
            banking_tracer_channels,
            tracer_thread,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
            &prioritization_fee_cache,
            config.block_production_method.clone(),
            config.transaction_struct.clone(),
            config.enable_block_production_forwarding,
            config.generator_config.clone(),
            key_notifiers.clone(),
        );

1657 datapoint_info!(
1658 "validator-new",
1659 ("id", id.to_string(), String),
1660 ("version", solana_version::version!(), String),
1661 ("cluster_type", genesis_config.cluster_type as u32, i64),
1662 ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1663 ("waited_for_supermajority", waited_for_supermajority, bool),
1664 ("shred_version", shred_version as i64, i64),
1665 );
1666
1667 *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1668 if config.use_tpu_client_next {
1669 if let Some(json_rpc_service) = &json_rpc_service {
1670 key_notifiers.write().unwrap().add(
1671 KeyUpdaterType::RpcService,
1672 json_rpc_service.get_client_key_updater(),
1673 );
1674 }
1675 }
1678
1679 *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1680 bank_forks: bank_forks.clone(),
1681 cluster_info: cluster_info.clone(),
1682 vote_account: *vote_account,
1683 repair_whitelist: config.repair_whitelist.clone(),
1684 notifies: key_notifiers,
1685 repair_socket: Arc::new(node.sockets.repair),
1686 outstanding_repair_requests,
1687 cluster_slots,
1688 });
1689
1690 Ok(Self {
1691 stats_reporter_service,
1692 gossip_service,
1693 serve_repair_service,
1694 json_rpc_service,
1695 pubsub_service,
1696 rpc_completed_slots_service,
1697 optimistically_confirmed_bank_tracker,
1698 transaction_status_service,
1699 entry_notifier_service,
1700 system_monitor_service,
1701 sample_performance_service,
1702 snapshot_packager_service,
1703 completed_data_sets_service,
1704 tpu,
1705 tvu,
1706 poh_service,
1707 poh_recorder,
1708 ip_echo_server,
1709 validator_exit: config.validator_exit.clone(),
1710 cluster_info,
1711 bank_forks,
1712 blockstore,
1713 geyser_plugin_service,
1714 blockstore_metric_report_service,
1715 accounts_background_service,
1716 accounts_hash_verifier,
1717 turbine_quic_endpoint,
1718 turbine_quic_endpoint_runtime,
1719 turbine_quic_endpoint_join_handle,
1720 repair_quic_endpoints,
1721 repair_quic_endpoints_runtime,
1722 repair_quic_endpoints_join_handle,
1723 _tpu_client_next_runtime: tpu_client_next_runtime,
1724 })
1725 }
1726
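    /// Signals the validator's services to begin shutting down without
    /// blocking; pair with `join` (or use `close`) to wait for them to stop.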
    pub fn exit(&mut self) {
        self.validator_exit.write().unwrap().exit();

        self.blockstore.drop_signal();
    }

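    /// Shuts the validator down and blocks until all services have exited.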
    pub fn close(mut self) {
        self.exit();
        self.join();
    }

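    /// Logs the node's contact info and the local addresses of its gossip,
    /// broadcast, repair, and retransmit sockets.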
    fn print_node_info(node: &Node) {
        info!("{:?}", node.info);
        info!(
            "local gossip address: {}",
            node.sockets.gossip.local_addr().unwrap()
        );
        info!(
            "local broadcast address: {}",
            node.sockets
                .broadcast
                .first()
                .unwrap()
                .local_addr()
                .unwrap()
        );
        info!(
            "local repair address: {}",
            node.sockets.repair.local_addr().unwrap()
        );
        info!(
            "local retransmit address: {}",
            node.sockets.retransmit_sockets[0].local_addr().unwrap()
        );
    }

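    /// Blocks until every service owned by the validator has shut down.
    /// The shared `bank_forks` and `cluster_info` handles are dropped first,
    /// then each service thread is joined in turn.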
    pub fn join(self) {
        drop(self.bank_forks);
        drop(self.cluster_info);

        self.poh_service.join().expect("poh_service");
        drop(self.poh_recorder);

        if let Some(json_rpc_service) = self.json_rpc_service {
            json_rpc_service.join().expect("rpc_service");
        }

        if let Some(pubsub_service) = self.pubsub_service {
            pubsub_service.join().expect("pubsub_service");
        }

        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
            rpc_completed_slots_service
                .join()
                .expect("rpc_completed_slots_service");
        }

        if let Some(optimistically_confirmed_bank_tracker) =
            self.optimistically_confirmed_bank_tracker
        {
            optimistically_confirmed_bank_tracker
                .join()
                .expect("optimistically_confirmed_bank_tracker");
        }

        if let Some(transaction_status_service) = self.transaction_status_service {
            transaction_status_service
                .join()
                .expect("transaction_status_service");
        }

        if let Some(system_monitor_service) = self.system_monitor_service {
            system_monitor_service
                .join()
                .expect("system_monitor_service");
        }

        if let Some(sample_performance_service) = self.sample_performance_service {
            sample_performance_service
                .join()
                .expect("sample_performance_service");
        }

        if let Some(entry_notifier_service) = self.entry_notifier_service {
            entry_notifier_service
                .join()
                .expect("entry_notifier_service");
        }

        if let Some(s) = self.snapshot_packager_service {
            s.join().expect("snapshot_packager_service");
        }

        self.gossip_service.join().expect("gossip_service");
        self.repair_quic_endpoints
            .iter()
            .flatten()
            .for_each(repair::quic_endpoint::close_quic_endpoint);
        self.serve_repair_service
            .join()
            .expect("serve_repair_service");
        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
            self.repair_quic_endpoints_runtime
                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
                .transpose()
                .unwrap();
        }
        self.stats_reporter_service
            .join()
            .expect("stats_reporter_service");
        self.blockstore_metric_report_service
            .join()
            .expect("blockstore_metric_report_service");
        self.accounts_background_service
            .join()
            .expect("accounts_background_service");
        self.accounts_hash_verifier
            .join()
            .expect("accounts_hash_verifier");
        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
        }
        self.tpu.join().expect("tpu");
        self.tvu.join().expect("tvu");
        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
            self.turbine_quic_endpoint_runtime
                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
                .transpose()
                .unwrap();
        }
        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
            completed_data_sets_service
                .join()
                .expect("completed_data_sets_service");
        }
        if let Some(ip_echo_server) = self.ip_echo_server {
            ip_echo_server.shutdown_background();
        }

        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
            geyser_plugin_service.join().expect("geyser_plugin_service");
        }
    }
}

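/// Returns true if the vote account exists in `bank` and has recorded at
/// least one vote, i.e. the account has been actively voting.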
fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
    if let Some(account) = &bank.get_account(vote_account) {
        if let Some(vote_state) = vote_state::from(account) {
            return !vote_state.votes.is_empty();
        }
    }
    false
}

fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
    let Some(hashes_per_tick) = bank.hashes_per_tick() else {
        warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
        return Ok(());
    };

    let ticks_per_slot = bank.ticks_per_slot();
    let hashes_per_slot = hashes_per_tick * ticks_per_slot;
    let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);

    let hash_time = compute_hash_time(hash_samples);
    let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;

    let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
    let target_hashes_per_second =
        (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;

    info!(
        "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
         second {target_hashes_per_second}"
    );
    if my_hashes_per_second < target_hashes_per_second {
        return Err(ValidatorError::PohTooSlow {
            mine: my_hashes_per_second,
            target: target_hashes_per_second,
        });
    }

    Ok(())
}

fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
    if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
        if wait_slot_for_supermajority == root_slot {
            return Some(wait_slot_for_supermajority);
        }
    }

    None
}

fn post_process_restored_tower(
    restored_tower: crate::consensus::Result<Tower>,
    validator_identity: &Pubkey,
    vote_account: &Pubkey,
    config: &ValidatorConfig,
    bank_forks: &BankForks,
) -> Result<Tower, String> {
    let mut should_require_tower = config.require_tower;

    let restored_tower = restored_tower.and_then(|tower| {
        let root_bank = bank_forks.root_bank();
        let slot_history = root_bank.get_slot_history();
        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);

        if let Some(hard_fork_restart_slot) =
            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
        {
            let message =
                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
            datapoint_error!("tower_error", ("error", message, String));
            error!("{}", message);

            should_require_tower = false;
            // Relax the tower requirement: after a hard fork the tower is
            // rebuilt from the root bank instead.
            return Err(crate::consensus::TowerError::HardFork(
                hard_fork_restart_slot,
            ));
        }

        if let Some(warp_slot) = config.warp_slot {
            should_require_tower = false;
            // A warp slot is treated like a hard fork: discard the saved tower.
            return Err(crate::consensus::TowerError::HardFork(warp_slot));
        }

        tower
    });

    let restored_tower = match restored_tower {
        Ok(tower) => tower,
        Err(err) => {
            let voting_has_been_active =
                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
            if !err.is_file_missing() {
                datapoint_error!(
                    "tower_error",
                    ("error", format!("Unable to restore tower: {err}"), String),
                );
            }
            if should_require_tower && voting_has_been_active {
                return Err(format!(
                    "Requested mandatory tower restore failed: {err}. And there is an existing \
                     vote_account containing actual votes. Aborting due to possible conflicting \
                     duplicate votes"
                ));
            }
            if err.is_file_missing() && !voting_has_been_active {
                info!(
                    "Ignoring expected failed tower restore because this is the initial validator \
                     start with the vote account..."
                );
            } else {
                error!(
                    "Rebuilding a new tower from the latest vote account due to failed tower \
                     restore: {}",
                    err
                );
            }

            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
        }
    };

    Ok(restored_tower)
}

fn load_genesis(
    config: &ValidatorConfig,
    ledger_path: &Path,
) -> Result<GenesisConfig, ValidatorError> {
    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
        .map_err(ValidatorError::OpenGenesisConfig)?;

    let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
    let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
    let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
    assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);

    let genesis_hash = genesis_config.hash();
    info!("genesis hash: {}", genesis_hash);

    if let Some(expected_genesis_hash) = config.expected_genesis_hash {
        if genesis_hash != expected_genesis_hash {
            return Err(ValidatorError::GenesisHashMismatch(
                genesis_hash,
                expected_genesis_hash,
            ));
        }
    }

    Ok(genesis_config)
}

#[allow(clippy::type_complexity)]
fn load_blockstore(
    config: &ValidatorConfig,
    ledger_path: &Path,
    genesis_config: &GenesisConfig,
    exit: Arc<AtomicBool>,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
    accounts_update_notifier: Option<AccountsUpdateNotifier>,
    transaction_notifier: Option<TransactionNotifierArc>,
    entry_notifier: Option<EntryNotifierArc>,
) -> Result<
    (
        Arc<RwLock<BankForks>>,
        Arc<Blockstore>,
        Slot,
        Receiver<bool>,
        LeaderScheduleCache,
        Option<StartingSnapshotHashes>,
        TransactionHistoryServices,
        blockstore_processor::ProcessOptions,
        BlockstoreRootScan,
        DroppedSlotsReceiver,
        Option<EntryNotifierService>,
    ),
    String,
> {
    info!("loading ledger from {:?}...", ledger_path);
    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;

    let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
        .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;

    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
    blockstore.add_new_shred_signal(ledger_signal_sender);

    let original_blockstore_root = blockstore.max_root();

    let blockstore = Arc::new(blockstore);
    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
    let halt_at_slot = config
        .halt_at_slot
        .or_else(|| blockstore.highest_slot().unwrap_or(None));

    let process_options = blockstore_processor::ProcessOptions {
        run_verification: config.run_verification,
        halt_at_slot,
        new_hard_forks: config.new_hard_forks.clone(),
        debug_keys: config.debug_keys.clone(),
        accounts_db_config: config.accounts_db_config.clone(),
        accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
        runtime_config: config.runtime_config.clone(),
        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
        ..blockstore_processor::ProcessOptions::default()
    };

    let enable_rpc_transaction_history =
        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
    let is_plugin_transaction_history_required = transaction_notifier.is_some();
    let transaction_history_services =
        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
            initialize_rpc_transaction_history_services(
                blockstore.clone(),
                exit.clone(),
                enable_rpc_transaction_history,
                config.rpc_config.enable_extended_tx_metadata_storage,
                transaction_notifier,
            )
        } else {
            TransactionHistoryServices::default()
        };

    let entry_notifier_service = entry_notifier
        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));

    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
        bank_forks_utils::load_bank_forks(
            genesis_config,
            &blockstore,
            config.account_paths.clone(),
            &config.snapshot_config,
            &process_options,
            transaction_history_services
                .transaction_status_sender
                .as_ref(),
            entry_notifier_service
                .as_ref()
                .map(|service| service.sender()),
            accounts_update_notifier,
            exit,
        )
        .map_err(|err| err.to_string())?;

    // Register the bank-drop callback so the accounts background service is
    // notified of pruned banks.
    let pruned_banks_receiver =
        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());

    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());

    Ok((
        bank_forks,
        blockstore,
        original_blockstore_root,
        ledger_signal_receiver,
        leader_schedule_cache,
        starting_snapshot_hashes,
        transaction_history_services,
        process_options,
        blockstore_root_scan,
        pruned_banks_receiver,
        entry_notifier_service,
    ))
}

pub struct ProcessBlockStore<'a> {
    id: &'a Pubkey,
    vote_account: &'a Pubkey,
    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
    blockstore: &'a Blockstore,
    original_blockstore_root: Slot,
    bank_forks: &'a Arc<RwLock<BankForks>>,
    leader_schedule_cache: &'a LeaderScheduleCache,
    process_options: &'a blockstore_processor::ProcessOptions,
    transaction_status_sender: Option<&'a TransactionStatusSender>,
    entry_notification_sender: Option<&'a EntryNotifierSender>,
    blockstore_root_scan: Option<BlockstoreRootScan>,
    snapshot_controller: &'a SnapshotController,
    config: &'a ValidatorConfig,
    tower: Option<Tower>,
}

impl<'a> ProcessBlockStore<'a> {
    #[allow(clippy::too_many_arguments)]
    fn new(
        id: &'a Pubkey,
        vote_account: &'a Pubkey,
        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
        blockstore: &'a Blockstore,
        original_blockstore_root: Slot,
        bank_forks: &'a Arc<RwLock<BankForks>>,
        leader_schedule_cache: &'a LeaderScheduleCache,
        process_options: &'a blockstore_processor::ProcessOptions,
        transaction_status_sender: Option<&'a TransactionStatusSender>,
        entry_notification_sender: Option<&'a EntryNotifierSender>,
        blockstore_root_scan: BlockstoreRootScan,
        snapshot_controller: &'a SnapshotController,
        config: &'a ValidatorConfig,
    ) -> Self {
        Self {
            id,
            vote_account,
            start_progress,
            blockstore,
            original_blockstore_root,
            bank_forks,
            leader_schedule_cache,
            process_options,
            transaction_status_sender,
            entry_notification_sender,
            blockstore_root_scan: Some(blockstore_root_scan),
            snapshot_controller,
            config,
            tower: None,
        }
    }

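    /// Replays the blockstore from the last root, then restores the tower
    /// and reconciles blockstore roots against any tower or hard-fork root.
    /// Idempotent: `self.tower` doubles as the "already processed" marker,
    /// and a background thread reports replay progress while it runs.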
    pub(crate) fn process(&mut self) -> Result<(), String> {
        if self.tower.is_none() {
            let previous_start_process = *self.start_progress.read().unwrap();
            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;

            let exit = Arc::new(AtomicBool::new(false));
            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
                let bank_forks = self.bank_forks.clone();
                let exit = exit.clone();
                let start_progress = self.start_progress.clone();

                let _ = Builder::new()
                    .name("solRptLdgrStat".to_string())
                    .spawn(move || {
                        while !exit.load(Ordering::Relaxed) {
                            let slot = bank_forks.read().unwrap().working_bank().slot();
                            *start_progress.write().unwrap() =
                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
                            sleep(Duration::from_secs(2));
                        }
                    })
                    .unwrap();
            }
            blockstore_processor::process_blockstore_from_root(
                self.blockstore,
                self.bank_forks,
                self.leader_schedule_cache,
                self.process_options,
                self.transaction_status_sender,
                self.entry_notification_sender,
                Some(self.snapshot_controller),
            )
            .map_err(|err| {
                exit.store(true, Ordering::Relaxed);
                format!("Failed to load ledger: {err:?}")
            })?;
            exit.store(true, Ordering::Relaxed);

            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
                blockstore_root_scan.join();
            }

            self.tower = Some({
                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
                if let Ok(tower) = &restored_tower {
                    // Bring blockstore roots in line with the tower's root
                    // before adopting it.
                    reconcile_blockstore_roots_with_external_source(
                        ExternalRootSource::Tower(tower.root()),
                        self.blockstore,
                        &mut self.original_blockstore_root,
                    )
                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
                }

                post_process_restored_tower(
                    restored_tower,
                    self.id,
                    self.vote_account,
                    self.config,
                    &self.bank_forks.read().unwrap(),
                )?
            });

            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
                self.config,
                self.bank_forks.read().unwrap().root(),
            ) {
                // Likewise reconcile roots against the hard fork slot when
                // restarting the cluster at one.
                reconcile_blockstore_roots_with_external_source(
                    ExternalRootSource::HardFork(hard_fork_restart_slot),
                    self.blockstore,
                    &mut self.original_blockstore_root,
                )
                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
            }

            *self.start_progress.write().unwrap() = previous_start_process;
        }
        Ok(())
    }

    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
        self.process()?;
        Ok(self.tower.unwrap())
    }
}

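/// When `config.warp_slot` is set, advances the root bank to that slot via
/// `Bank::warp_from_parent`, roots the warped bank, and writes a full
/// snapshot archive so the warped state can be served to the cluster. Errors
/// if the warp slot is not ahead of the current working bank.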
fn maybe_warp_slot(
    config: &ValidatorConfig,
    process_blockstore: &mut ProcessBlockStore,
    ledger_path: &Path,
    bank_forks: &RwLock<BankForks>,
    leader_schedule_cache: &LeaderScheduleCache,
    snapshot_controller: &SnapshotController,
) -> Result<(), String> {
    if let Some(warp_slot) = config.warp_slot {
        let mut bank_forks = bank_forks.write().unwrap();

        let working_bank = bank_forks.working_bank();

        if warp_slot <= working_bank.slot() {
            return Err(format!(
                "warp slot ({}) cannot be less than or equal to the working bank slot ({})",
                warp_slot,
                working_bank.slot()
            ));
        }
        info!("warping to slot {}", warp_slot);

        let root_bank = bank_forks.root_bank();

        root_bank.squash();
        root_bank.force_flush_accounts_cache();

        bank_forks.insert(Bank::warp_from_parent(
            root_bank,
            &Pubkey::default(),
            warp_slot,
            solana_accounts_db::accounts_db::CalcAccountsHashDataSource::Storages,
        ));
        bank_forks
            .set_root(warp_slot, Some(snapshot_controller), Some(warp_slot))
            .map_err(|err| err.to_string())?;
        leader_schedule_cache.set_root(&bank_forks.root_bank());

        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
            ledger_path,
            &bank_forks.root_bank(),
            None,
            &config.snapshot_config.full_snapshot_archives_dir,
            &config.snapshot_config.incremental_snapshot_archives_dir,
            config.snapshot_config.archive_format,
        ) {
            Ok(archive_info) => archive_info,
            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
        };
        info!(
            "created snapshot: {}",
            full_snapshot_archive_info.path().display()
        );

        drop(bank_forks);
        process_blockstore.process()?;
    }
    Ok(())
}

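/// Decides whether blockstore slots should be purged because they may carry
/// shreds with an outdated shred version. Returns the first slot to scan
/// from: the slot after the root on a cluster-restart, or the slot after the
/// latest hard fork when the blockstore still contains slots beyond it;
/// `None` when no cleanup is needed.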
fn should_cleanup_blockstore_incorrect_shred_versions(
    config: &ValidatorConfig,
    blockstore: &Blockstore,
    root_slot: Slot,
    hard_forks: &HardForks,
) -> Result<Option<Slot>, BlockstoreError> {
    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
    if maybe_cluster_restart_slot.is_some() {
        return Ok(Some(root_slot + 1));
    }

    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
        return Ok(None);
    };

    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
        return Ok(None);
    };
    let blockstore_min_slot = blockstore.lowest_slot();
    info!(
        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
         latest hard fork is {latest_hard_fork}"
    );

    if latest_hard_fork < blockstore_min_slot {
        // The latest hard fork slot has already been purged from the
        // blockstore; nothing to inspect.
        Ok(None)
    } else if latest_hard_fork < blockstore_max_slot {
        // The blockstore contains slots past the latest hard fork that could
        // have been produced with an outdated shred version; scan from the
        // slot after the fork.
        Ok(Some(latest_hard_fork + 1))
    } else {
        // There are no slots beyond the latest hard fork.
        Ok(None)
    }
}

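/// Iterates slot data shreds starting at `start_slot` and returns the first
/// shred version that differs from `expected_shred_version`, giving up after
/// a 60 second timeout.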
fn scan_blockstore_for_incorrect_shred_version(
    blockstore: &Blockstore,
    start_slot: Slot,
    expected_shred_version: u16,
) -> Result<Option<u16>, BlockstoreError> {
    const TIMEOUT: Duration = Duration::from_secs(60);
    let timer = Instant::now();
    let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;

    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
    for (slot, _meta) in slot_meta_iterator {
        let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
        for shred in &shreds {
            if shred.version() != expected_shred_version {
                return Ok(Some(shred.version()));
            }
        }
        if timer.elapsed() > TIMEOUT {
            info!("Didn't find incorrect shreds after 60 seconds, aborting");
            break;
        }
    }
    Ok(None)
}

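/// If any shred from `start_slot` onward has an unexpected shred version,
/// copies the affected slot range into a backup blockstore directory (best
/// effort) and then purges those slots from the primary blockstore.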
fn cleanup_blockstore_incorrect_shred_versions(
    blockstore: &Blockstore,
    config: &ValidatorConfig,
    start_slot: Slot,
    expected_shred_version: u16,
) -> Result<(), BlockstoreError> {
    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
        blockstore,
        start_slot,
        expected_shred_version,
    )?;
    let Some(incorrect_shred_version) = incorrect_shred_version else {
        info!("Only shreds with the correct version were found in the blockstore");
        return Ok(());
    };

    let end_slot = blockstore.highest_slot()?.unwrap();

    let backup_folder = format!(
        "{}_backup_{}_{}_{}",
        BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, incorrect_shred_version, start_slot, end_slot
    );
    match Blockstore::open_with_options(
        &blockstore.ledger_path().join(backup_folder),
        config.blockstore_options.clone(),
    ) {
        Ok(backup_blockstore) => {
            info!("Backing up slots from {start_slot} to {end_slot}");
            let mut timer = Measure::start("blockstore backup");

            const PRINT_INTERVAL: Duration = Duration::from_secs(5);
            let mut print_timer = Instant::now();
            let mut num_slots_copied = 0;
            let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
            for (slot, _meta) in slot_meta_iterator {
                let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
                let shreds = shreds.into_iter().map(Cow::Owned);
                let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
                num_slots_copied += 1;

                if print_timer.elapsed() > PRINT_INTERVAL {
                    info!("Backed up {num_slots_copied} slots thus far");
                    print_timer = Instant::now();
                }
            }

            timer.stop();
            info!("Backing up slots done. {timer}");
        }
        Err(err) => {
            warn!("Unable to backup shreds with incorrect shred version: {err}");
        }
    }

    info!("Purging slots {start_slot} to {end_slot} from blockstore");
    let mut timer = Measure::start("blockstore purge");
    blockstore.purge_from_next_slots(start_slot, end_slot);
    blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
    timer.stop();
    info!("Purging slots done. {timer}");

    Ok(())
}

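/// Spins up the `TransactionStatusService` and its channel so transaction
/// statuses can be written to the blockstore and/or forwarded to a geyser
/// transaction notifier.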
fn initialize_rpc_transaction_history_services(
    blockstore: Arc<Blockstore>,
    exit: Arc<AtomicBool>,
    enable_rpc_transaction_history: bool,
    enable_extended_tx_metadata_storage: bool,
    transaction_notifier: Option<TransactionNotifierArc>,
) -> TransactionHistoryServices {
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
    let (transaction_status_sender, transaction_status_receiver) = unbounded();
    let transaction_status_sender = Some(TransactionStatusSender {
        sender: transaction_status_sender,
    });
    let transaction_status_service = Some(TransactionStatusService::new(
        transaction_status_receiver,
        max_complete_transaction_status_slot.clone(),
        enable_rpc_transaction_history,
        transaction_notifier,
        blockstore.clone(),
        enable_extended_tx_metadata_storage,
        exit.clone(),
    ));

    TransactionHistoryServices {
        transaction_status_sender,
        transaction_status_service,
        max_complete_transaction_status_slot,
    }
}

#[derive(Error, Debug)]
pub enum ValidatorError {
    #[error("bank hash mismatch: actual={0}, expected={1}")]
    BankHashMismatch(Hash, Hash),

    #[error("blockstore error: {0}")]
    Blockstore(#[source] BlockstoreError),

    #[error("genesis hash mismatch: actual={0}, expected={1}")]
    GenesisHashMismatch(Hash, Hash),

    #[error(
        "ledger does not have enough data to wait for supermajority: \
         current slot={0}, needed slot={1}"
    )]
    NotEnoughLedgerData(Slot, Slot),

    #[error("failed to open genesis: {0}")]
    OpenGenesisConfig(#[source] OpenGenesisConfigError),

    #[error("{0}")]
    Other(String),

    #[error(
        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
    )]
    PohTooSlow { mine: u64, target: u64 },

    #[error("shred version mismatch: actual {actual}, expected {expected}")]
    ShredVersionMismatch { actual: u16, expected: u16 },

    #[error(transparent)]
    TraceError(#[from] TraceError),

    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
    WenRestartFinished,
}

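/// When `config.wait_for_supermajority` is set to the current bank's slot,
/// blocks until at least `WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT` of the
/// activated stake is visible in gossip, overriding the RPC health check
/// while waiting. Returns Ok(true) if the validator actually waited,
/// Ok(false) if no wait was configured or the slot has already passed.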
fn wait_for_supermajority(
    config: &ValidatorConfig,
    process_blockstore: Option<&mut ProcessBlockStore>,
    bank_forks: &RwLock<BankForks>,
    cluster_info: &ClusterInfo,
    rpc_override_health_check: Arc<AtomicBool>,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
) -> Result<bool, ValidatorError> {
    match config.wait_for_supermajority {
        None => Ok(false),
        Some(wait_for_supermajority_slot) => {
            if let Some(process_blockstore) = process_blockstore {
                process_blockstore
                    .process()
                    .map_err(ValidatorError::Other)?;
            }

            let bank = bank_forks.read().unwrap().working_bank();
            match wait_for_supermajority_slot.cmp(&bank.slot()) {
                std::cmp::Ordering::Less => return Ok(false),
                std::cmp::Ordering::Greater => {
                    return Err(ValidatorError::NotEnoughLedgerData(
                        bank.slot(),
                        wait_for_supermajority_slot,
                    ));
                }
                _ => {}
            }

            if let Some(expected_bank_hash) = config.expected_bank_hash {
                if bank.hash() != expected_bank_hash {
                    return Err(ValidatorError::BankHashMismatch(
                        bank.hash(),
                        expected_bank_hash,
                    ));
                }
            }

            for i in 1.. {
                let logging = i % 10 == 1;
                if logging {
                    info!(
                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
                        bank.slot()
                    );
                }

                let gossip_stake_percent =
                    get_stake_percent_in_gossip(&bank, cluster_info, logging);

                *start_progress.write().unwrap() =
                    ValidatorStartProgress::WaitingForSupermajority {
                        slot: wait_for_supermajority_slot,
                        gossip_stake_percent,
                    };

                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
                    info!(
                        "Supermajority reached, {}% active stake detected, starting up now.",
                        gossip_stake_percent,
                    );
                    break;
                }
                rpc_override_health_check.store(true, Ordering::Relaxed);
                sleep(Duration::new(1, 0));
            }
            rpc_override_health_check.store(false, Ordering::Relaxed);
            Ok(true)
        }
    }
}

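/// Computes the percentage of activated stake whose nodes are currently
/// visible in gossip with a matching shred version, optionally logging a
/// breakdown of offline and wrong-shred-version stake.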
fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
    let mut online_stake = 0;
    let mut wrong_shred_stake = 0;
    let mut wrong_shred_nodes = vec![];
    let mut offline_stake = 0;
    let mut offline_nodes = vec![];

    let mut total_activated_stake = 0;
    let now = timestamp();
    let peers: HashMap<_, _> = cluster_info
        .all_tvu_peers()
        .into_iter()
        .filter(|node| {
            let age = now.saturating_sub(node.wallclock());
            age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
        })
        .map(|node| (*node.pubkey(), node))
        .collect();
    let my_shred_version = cluster_info.my_shred_version();
    let my_id = cluster_info.id();

    for (activated_stake, vote_account) in bank.vote_accounts().values() {
        let activated_stake = *activated_stake;
        total_activated_stake += activated_stake;

        if activated_stake == 0 {
            continue;
        }
        let vote_state_node_pubkey = *vote_account.node_pubkey();

        if let Some(peer) = peers.get(&vote_state_node_pubkey) {
            if peer.shred_version() == my_shred_version {
                trace!(
                    "observed {} in gossip, (activated_stake={})",
                    vote_state_node_pubkey,
                    activated_stake
                );
                online_stake += activated_stake;
            } else {
                wrong_shred_stake += activated_stake;
                wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
            }
        } else if vote_state_node_pubkey == my_id {
            // This node is online
            online_stake += activated_stake;
        } else {
            offline_stake += activated_stake;
            offline_nodes.push((activated_stake, vote_state_node_pubkey));
        }
    }

    let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
    if log {
        info!(
            "{:.3}% of active stake visible in gossip",
            online_stake_percentage
        );

        if !wrong_shred_nodes.is_empty() {
            info!(
                "{:.3}% of active stake has the wrong shred version in gossip",
                (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
            );
            // Sort by descending stake
            wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0));
            for (stake, identity) in wrong_shred_nodes {
                info!(
                    "    {:.3}% - {}",
                    (stake as f64 / total_activated_stake as f64) * 100.,
                    identity
                );
            }
        }

        if !offline_nodes.is_empty() {
            info!(
                "{:.3}% of active stake is not visible in gossip",
                (offline_stake as f64 / total_activated_stake as f64) * 100.
            );
            // Sort by descending stake
            offline_nodes.sort_by(|b, a| a.0.cmp(&b.0));
            for (stake, identity) in offline_nodes {
                info!(
                    "    {:.3}% - {}",
                    (stake as f64 / total_activated_stake as f64) * 100.,
                    identity
                );
            }
        }
    }

    online_stake_percentage as u64
}

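/// Removes stale contents from the configured account paths (and shrink
/// paths, if any), moving them aside and deleting asynchronously so startup
/// is not blocked on filesystem cleanup.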
fn cleanup_accounts_paths(config: &ValidatorConfig) {
    for account_path in &config.account_paths {
        move_and_async_delete_path_contents(account_path);
    }
    if let Some(shrink_paths) = config
        .accounts_db_config
        .as_ref()
        .and_then(|config| config.shrink_paths.as_ref())
    {
        for shrink_path in shrink_paths {
            move_and_async_delete_path_contents(shrink_path);
        }
    }
}

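/// A snapshot config is valid when snapshot generation is disabled, when
/// incremental snapshots are disabled, or when the full snapshot interval is
/// strictly greater than the incremental interval (a zero incremental
/// interval is always invalid).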
pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
    // If snapshots are not being generated, any interval settings are acceptable
    if !snapshot_config.should_generate_snapshots() {
        return true;
    }

    let full_snapshot_interval_slots = snapshot_config.full_snapshot_archive_interval_slots;
    let incremental_snapshot_interval_slots =
        snapshot_config.incremental_snapshot_archive_interval_slots;

    if incremental_snapshot_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
        true
    } else if incremental_snapshot_interval_slots == 0 {
        false
    } else {
        full_snapshot_interval_slots > incremental_snapshot_interval_slots
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crossbeam_channel::{bounded, RecvTimeoutError},
        solana_entry::entry,
        solana_genesis_config::create_genesis_config,
        solana_gossip::contact_info::ContactInfo,
        solana_ledger::{
            blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
            get_tmp_ledger_path_auto_delete,
        },
        solana_poh_config::PohConfig,
        solana_sha256_hasher::hash,
        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
        std::{fs::remove_dir_all, thread, time::Duration},
    };

    #[test]
    fn validator_exit() {
        solana_logger::setup();
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());

        let validator_keypair = Keypair::new();
        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;
        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);

        let voting_keypair = Arc::new(Keypair::new());
        let config = ValidatorConfig {
            rpc_addrs: Some((
                validator_node.info.rpc().unwrap(),
                validator_node.info.rpc_pubsub().unwrap(),
            )),
            ..ValidatorConfig::default_for_test()
        };
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
        let validator = Validator::new(
            validator_node,
            Arc::new(validator_keypair),
            &validator_ledger_path,
            &voting_keypair.pubkey(),
            Arc::new(RwLock::new(vec![voting_keypair])),
            vec![leader_node.info],
            &config,
            true, // should_check_duplicate_instance
            None, // rpc_to_plugin_manager_receiver
            start_progress.clone(),
            SocketAddrSpace::Unspecified,
            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
            Arc::new(RwLock::new(None)),
        )
        .expect("assume successful validator start");
        assert_eq!(
            *start_progress.read().unwrap(),
            ValidatorStartProgress::Running
        );
        validator.close();
        remove_dir_all(validator_ledger_path).unwrap();
    }

    #[test]
    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
        solana_logger::setup();

        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Blockstore::open(ledger_path.path()).unwrap();

        let mut validator_config = ValidatorConfig::default_for_test();
        let mut hard_forks = HardForks::default();
        let mut root_slot;

        // Cluster restart at the root slot: cleanup from root_slot + 1
        root_slot = 10;
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1)
        );

        // The wait-for-supermajority slot no longer matches the root and
        // there are no hard forks: no cleanup
        root_slot = 15;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Hard fork registered but the blockstore is empty: no cleanup
        hard_forks.register(10);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        let entries = entry::create_ticks(1, 0, Hash::default());
        for i in 20..35 {
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                i,     // slot
                i - 1, // parent_slot
                true,  // is_full_slot
                1,     // version
                true,  // merkle_variant
            );
            blockstore.insert_shreds(shreds, None, true).unwrap();
        }

        // The latest hard fork (10) predates everything in the blockstore: no cleanup
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Cluster restart at a hard fork: cleanup from root_slot + 1,
        // with or without --wait-for-supermajority
        root_slot = 25;
        hard_forks.register(root_slot);
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );
        validator_config.wait_for_supermajority = None;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );

        // Blockstore extends past the latest hard fork: cleanup from the
        // slot after the fork
        root_slot = 30;
        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(latest_hard_fork + 1),
        );

        // Everything up to and including the latest hard fork purged: no cleanup
        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );
    }

    #[test]
    fn test_cleanup_blockstore_incorrect_shred_versions() {
        solana_logger::setup();

        let validator_config = ValidatorConfig::default_for_test();
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Blockstore::open(ledger_path.path()).unwrap();

        let entries = entry::create_ticks(1, 0, Hash::default());
        for i in 1..10 {
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                i,     // slot
                i - 1, // parent_slot
                true,  // is_full_slot
                1,     // version
                true,  // merkle_variant
            );
            blockstore.insert_shreds(shreds, None, true).unwrap();
        }

        // Slots 5..10 were created with version 1, not the expected version 2,
        // so they should be backed up and purged
        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
        assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
        for i in 5..10 {
            assert!(blockstore
                .get_data_shreds_for_slot(i, 0)
                .unwrap()
                .is_empty());
        }
    }

    #[test]
    fn validator_parallel_exit() {
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;

        let mut ledger_paths = vec![];
        let mut validators: Vec<Validator> = (0..2)
            .map(|_| {
                let validator_keypair = Keypair::new();
                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
                ledger_paths.push(validator_ledger_path.clone());
                let vote_account_keypair = Keypair::new();
                let config = ValidatorConfig {
                    rpc_addrs: Some((
                        validator_node.info.rpc().unwrap(),
                        validator_node.info.rpc_pubsub().unwrap(),
                    )),
                    ..ValidatorConfig::default_for_test()
                };
                Validator::new(
                    validator_node,
                    Arc::new(validator_keypair),
                    &validator_ledger_path,
                    &vote_account_keypair.pubkey(),
                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
                    vec![leader_node.info.clone()],
                    &config,
                    true, // should_check_duplicate_instance
                    None, // rpc_to_plugin_manager_receiver
                    Arc::new(RwLock::new(ValidatorStartProgress::default())),
                    SocketAddrSpace::Unspecified,
                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
                    Arc::new(RwLock::new(None)),
                )
                .expect("assume successful validator start")
            })
            .collect();

        // Signal every validator to exit up front so the sequential joins
        // below can proceed in parallel
        validators.iter_mut().for_each(|v| v.exit());

        // Join from a separate thread so the test can enforce a timeout
        let (sender, receiver) = bounded(0);
        let _ = thread::spawn(move || {
            validators.into_iter().for_each(|validator| {
                validator.join();
            });
            sender.send(()).unwrap();
        });

        let timeout = Duration::from_secs(60);
        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
            panic!("timeout for shutting down validators");
        }

        for path in ledger_paths {
            remove_dir_all(path).unwrap();
        }
    }

    #[test]
    fn test_wait_for_supermajority() {
        solana_logger::setup();
        let node_keypair = Arc::new(Keypair::new());
        let cluster_info = ClusterInfo::new(
            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
            node_keypair,
            SocketAddrSpace::Unspecified,
        );

        let (genesis_config, _mint_keypair) = create_genesis_config(1);
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let mut config = ValidatorConfig::default_for_test();
        let rpc_override_health_check = Arc::new(AtomicBool::new(false));
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));

        assert!(!wait_for_supermajority(
            &config,
            None,
            &bank_forks,
            &cluster_info,
            rpc_override_health_check.clone(),
            &start_progress,
        )
        .unwrap());

        // Fail because the ledger is behind the wait-for-supermajority slot
        config.wait_for_supermajority = Some(1);
        assert!(matches!(
            wait_for_supermajority(
                &config,
                None,
                &bank_forks,
                &cluster_info,
                rpc_override_health_check.clone(),
                &start_progress,
            ),
            Err(ValidatorError::NotEnoughLedgerData(_, _)),
        ));

        // A bank past the wait-for-supermajority slot does not wait
        let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
            bank_forks.read().unwrap().root_bank(),
            &Pubkey::default(),
            1,
        ));
        config.wait_for_supermajority = Some(0);
        assert!(!wait_for_supermajority(
            &config,
            None,
            &bank_forks,
            &cluster_info,
            rpc_override_health_check.clone(),
            &start_progress,
        )
        .unwrap());

        // Fail because the bank hash does not match the expected hash
        config.wait_for_supermajority = Some(1);
        config.expected_bank_hash = Some(hash(&[1]));
        assert!(matches!(
            wait_for_supermajority(
                &config,
                None,
                &bank_forks,
                &cluster_info,
                rpc_override_health_check,
                &start_progress,
            ),
            Err(ValidatorError::BankHashMismatch(_, _)),
        ));
    }

    #[test]
    fn test_interval_check() {
        fn new_snapshot_config(
            full_snapshot_archive_interval_slots: Slot,
            incremental_snapshot_archive_interval_slots: Slot,
        ) -> SnapshotConfig {
            SnapshotConfig {
                full_snapshot_archive_interval_slots,
                incremental_snapshot_archive_interval_slots,
                ..SnapshotConfig::default()
            }
        }

        assert!(is_snapshot_config_valid(&new_snapshot_config(300, 200)));

        assert!(is_snapshot_config_valid(&new_snapshot_config(
            snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
            snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS
        )));
        assert!(is_snapshot_config_valid(&new_snapshot_config(
            snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
        )));
        assert!(is_snapshot_config_valid(&new_snapshot_config(
            snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
        )));
        assert!(is_snapshot_config_valid(&new_snapshot_config(
            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
            DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
        )));

        assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
        assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));

        assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 100)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 0)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 0)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
        assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));

        assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval_slots: 37,
            incremental_snapshot_archive_interval_slots: 41,
            ..SnapshotConfig::new_load_only()
        }));
        assert!(is_snapshot_config_valid(&SnapshotConfig {
            full_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
            incremental_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
            ..SnapshotConfig::new_load_only()
        }));
    }

    fn target_tick_duration() -> Duration {
        // DEFAULT_MS_PER_SLOT = 400 and DEFAULT_TICKS_PER_SLOT = 64, so the
        // target tick duration is 400_000 / 64 = 6250 microseconds
        let target_tick_duration_us =
            solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
        assert_eq!(target_tick_duration_us, 6250);
        Duration::from_micros(target_tick_duration_us)
    }

    #[test]
    fn test_poh_speed() {
        solana_logger::setup();
        let poh_config = PohConfig {
            target_tick_duration: target_tick_duration(),
            // Demand 100x the default hashes per tick at the real tick
            // duration; no test machine can sustain that, so the speed
            // check must fail
            hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
            ..PohConfig::default()
        };
        let genesis_config = GenesisConfig {
            poh_config,
            ..GenesisConfig::default()
        };
        let bank = Bank::new_for_tests(&genesis_config);
        assert!(check_poh_speed(&bank, Some(10_000)).is_err());
    }

    #[test]
    fn test_poh_speed_no_hashes_per_tick() {
        solana_logger::setup();
        let poh_config = PohConfig {
            target_tick_duration: target_tick_duration(),
            hashes_per_tick: None,
            ..PohConfig::default()
        };
        let genesis_config = GenesisConfig {
            poh_config,
            ..GenesisConfig::default()
        };
        let bank = Bank::new_for_tests(&genesis_config);
        check_poh_speed(&bank, Some(10_000)).unwrap();
    }
}