1pub use solana_perf::report_target_features;
4use {
5 crate::{
6 accounts_hash_verifier::AccountsHashVerifier,
7 admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
8 banking_trace::{self, BankingTracer, TraceError},
9 cluster_info_vote_listener::VoteTracker,
10 completed_data_sets_service::CompletedDataSetsService,
11 consensus::{
12 reconcile_blockstore_roots_with_external_source,
13 tower_storage::{NullTowerStorage, TowerStorage},
14 ExternalRootSource, Tower,
15 },
16 poh_timing_report_service::PohTimingReportService,
17 repair::{
18 self,
19 quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
20 serve_repair::ServeRepair,
21 serve_repair_service::ServeRepairService,
22 },
23 sample_performance_service::SamplePerformanceService,
24 sigverify,
25 snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService},
26 stats_reporter_service::StatsReporterService,
27 system_monitor_service::{
28 verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
29 },
30 tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
31 tvu::{Tvu, TvuConfig, TvuSockets},
32 },
33 agave_thread_manager::{ThreadManager, ThreadManagerConfig},
34 anyhow::{anyhow, Context, Result},
35 crossbeam_channel::{bounded, unbounded, Receiver},
36 lazy_static::lazy_static,
37 quinn::Endpoint,
38 solana_accounts_db::{
39 accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
40 accounts_update_notifier_interface::AccountsUpdateNotifier,
41 hardened_unpack::{
42 open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
43 },
44 utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
45 },
46 solana_client::connection_cache::{ConnectionCache, Protocol},
47 solana_entry::poh::compute_hash_time,
48 solana_geyser_plugin_manager::{
49 geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
50 },
51 solana_gossip::{
52 cluster_info::{
53 ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
54 DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
55 },
56 contact_info::ContactInfo,
57 crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
58 gossip_service::GossipService,
59 },
60 solana_ledger::{
61 bank_forks_utils,
62 blockstore::{
63 Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
64 MAX_REPLAY_WAKE_UP_SIGNALS,
65 },
66 blockstore_metric_report_service::BlockstoreMetricReportService,
67 blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
68 blockstore_processor::{self, TransactionStatusSender},
69 entry_notifier_interface::EntryNotifierArc,
70 entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
71 leader_schedule::FixedSchedule,
72 leader_schedule_cache::LeaderScheduleCache,
73 use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
74 },
75 solana_measure::measure::Measure,
76 solana_metrics::{
77 datapoint_info, metrics::metrics_config_sanity_check, poh_timing_point::PohTimingSender,
78 },
79 solana_poh::{
80 poh_recorder::PohRecorder,
81 poh_service::{self, PohService},
82 },
83 solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
84 solana_rpc::{
85 block_meta_service::{BlockMetaSender, BlockMetaService},
86 max_slots::MaxSlots,
87 optimistically_confirmed_bank_tracker::{
88 BankNotificationSenderConfig, OptimisticallyConfirmedBank,
89 OptimisticallyConfirmedBankTracker,
90 },
91 rpc::JsonRpcConfig,
92 rpc_completed_slots_service::RpcCompletedSlotsService,
93 rpc_pubsub_service::{PubSubConfig, PubSubService},
94 rpc_service::JsonRpcService,
95 rpc_subscriptions::RpcSubscriptions,
96 transaction_notifier_interface::TransactionNotifierArc,
97 transaction_status_service::TransactionStatusService,
98 },
99 solana_runtime::{
100 accounts_background_service::{
101 AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
102 PrunedBanksRequestHandler, SnapshotRequestHandler,
103 },
104 bank::Bank,
105 bank_forks::BankForks,
106 commitment::BlockCommitmentCache,
107 prioritization_fee_cache::PrioritizationFeeCache,
108 runtime_config::RuntimeConfig,
109 snapshot_archive_info::SnapshotArchiveInfoGetter,
110 snapshot_bank_utils::{self, DISABLED_SNAPSHOT_ARCHIVE_INTERVAL},
111 snapshot_config::SnapshotConfig,
112 snapshot_hash::StartingSnapshotHashes,
113 snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
114 },
115 solana_sdk::{
116 clock::Slot,
117 epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
118 exit::Exit,
119 genesis_config::{ClusterType, GenesisConfig},
120 hard_forks::HardForks,
121 hash::Hash,
122 pubkey::Pubkey,
123 shred_version::compute_shred_version,
124 signature::{Keypair, Signer},
125 timing::timestamp,
126 },
127 solana_send_transaction_service::send_transaction_service,
128 solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
129 solana_tpu_client::tpu_client::{
130 DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
131 },
132 solana_turbine::{self, broadcast_stage::BroadcastStageType},
133 solana_unified_scheduler_pool::DefaultSchedulerPool,
134 solana_vote_program::vote_state,
135 solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
136 std::{
137 borrow::Cow,
138 collections::{HashMap, HashSet},
139 net::SocketAddr,
140 num::NonZeroUsize,
141 path::{Path, PathBuf},
142 sync::{
143 atomic::{AtomicBool, AtomicU64, Ordering},
144 Arc, Mutex, RwLock,
145 },
146 thread::{sleep, Builder, JoinHandle},
147 time::{Duration, Instant},
148 },
149 strum::VariantNames,
150 strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
151 thiserror::Error,
152 tokio::runtime::Runtime as TokioRuntime,
153};
154
// Capacity of the completed-data-sets channel consumed by
// `CompletedDataSetsService`.
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
// Percent of stake that must be visible in gossip before leaving the
// wait-for-supermajority startup state.
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Wen-restart reuses the same supermajority threshold as the normal
// wait-for-supermajority startup path.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;
162
/// Transaction scheduling method used when verifying ledger entries
/// (block verification / replay). Serialized in kebab-case for the CLI.
#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}
172
173impl BlockVerificationMethod {
174 pub const fn cli_names() -> &'static [&'static str] {
175 Self::VARIANTS
176 }
177
178 pub fn cli_message() -> &'static str {
179 lazy_static! {
180 static ref MESSAGE: String = format!(
181 "Switch transaction scheduling method for verifying ledger entries [default: {}]",
182 BlockVerificationMethod::default()
183 );
184 };
185
186 &MESSAGE
187 }
188}
189
/// Transaction scheduling method used when producing ledger entries
/// (block production in the TPU). Serialized in kebab-case for the CLI.
#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockProductionMethod {
    #[default]
    CentralScheduler,
    CentralSchedulerGreedy,
}
197
198impl BlockProductionMethod {
199 pub const fn cli_names() -> &'static [&'static str] {
200 Self::VARIANTS
201 }
202
203 pub fn cli_message() -> &'static str {
204 lazy_static! {
205 static ref MESSAGE: String = format!(
206 "Switch transaction scheduling method for producing ledger entries [default: {}]",
207 BlockProductionMethod::default()
208 );
209 };
210
211 &MESSAGE
212 }
213}
214
/// Internal transaction structure/representation selected via the CLI.
/// Serialized in kebab-case for the CLI.
#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum TransactionStructure {
    #[default]
    Sdk,
    View,
}
222
223impl TransactionStructure {
224 pub const fn cli_names() -> &'static [&'static str] {
225 Self::VARIANTS
226 }
227
228 pub fn cli_message() -> &'static str {
229 lazy_static! {
230 static ref MESSAGE: String = format!(
231 "Switch internal transaction structure/representation [default: {}]",
232 TransactionStructure::default()
233 );
234 };
235
236 &MESSAGE
237 }
238}
239
/// Settings for the optional transaction generator
/// (see `ValidatorConfig::generator_config`).
#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    // Path to the accounts used by the generator.
    pub accounts_path: String,
    // Keypairs the generator starts from; shared, so wrapped in `Arc`.
    pub starting_keypairs: Arc<Vec<Keypair>>,
}
246
247pub struct ValidatorConfig {
248 pub halt_at_slot: Option<Slot>,
249 pub expected_genesis_hash: Option<Hash>,
250 pub expected_bank_hash: Option<Hash>,
251 pub expected_shred_version: Option<u16>,
252 pub voting_disabled: bool,
253 pub account_paths: Vec<PathBuf>,
254 pub account_snapshot_paths: Vec<PathBuf>,
255 pub rpc_config: JsonRpcConfig,
256 pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
258 pub geyser_plugin_always_enabled: bool,
259 pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, pub pubsub_config: PubSubConfig,
261 pub snapshot_config: SnapshotConfig,
262 pub max_ledger_shreds: Option<u64>,
263 pub blockstore_options: BlockstoreOptions,
264 pub broadcast_stage_type: BroadcastStageType,
265 pub turbine_disabled: Arc<AtomicBool>,
266 pub fixed_leader_schedule: Option<FixedSchedule>,
267 pub wait_for_supermajority: Option<Slot>,
268 pub new_hard_forks: Option<Vec<Slot>>,
269 pub known_validators: Option<HashSet<Pubkey>>, pub repair_validators: Option<HashSet<Pubkey>>, pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>, pub gossip_validators: Option<HashSet<Pubkey>>, pub accounts_hash_interval_slots: u64,
274 pub max_genesis_archive_unpacked_size: u64,
275 pub run_verification: bool,
278 pub require_tower: bool,
279 pub tower_storage: Arc<dyn TowerStorage>,
280 pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
281 pub contact_debug_interval: u64,
282 pub contact_save_interval: u64,
283 pub send_transaction_service_config: send_transaction_service::Config,
284 pub no_poh_speed_test: bool,
285 pub no_os_memory_stats_reporting: bool,
286 pub no_os_network_stats_reporting: bool,
287 pub no_os_cpu_stats_reporting: bool,
288 pub no_os_disk_stats_reporting: bool,
289 pub poh_pinned_cpu_core: usize,
290 pub poh_hashes_per_batch: u64,
291 pub process_ledger_before_services: bool,
292 pub accounts_db_config: Option<AccountsDbConfig>,
293 pub warp_slot: Option<Slot>,
294 pub accounts_db_test_hash_calculation: bool,
295 pub accounts_db_skip_shrink: bool,
296 pub accounts_db_force_initial_clean: bool,
297 pub tpu_coalesce: Duration,
298 pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
299 pub validator_exit: Arc<RwLock<Exit>>,
300 pub no_wait_for_vote_to_start_leader: bool,
301 pub wait_to_vote_slot: Option<Slot>,
302 pub runtime_config: RuntimeConfig,
303 pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
304 pub block_verification_method: BlockVerificationMethod,
305 pub block_production_method: BlockProductionMethod,
306 pub transaction_struct: TransactionStructure,
307 pub enable_block_production_forwarding: bool,
308 pub generator_config: Option<GeneratorConfig>,
309 pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
310 pub wen_restart_proto_path: Option<PathBuf>,
311 pub wen_restart_coordinator: Option<Pubkey>,
312 pub unified_scheduler_handler_threads: Option<usize>,
313 pub ip_echo_server_threads: NonZeroUsize,
314 pub rayon_global_threads: NonZeroUsize,
315 pub replay_forks_threads: NonZeroUsize,
316 pub replay_transactions_threads: NonZeroUsize,
317 pub tvu_shred_sigverify_threads: NonZeroUsize,
318 pub thread_manager_config: ThreadManagerConfig,
319 pub delay_leader_block_for_pending_fork: bool,
320}
321
impl Default for ValidatorConfig {
    /// Conservative defaults: snapshots are load-only, OS stats reporting
    /// and the PoH speed test are disabled, and all tunable thread counts
    /// are 1.
    fn default() -> Self {
        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            // Load-only: snapshot generation is not enabled by default.
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            accounts_hash_interval_slots: u64::MAX,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: send_transaction_service::Config::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_test_hash_calculation: false,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            tpu_coalesce: DEFAULT_TPU_COALESCE,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: None,
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            // 0 byte limit disables the banking tracer.
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            transaction_struct: TransactionStructure::default(),
            enable_block_production_forwarding: false,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            thread_manager_config: ThreadManagerConfig::default_for_agave(),
            tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            delay_leader_block_for_pending_fork: false,
        }
    }
}
397
398impl ValidatorConfig {
399 pub fn default_for_test() -> Self {
400 let max_thread_count =
401 NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");
402
403 Self {
404 accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
405 blockstore_options: BlockstoreOptions::default_for_tests(),
406 rpc_config: JsonRpcConfig::default_for_test(),
407 block_production_method: BlockProductionMethod::default(),
408 enable_block_production_forwarding: true, rayon_global_threads: max_thread_count,
410 replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
411 replay_transactions_threads: max_thread_count,
412 tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
413 .expect("thread count is non-zero"),
414 ..Self::default()
415 }
416 }
417
418 pub fn enable_default_rpc_block_subscribe(&mut self) {
419 let pubsub_config = PubSubConfig {
420 enable_block_subscription: true,
421 ..PubSubConfig::default()
422 };
423 let rpc_config = JsonRpcConfig {
424 enable_rpc_transaction_history: true,
425 ..JsonRpcConfig::default_for_test()
426 };
427
428 self.pubsub_config = pubsub_config;
429 self.rpc_config = rpc_config;
430 }
431}
432
433#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
437pub enum ValidatorStartProgress {
438 Initializing, SearchingForRpcService,
440 DownloadingSnapshot {
441 slot: Slot,
442 rpc_addr: SocketAddr,
443 },
444 CleaningBlockStore,
445 CleaningAccounts,
446 LoadingLedger,
447 ProcessingLedger {
448 slot: Slot,
449 max_slot: Slot,
450 },
451 StartingServices,
452 Halted, WaitingForSupermajority {
454 slot: Slot,
455 gossip_stake_percent: u64,
456 },
457
458 Running,
461}
462
463impl Default for ValidatorStartProgress {
464 fn default() -> Self {
465 Self::Initializing
466 }
467}
468
/// Optional background thread that runs `Blockstore::scan_and_fix_roots`;
/// `None` when the scan is not enabled (see `BlockstoreRootScan::new`).
struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}
472
473impl BlockstoreRootScan {
474 fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
475 let thread = if config.rpc_addrs.is_some()
476 && config.rpc_config.enable_rpc_transaction_history
477 && config.rpc_config.rpc_scan_and_fix_roots
478 {
479 Some(
480 Builder::new()
481 .name("solBStoreRtScan".to_string())
482 .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
483 .unwrap(),
484 )
485 } else {
486 None
487 };
488 Self { thread }
489 }
490
491 fn join(self) {
492 if let Some(blockstore_root_scan) = self.thread {
493 if let Err(err) = blockstore_root_scan.join() {
494 warn!("blockstore_root_scan failed to join {:?}", err);
495 }
496 }
497 }
498}
499
/// Services and channels for recording transaction history, produced while
/// loading the blockstore (see the destructuring in `Validator::new`); the
/// `Default` value leaves every service disabled.
#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
    max_complete_rewards_slot: Arc<AtomicU64>,
    block_meta_sender: Option<BlockMetaSender>,
    block_meta_service: Option<BlockMetaService>,
}
509
/// TPU-related settings handed to `Validator::new`.
pub struct ValidatorTpuConfig {
    /// Whether QUIC is used for TPU transactions.
    pub use_quic: bool,
    /// Whether QUIC is used for votes.
    pub vote_use_quic: bool,
    /// Connection pool size for the TPU client.
    pub tpu_connection_pool_size: usize,
    /// Whether the UDP TPU path is enabled alongside QUIC.
    pub tpu_enable_udp: bool,
    /// QUIC server parameters for the TPU port.
    pub tpu_quic_server_config: QuicServerParams,
    /// QUIC server parameters for the TPU-forward port.
    pub tpu_fwd_quic_server_config: QuicServerParams,
    /// QUIC server parameters for the vote port.
    pub vote_quic_server_config: QuicServerParams,
}
527
528impl ValidatorTpuConfig {
529 pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
532 let tpu_quic_server_config = QuicServerParams {
533 max_connections_per_ipaddr_per_min: 32,
534 coalesce_channel_size: 100_000, ..Default::default()
536 };
537
538 let tpu_fwd_quic_server_config = QuicServerParams {
539 max_connections_per_ipaddr_per_min: 32,
540 max_unstaked_connections: 0,
541 coalesce_channel_size: 100_000, ..Default::default()
543 };
544
545 let vote_quic_server_config = tpu_fwd_quic_server_config.clone();
547
548 ValidatorTpuConfig {
549 use_quic: DEFAULT_TPU_USE_QUIC,
550 vote_use_quic: DEFAULT_VOTE_USE_QUIC,
551 tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
552 tpu_enable_udp,
553 tpu_quic_server_config,
554 tpu_fwd_quic_server_config,
555 vote_quic_server_config,
556 }
557 }
558}
559
/// A running validator: owns every service/thread started by
/// `Validator::new` so they can be joined/shut down together.
pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    // RPC-related services; `None` when RPC is not enabled.
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    block_meta_service: Option<BlockMetaService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    poh_timing_report_service: PohTimingReportService,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    // Shared handles exposed to callers (e.g. tests and admin tooling).
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    accounts_hash_verifier: AccountsHashVerifier,
    // Turbine/repair QUIC endpoints plus their tokio runtimes and join
    // handles; `None` when the corresponding endpoint was not created.
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    thread_manager: ThreadManager,
}
597
598impl Validator {
599 #[allow(clippy::too_many_arguments)]
600 pub fn new(
601 mut node: Node,
602 identity_keypair: Arc<Keypair>,
603 ledger_path: &Path,
604 vote_account: &Pubkey,
605 authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
606 cluster_entrypoints: Vec<ContactInfo>,
607 config: &ValidatorConfig,
608 should_check_duplicate_instance: bool,
609 rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
610 start_progress: Arc<RwLock<ValidatorStartProgress>>,
611 socket_addr_space: SocketAddrSpace,
612 tpu_config: ValidatorTpuConfig,
613 admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
614 ) -> Result<Self> {
615 let ValidatorTpuConfig {
616 use_quic,
617 vote_use_quic,
618 tpu_connection_pool_size,
619 tpu_enable_udp,
620 tpu_quic_server_config,
621 tpu_fwd_quic_server_config,
622 vote_quic_server_config,
623 } = tpu_config;
624
625 let start_time = Instant::now();
626
627 let thread_manager = ThreadManager::new(&config.thread_manager_config)?;
628 if rayon::ThreadPoolBuilder::new()
632 .thread_name(|i| format!("solRayonGlob{i:02}"))
633 .num_threads(config.rayon_global_threads.get())
634 .build_global()
635 .is_err()
636 {
637 warn!("Rayon global thread pool already initialized");
638 }
639
640 let id = identity_keypair.pubkey();
641 assert_eq!(&id, node.info.pubkey());
642
643 info!("identity pubkey: {id}");
644 info!("vote account pubkey: {vote_account}");
645
646 if !config.no_os_network_stats_reporting {
647 verify_net_stats_access().map_err(|e| {
648 ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
649 })?;
650 }
651
652 let mut bank_notification_senders = Vec::new();
653
654 let exit = Arc::new(AtomicBool::new(false));
655
656 let geyser_plugin_config_files = config
657 .on_start_geyser_plugin_config_files
658 .as_ref()
659 .map(Cow::Borrowed)
660 .or_else(|| {
661 config
662 .geyser_plugin_always_enabled
663 .then_some(Cow::Owned(vec![]))
664 });
665 let geyser_plugin_service =
666 if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
667 let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
668 bank_notification_senders.push(confirmed_bank_sender);
669 let rpc_to_plugin_manager_receiver_and_exit =
670 rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
671 Some(
672 GeyserPluginService::new_with_receiver(
673 confirmed_bank_receiver,
674 config.geyser_plugin_always_enabled,
675 geyser_plugin_config_files.as_ref(),
676 rpc_to_plugin_manager_receiver_and_exit,
677 )
678 .map_err(|err| {
679 ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
680 })?,
681 )
682 } else {
683 None
684 };
685
686 if config.voting_disabled {
687 warn!("voting disabled");
688 authorized_voter_keypairs.write().unwrap().clear();
689 } else {
690 for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
691 warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
692 }
693 }
694
695 for cluster_entrypoint in &cluster_entrypoints {
696 info!("entrypoint: {:?}", cluster_entrypoint);
697 }
698
699 if solana_perf::perf_libs::api().is_some() {
700 info!("Initializing sigverify, this could take a while...");
701 } else {
702 info!("Initializing sigverify...");
703 }
704 sigverify::init();
705 info!("Initializing sigverify done.");
706
707 if !ledger_path.is_dir() {
708 return Err(anyhow!(
709 "ledger directory does not exist or is not accessible: {ledger_path:?}"
710 ));
711 }
712 let genesis_config = load_genesis(config, ledger_path)?;
713 metrics_config_sanity_check(genesis_config.cluster_type)?;
714
715 info!("Cleaning accounts paths..");
716 *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
717 let mut timer = Measure::start("clean_accounts_paths");
718 cleanup_accounts_paths(config);
719 timer.stop();
720 info!("Cleaning accounts paths done. {timer}");
721
722 snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
723 snapshot_utils::purge_old_bank_snapshots_at_startup(
724 &config.snapshot_config.bank_snapshots_dir,
725 );
726
727 info!("Cleaning orphaned account snapshot directories..");
728 let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
729 clean_orphaned_account_snapshot_dirs(
730 &config.snapshot_config.bank_snapshots_dir,
731 &config.account_snapshot_paths,
732 )
733 .context("failed to clean orphaned account snapshot directories")?;
734 timer.stop();
735 info!("Cleaning orphaned account snapshot directories done. {timer}");
736
737 let accounts_hash_cache_path = config
739 .accounts_db_config
740 .as_ref()
741 .and_then(|config| config.accounts_hash_cache_path.as_ref())
742 .map(PathBuf::as_path)
743 .unwrap_or(ledger_path);
744 let old_accounts_hash_cache_dirs = [
745 ledger_path.join("calculate_accounts_hash_cache"),
746 accounts_hash_cache_path.join("full"),
747 accounts_hash_cache_path.join("incremental"),
748 accounts_hash_cache_path.join("transient"),
749 ];
750 for old_accounts_hash_cache_dir in old_accounts_hash_cache_dirs {
751 if old_accounts_hash_cache_dir.exists() {
752 move_and_async_delete_path(old_accounts_hash_cache_dir);
753 }
754 }
755
756 {
757 let exit = exit.clone();
758 config
759 .validator_exit
760 .write()
761 .unwrap()
762 .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
763 }
764
765 let accounts_update_notifier = geyser_plugin_service
766 .as_ref()
767 .and_then(|geyser_plugin_service| geyser_plugin_service.get_accounts_update_notifier());
768
769 let transaction_notifier = geyser_plugin_service
770 .as_ref()
771 .and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier());
772
773 let entry_notifier = geyser_plugin_service
774 .as_ref()
775 .and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier());
776
777 let block_metadata_notifier = geyser_plugin_service
778 .as_ref()
779 .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());
780
781 let slot_status_notifier = geyser_plugin_service
782 .as_ref()
783 .and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier());
784
785 info!(
786 "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
787 entry_notifier: {}",
788 accounts_update_notifier.is_some(),
789 transaction_notifier.is_some(),
790 entry_notifier.is_some()
791 );
792
793 let system_monitor_service = Some(SystemMonitorService::new(
794 exit.clone(),
795 SystemMonitorStatsReportConfig {
796 report_os_memory_stats: !config.no_os_memory_stats_reporting,
797 report_os_network_stats: !config.no_os_network_stats_reporting,
798 report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
799 report_os_disk_stats: !config.no_os_disk_stats_reporting,
800 },
801 ));
802
803 let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded();
804 let poh_timing_report_service =
805 PohTimingReportService::new(poh_timing_point_receiver, exit.clone());
806
807 let (
808 bank_forks,
809 blockstore,
810 original_blockstore_root,
811 ledger_signal_receiver,
812 leader_schedule_cache,
813 starting_snapshot_hashes,
814 TransactionHistoryServices {
815 transaction_status_sender,
816 transaction_status_service,
817 max_complete_transaction_status_slot,
818 max_complete_rewards_slot,
819 block_meta_sender,
820 block_meta_service,
821 },
822 blockstore_process_options,
823 blockstore_root_scan,
824 pruned_banks_receiver,
825 entry_notifier_service,
826 ) = load_blockstore(
827 config,
828 ledger_path,
829 &genesis_config,
830 exit.clone(),
831 &start_progress,
832 accounts_update_notifier,
833 transaction_notifier,
834 entry_notifier,
835 Some(poh_timing_point_sender.clone()),
836 )
837 .map_err(ValidatorError::Other)?;
838
839 if !config.no_poh_speed_test {
840 check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
841 }
842
843 let (root_slot, hard_forks) = {
844 let root_bank = bank_forks.read().unwrap().root_bank();
845 (root_bank.slot(), root_bank.hard_forks())
846 };
847 let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
848 info!(
849 "shred version: {shred_version}, hard forks: {:?}",
850 hard_forks
851 );
852
853 if let Some(expected_shred_version) = config.expected_shred_version {
854 if expected_shred_version != shred_version {
855 return Err(ValidatorError::ShredVersionMismatch {
856 actual: shred_version,
857 expected: expected_shred_version,
858 }
859 .into());
860 }
861 }
862
863 if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
864 config,
865 &blockstore,
866 root_slot,
867 &hard_forks,
868 )? {
869 *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
870 cleanup_blockstore_incorrect_shred_versions(
871 &blockstore,
872 config,
873 start_slot,
874 shred_version,
875 )?;
876 } else {
877 info!("Skipping the blockstore check for shreds with incorrect version");
878 }
879
880 node.info.set_shred_version(shred_version);
881 node.info.set_wallclock(timestamp());
882 Self::print_node_info(&node);
883
884 let mut cluster_info = ClusterInfo::new(
885 node.info.clone(),
886 identity_keypair.clone(),
887 socket_addr_space,
888 );
889 cluster_info.set_contact_debug_interval(config.contact_debug_interval);
890 cluster_info.set_entrypoints(cluster_entrypoints);
891 cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
892 let cluster_info = Arc::new(cluster_info);
893
894 assert!(is_snapshot_config_valid(
895 &config.snapshot_config,
896 config.accounts_hash_interval_slots,
897 ));
898
899 let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
900 let snapshot_packager_service = if config.snapshot_config.should_generate_snapshots() {
901 let enable_gossip_push = true;
902 let snapshot_packager_service = SnapshotPackagerService::new(
903 pending_snapshot_packages.clone(),
904 starting_snapshot_hashes,
905 exit.clone(),
906 cluster_info.clone(),
907 config.snapshot_config.clone(),
908 enable_gossip_push,
909 );
910 Some(snapshot_packager_service)
911 } else {
912 None
913 };
914
915 let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
916 let accounts_hash_verifier = AccountsHashVerifier::new(
917 accounts_package_sender.clone(),
918 accounts_package_receiver,
919 pending_snapshot_packages,
920 exit.clone(),
921 config.snapshot_config.clone(),
922 );
923
924 let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
925 let accounts_background_request_sender =
926 AbsRequestSender::new(snapshot_request_sender.clone());
927 let snapshot_request_handler = SnapshotRequestHandler {
928 snapshot_config: config.snapshot_config.clone(),
929 snapshot_request_sender,
930 snapshot_request_receiver,
931 accounts_package_sender,
932 };
933 let pruned_banks_request_handler = PrunedBanksRequestHandler {
934 pruned_banks_receiver,
935 };
936 let accounts_background_service = AccountsBackgroundService::new(
937 bank_forks.clone(),
938 exit.clone(),
939 AbsRequestHandlers {
940 snapshot_request_handler,
941 pruned_banks_request_handler,
942 },
943 config.accounts_db_test_hash_calculation,
944 );
945 info!(
946 "Using: block-verification-method: {}, block-production-method: {}, transaction-structure: {}",
947 config.block_verification_method, config.block_production_method, config.transaction_struct
948 );
949
950 let (replay_vote_sender, replay_vote_receiver) = unbounded();
951
952 let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
955
956 let leader_schedule_cache = Arc::new(leader_schedule_cache);
957 let startup_verification_complete;
958 let (poh_recorder, entry_receiver, record_receiver) = {
959 let bank = &bank_forks.read().unwrap().working_bank();
960 startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
961 PohRecorder::new_with_clear_signal(
962 bank.tick_height(),
963 bank.last_blockhash(),
964 bank.clone(),
965 None,
966 bank.ticks_per_slot(),
967 config.delay_leader_block_for_pending_fork,
968 blockstore.clone(),
969 blockstore.get_new_shred_signal(0),
970 &leader_schedule_cache,
971 &genesis_config.poh_config,
972 Some(poh_timing_point_sender),
973 exit.clone(),
974 )
975 };
976 let poh_recorder = Arc::new(RwLock::new(poh_recorder));
977
978 let (banking_tracer, tracer_thread) =
979 BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
980 &blockstore.banking_trace_path(),
981 exit.clone(),
982 config.banking_trace_dir_byte_limit,
983 )))?;
984 if banking_tracer.is_enabled() {
985 info!(
986 "Enabled banking trace (dir_byte_limit: {})",
987 config.banking_trace_dir_byte_limit
988 );
989 } else {
990 info!("Disabled banking trace");
991 }
992 let banking_tracer_channels = banking_tracer.create_channels(false);
993
994 match &config.block_verification_method {
995 BlockVerificationMethod::BlockstoreProcessor => {
996 info!("no scheduler pool is installed for block verification...");
997 if let Some(count) = config.unified_scheduler_handler_threads {
998 warn!(
999 "--unified-scheduler-handler-threads={count} is ignored because unified \
1000 scheduler isn't enabled"
1001 );
1002 }
1003 }
1004 BlockVerificationMethod::UnifiedScheduler => {
1005 let scheduler_pool = DefaultSchedulerPool::new_dyn(
1006 config.unified_scheduler_handler_threads,
1007 config.runtime_config.log_messages_bytes_limit,
1008 transaction_status_sender.clone(),
1009 Some(replay_vote_sender.clone()),
1010 prioritization_fee_cache.clone(),
1011 );
1012 bank_forks
1013 .write()
1014 .unwrap()
1015 .install_scheduler_pool(scheduler_pool);
1016 }
1017 }
1018
1019 let entry_notification_sender = entry_notifier_service
1020 .as_ref()
1021 .map(|service| service.sender());
1022 let mut process_blockstore = ProcessBlockStore::new(
1023 &id,
1024 vote_account,
1025 &start_progress,
1026 &blockstore,
1027 original_blockstore_root,
1028 &bank_forks,
1029 &leader_schedule_cache,
1030 &blockstore_process_options,
1031 transaction_status_sender.as_ref(),
1032 block_meta_sender.clone(),
1033 entry_notification_sender,
1034 blockstore_root_scan,
1035 accounts_background_request_sender.clone(),
1036 config,
1037 );
1038
1039 maybe_warp_slot(
1040 config,
1041 &mut process_blockstore,
1042 ledger_path,
1043 &bank_forks,
1044 &leader_schedule_cache,
1045 &accounts_background_request_sender,
1046 )
1047 .map_err(ValidatorError::Other)?;
1048
1049 if config.process_ledger_before_services {
1050 process_blockstore
1051 .process()
1052 .map_err(ValidatorError::Other)?;
1053 }
1054 *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
1055
1056 let sample_performance_service =
1057 if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
1058 Some(SamplePerformanceService::new(
1059 &bank_forks,
1060 blockstore.clone(),
1061 exit.clone(),
1062 ))
1063 } else {
1064 None
1065 };
1066
1067 let mut block_commitment_cache = BlockCommitmentCache::default();
1068 let bank_forks_guard = bank_forks.read().unwrap();
1069 block_commitment_cache.initialize_slots(
1070 bank_forks_guard.working_bank().slot(),
1071 bank_forks_guard.root(),
1072 );
1073 drop(bank_forks_guard);
1074 let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
1075
1076 let optimistically_confirmed_bank =
1077 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1078
1079 let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
1080 exit.clone(),
1081 max_complete_transaction_status_slot.clone(),
1082 max_complete_rewards_slot.clone(),
1083 blockstore.clone(),
1084 bank_forks.clone(),
1085 block_commitment_cache.clone(),
1086 optimistically_confirmed_bank.clone(),
1087 &config.pubsub_config,
1088 None,
1089 ));
1090
1091 let max_slots = Arc::new(MaxSlots::default());
1092
1093 let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
1094
1095 let connection_cache = if use_quic {
1096 let connection_cache = ConnectionCache::new_with_client_options(
1097 "connection_cache_tpu_quic",
1098 tpu_connection_pool_size,
1099 None,
1100 Some((
1101 &identity_keypair,
1102 node.info
1103 .tpu(Protocol::UDP)
1104 .ok_or_else(|| {
1105 ValidatorError::Other(String::from("Invalid UDP address for TPU"))
1106 })?
1107 .ip(),
1108 )),
1109 Some((&staked_nodes, &identity_keypair.pubkey())),
1110 );
1111 Arc::new(connection_cache)
1112 } else {
1113 Arc::new(ConnectionCache::with_udp(
1114 "connection_cache_tpu_udp",
1115 tpu_connection_pool_size,
1116 ))
1117 };
1118
1119 let vote_connection_cache = if vote_use_quic {
1120 let vote_connection_cache = ConnectionCache::new_with_client_options(
1121 "connection_cache_vote_quic",
1122 tpu_connection_pool_size,
1123 None, Some((
1125 &identity_keypair,
1126 node.info
1127 .tpu_vote(Protocol::QUIC)
1128 .ok_or_else(|| {
1129 ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
1130 })?
1131 .ip(),
1132 )),
1133 Some((&staked_nodes, &identity_keypair.pubkey())),
1134 );
1135 Arc::new(vote_connection_cache)
1136 } else {
1137 Arc::new(ConnectionCache::with_udp(
1138 "connection_cache_vote_udp",
1139 tpu_connection_pool_size,
1140 ))
1141 };
1142
1143 let rpc_override_health_check =
1144 Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
1145 let (
1146 json_rpc_service,
1147 pubsub_service,
1148 completed_data_sets_sender,
1149 completed_data_sets_service,
1150 rpc_completed_slots_service,
1151 optimistically_confirmed_bank_tracker,
1152 bank_notification_sender,
1153 ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
1154 assert_eq!(
1155 node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
1156 node.info
1157 .rpc_pubsub()
1158 .map(|addr| socket_addr_space.check(&addr))
1159 );
1160 let (bank_notification_sender, bank_notification_receiver) = unbounded();
1161 let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
1162 Some(Arc::new(RwLock::new(bank_notification_senders)))
1163 } else {
1164 None
1165 };
1166
1167 let json_rpc_service = JsonRpcService::new(
1168 rpc_addr,
1169 config.rpc_config.clone(),
1170 Some(config.snapshot_config.clone()),
1171 bank_forks.clone(),
1172 block_commitment_cache.clone(),
1173 blockstore.clone(),
1174 cluster_info.clone(),
1175 Some(poh_recorder.clone()),
1176 genesis_config.hash(),
1177 ledger_path,
1178 config.validator_exit.clone(),
1179 exit.clone(),
1180 rpc_override_health_check.clone(),
1181 startup_verification_complete,
1182 optimistically_confirmed_bank.clone(),
1183 config.send_transaction_service_config.clone(),
1184 max_slots.clone(),
1185 leader_schedule_cache.clone(),
1186 connection_cache.clone(),
1187 max_complete_transaction_status_slot,
1188 max_complete_rewards_slot,
1189 prioritization_fee_cache.clone(),
1190 )
1191 .map_err(ValidatorError::Other)?;
1192
1193 let pubsub_service = if !config.rpc_config.full_api {
1194 None
1195 } else {
1196 let (trigger, pubsub_service) = PubSubService::new(
1197 config.pubsub_config.clone(),
1198 &rpc_subscriptions,
1199 rpc_pubsub_addr,
1200 );
1201 config
1202 .validator_exit
1203 .write()
1204 .unwrap()
1205 .register_exit(Box::new(move || trigger.cancel()));
1206
1207 Some(pubsub_service)
1208 };
1209
1210 let (completed_data_sets_sender, completed_data_sets_service) =
1211 if !config.rpc_config.full_api {
1212 (None, None)
1213 } else {
1214 let (completed_data_sets_sender, completed_data_sets_receiver) =
1215 bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
1216 let completed_data_sets_service = CompletedDataSetsService::new(
1217 completed_data_sets_receiver,
1218 blockstore.clone(),
1219 rpc_subscriptions.clone(),
1220 exit.clone(),
1221 max_slots.clone(),
1222 );
1223 (
1224 Some(completed_data_sets_sender),
1225 Some(completed_data_sets_service),
1226 )
1227 };
1228
1229 let rpc_completed_slots_service =
1230 if config.rpc_config.full_api || geyser_plugin_service.is_some() {
1231 let (completed_slots_sender, completed_slots_receiver) =
1232 bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
1233 blockstore.add_completed_slots_signal(completed_slots_sender);
1234
1235 Some(RpcCompletedSlotsService::spawn(
1236 completed_slots_receiver,
1237 rpc_subscriptions.clone(),
1238 slot_status_notifier.clone(),
1239 exit.clone(),
1240 ))
1241 } else {
1242 None
1243 };
1244
1245 let optimistically_confirmed_bank_tracker =
1246 Some(OptimisticallyConfirmedBankTracker::new(
1247 bank_notification_receiver,
1248 exit.clone(),
1249 bank_forks.clone(),
1250 optimistically_confirmed_bank,
1251 rpc_subscriptions.clone(),
1252 confirmed_bank_subscribers,
1253 prioritization_fee_cache.clone(),
1254 ));
1255 let bank_notification_sender_config = Some(BankNotificationSenderConfig {
1256 sender: bank_notification_sender,
1257 should_send_parents: geyser_plugin_service.is_some(),
1258 });
1259 (
1260 Some(json_rpc_service),
1261 pubsub_service,
1262 completed_data_sets_sender,
1263 completed_data_sets_service,
1264 rpc_completed_slots_service,
1265 optimistically_confirmed_bank_tracker,
1266 bank_notification_sender_config,
1267 )
1268 } else {
1269 (None, None, None, None, None, None, None)
1270 };
1271
1272 if config.halt_at_slot.is_some() {
1273 block_commitment_cache
1276 .write()
1277 .unwrap()
1278 .set_highest_super_majority_root(bank_forks.read().unwrap().root());
1279
1280 warn!("Validator halted");
1282 *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
1283 std::thread::park();
1284 }
1285 let ip_echo_server = match node.sockets.ip_echo {
1286 None => None,
1287 Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
1288 tcp_listener,
1289 config.ip_echo_server_threads,
1290 Some(node.info.shred_version()),
1291 )),
1292 };
1293
1294 let (stats_reporter_sender, stats_reporter_receiver) = unbounded();
1295
1296 let stats_reporter_service =
1297 StatsReporterService::new(stats_reporter_receiver, exit.clone());
1298
1299 let gossip_service = GossipService::new(
1300 &cluster_info,
1301 Some(bank_forks.clone()),
1302 node.sockets.gossip,
1303 config.gossip_validators.clone(),
1304 should_check_duplicate_instance,
1305 Some(stats_reporter_sender.clone()),
1306 exit.clone(),
1307 );
1308 let serve_repair = ServeRepair::new(
1309 cluster_info.clone(),
1310 bank_forks.clone(),
1311 config.repair_whitelist.clone(),
1312 );
1313 let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
1314 let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
1315 let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
1316 unbounded();
1317
1318 let waited_for_supermajority = wait_for_supermajority(
1319 config,
1320 Some(&mut process_blockstore),
1321 &bank_forks,
1322 &cluster_info,
1323 rpc_override_health_check,
1324 &start_progress,
1325 )?;
1326
1327 let blockstore_metric_report_service =
1328 BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());
1329
1330 let wait_for_vote_to_start_leader =
1331 !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
1332
1333 let poh_service = PohService::new(
1334 poh_recorder.clone(),
1335 &genesis_config.poh_config,
1336 exit.clone(),
1337 bank_forks.read().unwrap().root_bank().ticks_per_slot(),
1338 config.poh_pinned_cpu_core,
1339 config.poh_hashes_per_batch,
1340 record_receiver,
1341 );
1342 assert_eq!(
1343 blockstore.get_new_shred_signals_len(),
1344 1,
1345 "New shred signal for the TVU should be the same as the clear bank signal."
1346 );
1347
1348 let vote_tracker = Arc::<VoteTracker>::default();
1349
1350 let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
1351 let (verified_vote_sender, verified_vote_receiver) = unbounded();
1352 let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
1353 let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();
1354
1355 let entry_notification_sender = entry_notifier_service
1356 .as_ref()
1357 .map(|service| service.sender_cloned());
1358
1359 let current_runtime_handle = tokio::runtime::Handle::try_current();
1365 let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
1366 && genesis_config.cluster_type != ClusterType::MainnetBeta)
1367 .then(|| {
1368 tokio::runtime::Builder::new_multi_thread()
1369 .enable_all()
1370 .thread_name("solTurbineQuic")
1371 .build()
1372 .unwrap()
1373 });
1374 let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
1375 let (
1376 turbine_quic_endpoint,
1377 turbine_quic_endpoint_sender,
1378 turbine_quic_endpoint_join_handle,
1379 ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
1380 let (sender, _receiver) = tokio::sync::mpsc::channel(1);
1381 (None, sender, None)
1382 } else {
1383 solana_turbine::quic_endpoint::new_quic_endpoint(
1384 turbine_quic_endpoint_runtime
1385 .as_ref()
1386 .map(TokioRuntime::handle)
1387 .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1388 &identity_keypair,
1389 node.sockets.tvu_quic,
1390 turbine_quic_endpoint_sender,
1391 bank_forks.clone(),
1392 )
1393 .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
1394 .unwrap()
1395 };
1396
1397 let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
1399 && genesis_config.cluster_type != ClusterType::MainnetBeta)
1400 .then(|| {
1401 tokio::runtime::Builder::new_multi_thread()
1402 .enable_all()
1403 .thread_name("solRepairQuic")
1404 .build()
1405 .unwrap()
1406 });
1407 let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
1408 if genesis_config.cluster_type == ClusterType::MainnetBeta {
1409 (None, RepairQuicAsyncSenders::new_dummy(), None)
1410 } else {
1411 let repair_quic_sockets = RepairQuicSockets {
1412 repair_server_quic_socket: node.sockets.serve_repair_quic,
1413 repair_client_quic_socket: node.sockets.repair_quic,
1414 ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
1415 };
1416 let repair_quic_senders = RepairQuicSenders {
1417 repair_request_quic_sender: repair_request_quic_sender.clone(),
1418 repair_response_quic_sender,
1419 ancestor_hashes_response_quic_sender,
1420 };
1421 repair::quic_endpoint::new_quic_endpoints(
1422 repair_quic_endpoints_runtime
1423 .as_ref()
1424 .map(TokioRuntime::handle)
1425 .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
1426 &identity_keypair,
1427 repair_quic_sockets,
1428 repair_quic_senders,
1429 bank_forks.clone(),
1430 )
1431 .map(|(endpoints, senders, join_handle)| {
1432 (Some(endpoints), senders, Some(join_handle))
1433 })
1434 .unwrap()
1435 };
1436 let serve_repair_service = ServeRepairService::new(
1437 serve_repair,
1438 repair_request_quic_sender,
1441 repair_request_quic_receiver,
1442 repair_quic_async_senders.repair_response_quic_sender,
1443 blockstore.clone(),
1444 node.sockets.serve_repair,
1445 socket_addr_space,
1446 stats_reporter_sender,
1447 exit.clone(),
1448 );
1449
1450 let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
1451 let wen_restart_repair_slots = if in_wen_restart {
1452 Some(Arc::new(RwLock::new(Vec::new())))
1453 } else {
1454 None
1455 };
1456 let tower = match process_blockstore.process_to_create_tower() {
1457 Ok(tower) => {
1458 info!("Tower state: {:?}", tower);
1459 tower
1460 }
1461 Err(e) => {
1462 warn!(
1463 "Unable to retrieve tower: {:?} creating default tower....",
1464 e
1465 );
1466 Tower::default()
1467 }
1468 };
1469 let last_vote = tower.last_vote();
1470
1471 let outstanding_repair_requests =
1472 Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1473 let cluster_slots =
1474 Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
1475
1476 let tvu = Tvu::new(
1477 vote_account,
1478 authorized_voter_keypairs,
1479 &bank_forks,
1480 &cluster_info,
1481 TvuSockets {
1482 repair: node.sockets.repair.try_clone().unwrap(),
1483 retransmit: node.sockets.retransmit_sockets,
1484 fetch: node.sockets.tvu,
1485 ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1486 },
1487 blockstore.clone(),
1488 ledger_signal_receiver,
1489 &rpc_subscriptions,
1490 &poh_recorder,
1491 tower,
1492 config.tower_storage.clone(),
1493 &leader_schedule_cache,
1494 exit.clone(),
1495 block_commitment_cache,
1496 config.turbine_disabled.clone(),
1497 transaction_status_sender.clone(),
1498 block_meta_sender,
1499 entry_notification_sender.clone(),
1500 vote_tracker.clone(),
1501 retransmit_slots_sender,
1502 gossip_verified_vote_hash_receiver,
1503 verified_vote_receiver,
1504 replay_vote_sender.clone(),
1505 completed_data_sets_sender,
1506 bank_notification_sender.clone(),
1507 duplicate_confirmed_slots_receiver,
1508 TvuConfig {
1509 max_ledger_shreds: config.max_ledger_shreds,
1510 shred_version: node.info.shred_version(),
1511 repair_validators: config.repair_validators.clone(),
1512 repair_whitelist: config.repair_whitelist.clone(),
1513 wait_for_vote_to_start_leader,
1514 replay_forks_threads: config.replay_forks_threads,
1515 replay_transactions_threads: config.replay_transactions_threads,
1516 shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1517 },
1518 &max_slots,
1519 block_metadata_notifier,
1520 config.wait_to_vote_slot,
1521 accounts_background_request_sender.clone(),
1522 config.runtime_config.log_messages_bytes_limit,
1523 json_rpc_service.is_some().then_some(&connection_cache), &prioritization_fee_cache,
1525 banking_tracer.clone(),
1526 turbine_quic_endpoint_sender.clone(),
1527 turbine_quic_endpoint_receiver,
1528 repair_response_quic_receiver,
1529 repair_quic_async_senders.repair_request_quic_sender,
1530 repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1531 ancestor_hashes_response_quic_receiver,
1532 outstanding_repair_requests.clone(),
1533 cluster_slots.clone(),
1534 wen_restart_repair_slots.clone(),
1535 slot_status_notifier,
1536 vote_connection_cache,
1537 )
1538 .map_err(ValidatorError::Other)?;
1539
1540 if in_wen_restart {
1541 info!("Waiting for wen_restart to finish");
1542 wait_for_wen_restart(WenRestartConfig {
1543 wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1544 wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1545 last_vote,
1546 blockstore: blockstore.clone(),
1547 cluster_info: cluster_info.clone(),
1548 bank_forks: bank_forks.clone(),
1549 wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1550 wait_for_supermajority_threshold_percent:
1551 WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1552 snapshot_config: config.snapshot_config.clone(),
1553 accounts_background_request_sender: accounts_background_request_sender.clone(),
1554 genesis_config_hash: genesis_config.hash(),
1555 exit: exit.clone(),
1556 })?;
1557 return Err(ValidatorError::WenRestartFinished.into());
1558 }
1559
1560 let (tpu, mut key_notifies) = Tpu::new(
1561 &cluster_info,
1562 &poh_recorder,
1563 entry_receiver,
1564 retransmit_slots_receiver,
1565 TpuSockets {
1566 transactions: node.sockets.tpu,
1567 transaction_forwards: node.sockets.tpu_forwards,
1568 vote: node.sockets.tpu_vote,
1569 broadcast: node.sockets.broadcast,
1570 transactions_quic: node.sockets.tpu_quic,
1571 transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1572 vote_quic: node.sockets.tpu_vote_quic,
1573 },
1574 &rpc_subscriptions,
1575 transaction_status_sender,
1576 entry_notification_sender,
1577 blockstore.clone(),
1578 &config.broadcast_stage_type,
1579 exit,
1580 node.info.shred_version(),
1581 vote_tracker,
1582 bank_forks.clone(),
1583 verified_vote_sender,
1584 gossip_verified_vote_hash_sender,
1585 replay_vote_receiver,
1586 replay_vote_sender,
1587 bank_notification_sender.map(|sender| sender.sender),
1588 config.tpu_coalesce,
1589 duplicate_confirmed_slot_sender,
1590 &connection_cache,
1591 turbine_quic_endpoint_sender,
1592 &identity_keypair,
1593 config.runtime_config.log_messages_bytes_limit,
1594 &staked_nodes,
1595 config.staked_nodes_overrides.clone(),
1596 banking_tracer_channels,
1597 tracer_thread,
1598 tpu_enable_udp,
1599 tpu_quic_server_config,
1600 tpu_fwd_quic_server_config,
1601 vote_quic_server_config,
1602 &prioritization_fee_cache,
1603 config.block_production_method.clone(),
1604 config.transaction_struct.clone(),
1605 config.enable_block_production_forwarding,
1606 config.generator_config.clone(),
1607 );
1608
1609 datapoint_info!(
1610 "validator-new",
1611 ("id", id.to_string(), String),
1612 ("version", solana_version::version!(), String),
1613 ("cluster_type", genesis_config.cluster_type as u32, i64),
1614 ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1615 ("waited_for_supermajority", waited_for_supermajority, bool),
1616 ("shred_version", shred_version as i64, i64),
1617 );
1618
1619 *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1620 key_notifies.push(connection_cache);
1621
1622 *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1623 bank_forks: bank_forks.clone(),
1624 cluster_info: cluster_info.clone(),
1625 vote_account: *vote_account,
1626 repair_whitelist: config.repair_whitelist.clone(),
1627 notifies: key_notifies,
1628 repair_socket: Arc::new(node.sockets.repair),
1629 outstanding_repair_requests,
1630 cluster_slots,
1631 });
1632
1633 Ok(Self {
1634 stats_reporter_service,
1635 gossip_service,
1636 serve_repair_service,
1637 json_rpc_service,
1638 pubsub_service,
1639 rpc_completed_slots_service,
1640 optimistically_confirmed_bank_tracker,
1641 transaction_status_service,
1642 block_meta_service,
1643 entry_notifier_service,
1644 system_monitor_service,
1645 sample_performance_service,
1646 poh_timing_report_service,
1647 snapshot_packager_service,
1648 completed_data_sets_service,
1649 tpu,
1650 tvu,
1651 poh_service,
1652 poh_recorder,
1653 ip_echo_server,
1654 validator_exit: config.validator_exit.clone(),
1655 cluster_info,
1656 bank_forks,
1657 blockstore,
1658 geyser_plugin_service,
1659 blockstore_metric_report_service,
1660 accounts_background_service,
1661 accounts_hash_verifier,
1662 turbine_quic_endpoint,
1663 turbine_quic_endpoint_runtime,
1664 turbine_quic_endpoint_join_handle,
1665 repair_quic_endpoints,
1666 repair_quic_endpoints_runtime,
1667 repair_quic_endpoints_join_handle,
1668 thread_manager,
1669 })
1670 }
1671
    /// Requests shutdown of all validator services.
    ///
    /// Fires every exit handler registered with `validator_exit`, then drops
    /// the blockstore's signal so threads blocked waiting on blockstore
    /// signals can wake up and observe the exit request. This only *signals*
    /// shutdown; call `join` afterwards to wait for the service threads to
    /// actually finish.
    pub fn exit(&mut self) {
        // Trigger all registered exit callbacks (services poll/observe these).
        self.validator_exit.write().unwrap().exit();

        // Wake any thread parked on a blockstore signal so it can notice the
        // exit flag set above.
        self.blockstore.drop_signal();
    }
1679
    /// Convenience method: signals shutdown (`exit`) and then blocks until
    /// every service thread has terminated (`join`). Consumes the validator.
    pub fn close(mut self) {
        self.exit();
        self.join();
    }
1684
1685 fn print_node_info(node: &Node) {
1686 info!("{:?}", node.info);
1687 info!(
1688 "local gossip address: {}",
1689 node.sockets.gossip.local_addr().unwrap()
1690 );
1691 info!(
1692 "local broadcast address: {}",
1693 node.sockets
1694 .broadcast
1695 .first()
1696 .unwrap()
1697 .local_addr()
1698 .unwrap()
1699 );
1700 info!(
1701 "local repair address: {}",
1702 node.sockets.repair.local_addr().unwrap()
1703 );
1704 info!(
1705 "local retransmit address: {}",
1706 node.sockets.retransmit_sockets[0].local_addr().unwrap()
1707 );
1708 }
1709
    /// Blocks until every service thread owned by the validator has exited.
    ///
    /// Shutdown must already have been signaled (see `exit`); otherwise the
    /// threads never observe the exit flag and this call hangs. The ordering
    /// below matters: QUIC endpoints are closed before their async join
    /// handles are awaited, and PoH is joined before the RPC/TPU/TVU stack.
    pub fn join(self) {
        // Release our strong references first so services are not kept alive
        // by `self` while we join them.
        drop(self.bank_forks);
        drop(self.cluster_info);

        self.poh_service.join().expect("poh_service");
        drop(self.poh_recorder);

        // RPC-facing services (all optional — only present when RPC was
        // enabled at startup).
        if let Some(json_rpc_service) = self.json_rpc_service {
            json_rpc_service.join().expect("rpc_service");
        }

        if let Some(pubsub_service) = self.pubsub_service {
            pubsub_service.join().expect("pubsub_service");
        }

        if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
            rpc_completed_slots_service
                .join()
                .expect("rpc_completed_slots_service");
        }

        if let Some(optimistically_confirmed_bank_tracker) =
            self.optimistically_confirmed_bank_tracker
        {
            optimistically_confirmed_bank_tracker
                .join()
                .expect("optimistically_confirmed_bank_tracker");
        }

        if let Some(transaction_status_service) = self.transaction_status_service {
            transaction_status_service
                .join()
                .expect("transaction_status_service");
        }

        if let Some(block_meta_service) = self.block_meta_service {
            block_meta_service.join().expect("block_meta_service");
        }

        if let Some(system_monitor_service) = self.system_monitor_service {
            system_monitor_service
                .join()
                .expect("system_monitor_service");
        }

        if let Some(sample_performance_service) = self.sample_performance_service {
            sample_performance_service
                .join()
                .expect("sample_performance_service");
        }

        if let Some(entry_notifier_service) = self.entry_notifier_service {
            entry_notifier_service
                .join()
                .expect("entry_notifier_service");
        }

        if let Some(s) = self.snapshot_packager_service {
            s.join().expect("snapshot_packager_service");
        }

        self.gossip_service.join().expect("gossip_service");
        // Close the repair QUIC endpoints before joining the serve-repair
        // service and awaiting the endpoints' async join handle below.
        self.repair_quic_endpoints
            .iter()
            .flatten()
            .for_each(repair::quic_endpoint::close_quic_endpoint);
        self.serve_repair_service
            .join()
            .expect("serve_repair_service");
        if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
            // The dedicated runtime only exists when no ambient tokio runtime
            // was available at startup; block on the handle if we own one.
            self.repair_quic_endpoints_runtime
                .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
                .transpose()
                .unwrap();
        }
        self.stats_reporter_service
            .join()
            .expect("stats_reporter_service");
        // NOTE(review): expect message says "ledger_metric_report_service" but
        // the field is blockstore_metric_report_service — likely a stale
        // message from a rename; confirm and update in a code change.
        self.blockstore_metric_report_service
            .join()
            .expect("ledger_metric_report_service");
        self.accounts_background_service
            .join()
            .expect("accounts_background_service");
        self.accounts_hash_verifier
            .join()
            .expect("accounts_hash_verifier");
        // Close the turbine QUIC endpoint before joining TPU/TVU, then await
        // its async join handle.
        if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
            solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
        }
        self.tpu.join().expect("tpu");
        self.tvu.join().expect("tvu");
        if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
            self.turbine_quic_endpoint_runtime
                .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
                .transpose()
                .unwrap();
        }
        if let Some(completed_data_sets_service) = self.completed_data_sets_service {
            completed_data_sets_service
                .join()
                .expect("completed_data_sets_service");
        }
        // The IP echo server is shut down in the background rather than
        // joined.
        if let Some(ip_echo_server) = self.ip_echo_server {
            ip_echo_server.shutdown_background();
        }

        if let Some(geyser_plugin_service) = self.geyser_plugin_service {
            geyser_plugin_service.join().expect("geyser_plugin_service");
        }

        self.poh_timing_report_service
            .join()
            .expect("poh_timing_report_service");
        self.thread_manager.destroy();
    }
1826}
1827
1828fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1829 if let Some(account) = &bank.get_account(vote_account) {
1830 if let Some(vote_state) = vote_state::from(account) {
1831 return !vote_state.votes.is_empty();
1832 }
1833 }
1834 false
1835}
1836
1837fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1838 let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1839 warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1840 return Ok(());
1841 };
1842
1843 let ticks_per_slot = bank.ticks_per_slot();
1844 let hashes_per_slot = hashes_per_tick * ticks_per_slot;
1845 let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
1846
1847 let hash_time = compute_hash_time(hash_samples);
1848 let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
1849
1850 let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
1851 let target_hashes_per_second =
1852 (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
1853
1854 info!(
1855 "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
1856 second {target_hashes_per_second}"
1857 );
1858 if my_hashes_per_second < target_hashes_per_second {
1859 return Err(ValidatorError::PohTooSlow {
1860 mine: my_hashes_per_second,
1861 target: target_hashes_per_second,
1862 });
1863 }
1864
1865 Ok(())
1866}
1867
1868fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
1869 if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
1871 if wait_slot_for_supermajority == root_slot {
1872 return Some(wait_slot_for_supermajority);
1873 }
1874 }
1875
1876 None
1877}
1878
/// Validates a tower restored from storage and falls back to rebuilding one
/// from the vote account when restoration is unusable.
///
/// Adjusts the restored tower's lockouts to the replayed root, then discards
/// it when a hard-fork cluster restart or a warp slot is configured (either
/// also waives `config.require_tower`). On any restoration failure, errors
/// out only when a tower was mandatory AND the vote account already contains
/// votes (conflicting duplicate-vote risk); otherwise rebuilds a fresh tower
/// from `bank_forks`.
fn post_process_restored_tower(
    restored_tower: crate::consensus::Result<Tower>,
    validator_identity: &Pubkey,
    vote_account: &Pubkey,
    config: &ValidatorConfig,
    bank_forks: &BankForks,
) -> Result<Tower, String> {
    // May be cleared inside the closure below; read again after it runs.
    let mut should_require_tower = config.require_tower;

    let restored_tower = restored_tower.and_then(|tower| {
        let root_bank = bank_forks.root_bank();
        let slot_history = root_bank.get_slot_history();
        // Reconcile the stored lockouts with what replay established as root.
        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);

        if let Some(hard_fork_restart_slot) =
            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
        {
            let message =
                format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
            datapoint_error!("tower_error", ("error", message, String),);
            error!("{}", message);

            // Hard fork restart: a stale tower must not be required.
            should_require_tower = false;
            return Err(crate::consensus::TowerError::HardFork(
                hard_fork_restart_slot,
            ));
        }

        if let Some(warp_slot) = config.warp_slot {
            // Warping invalidates the restored tower as well.
            should_require_tower = false;
            return Err(crate::consensus::TowerError::HardFork(warp_slot));
        }

        tower
    });

    let restored_tower = match restored_tower {
        Ok(tower) => tower,
        Err(err) => {
            let voting_has_been_active =
                active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
            // A missing tower file is expected on first start; anything else
            // is worth a metric.
            if !err.is_file_missing() {
                datapoint_error!(
                    "tower_error",
                    ("error", format!("Unable to restore tower: {err}"), String),
                );
            }
            // Mandatory tower + existing votes: refuse to proceed, since a
            // rebuilt tower could produce conflicting duplicate votes.
            if should_require_tower && voting_has_been_active {
                return Err(format!(
                    "Requested mandatory tower restore failed: {err}. And there is an existing \
                     vote_account containing actual votes. Aborting due to possible conflicting \
                     duplicate votes"
                ));
            }
            if err.is_file_missing() && !voting_has_been_active {
                info!(
                    "Ignoring expected failed tower restore because this is the initial validator \
                     start with the vote account..."
                );
            } else {
                error!(
                    "Rebuilding a new tower from the latest vote account due to failed tower \
                     restore: {}",
                    err
                );
            }

            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
        }
    };

    Ok(restored_tower)
}
1961
1962fn load_genesis(
1963 config: &ValidatorConfig,
1964 ledger_path: &Path,
1965) -> Result<GenesisConfig, ValidatorError> {
1966 let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
1967 .map_err(ValidatorError::OpenGenesisConfig)?;
1968
1969 let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
1972 let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
1973 let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
1974 assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
1975
1976 let genesis_hash = genesis_config.hash();
1977 info!("genesis hash: {}", genesis_hash);
1978
1979 if let Some(expected_genesis_hash) = config.expected_genesis_hash {
1980 if genesis_hash != expected_genesis_hash {
1981 return Err(ValidatorError::GenesisHashMismatch(
1982 genesis_hash,
1983 expected_genesis_hash,
1984 ));
1985 }
1986 }
1987
1988 Ok(genesis_config)
1989}
1990
/// Opens the blockstore at `ledger_path` and loads the bank forks from it
/// (and from snapshots, per `config.snapshot_config`).
///
/// Also wires up startup plumbing: the new-shred signal used to wake replay,
/// PoH timing points, optional transaction-history and entry-notifier
/// services, and the bank-drop callback consumed by the accounts background
/// service.
///
/// Returns the loaded state required by the rest of validator startup, or an
/// error string on failure.
#[allow(clippy::type_complexity)]
fn load_blockstore(
    config: &ValidatorConfig,
    ledger_path: &Path,
    genesis_config: &GenesisConfig,
    exit: Arc<AtomicBool>,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
    accounts_update_notifier: Option<AccountsUpdateNotifier>,
    transaction_notifier: Option<TransactionNotifierArc>,
    entry_notifier: Option<EntryNotifierArc>,
    poh_timing_point_sender: Option<PohTimingSender>,
) -> Result<
    (
        Arc<RwLock<BankForks>>,
        Arc<Blockstore>,
        Slot,
        Receiver<bool>,
        LeaderScheduleCache,
        Option<StartingSnapshotHashes>,
        TransactionHistoryServices,
        blockstore_processor::ProcessOptions,
        BlockstoreRootScan,
        DroppedSlotsReceiver,
        Option<EntryNotifierService>,
    ),
    String,
> {
    info!("loading ledger from {:?}...", ledger_path);
    *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;

    let mut blockstore =
        Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
            .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;

    // Wake up replay whenever new shreds are inserted into the blockstore
    let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
    blockstore.add_new_shred_signal(ledger_signal_sender);

    blockstore.shred_timing_point_sender = poh_timing_point_sender;
    // Remember the root before later reconciliation (tower / hard fork) can
    // move it
    let original_blockstore_root = blockstore.max_root();

    let blockstore = Arc::new(blockstore);
    let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
    // Default to halting at the highest slot currently in the blockstore
    let halt_at_slot = config
        .halt_at_slot
        .or_else(|| blockstore.highest_slot().unwrap_or(None));

    let process_options = blockstore_processor::ProcessOptions {
        run_verification: config.run_verification,
        halt_at_slot,
        new_hard_forks: config.new_hard_forks.clone(),
        debug_keys: config.debug_keys.clone(),
        accounts_db_config: config.accounts_db_config.clone(),
        accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
        accounts_db_skip_shrink: config.accounts_db_skip_shrink,
        accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
        runtime_config: config.runtime_config.clone(),
        use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
        ..blockstore_processor::ProcessOptions::default()
    };

    // Transaction history is needed either for the RPC API or for a geyser
    // transaction-notifier plugin
    let enable_rpc_transaction_history =
        config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
    let is_plugin_transaction_history_required = transaction_notifier.as_ref().is_some();
    let transaction_history_services =
        if enable_rpc_transaction_history || is_plugin_transaction_history_required {
            initialize_rpc_transaction_history_services(
                blockstore.clone(),
                exit.clone(),
                enable_rpc_transaction_history,
                config.rpc_config.enable_extended_tx_metadata_storage,
                transaction_notifier,
            )
        } else {
            TransactionHistoryServices::default()
        };

    let entry_notifier_service = entry_notifier
        .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));

    let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
        bank_forks_utils::load_bank_forks(
            genesis_config,
            &blockstore,
            config.account_paths.clone(),
            Some(&config.snapshot_config),
            &process_options,
            transaction_history_services.block_meta_sender.as_ref(),
            entry_notifier_service
                .as_ref()
                .map(|service| service.sender()),
            accounts_update_notifier,
            exit,
        )
        .map_err(|err| err.to_string())?;

    // Banks dropped from bank forks are handed to the accounts background
    // service for cleanup through this channel
    let pruned_banks_receiver =
        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());

    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
    {
        let mut bank_forks = bank_forks.write().unwrap();
        bank_forks.set_snapshot_config(Some(config.snapshot_config.clone()));
        bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
    }

    Ok((
        bank_forks,
        blockstore,
        original_blockstore_root,
        ledger_signal_receiver,
        leader_schedule_cache,
        starting_snapshot_hashes,
        transaction_history_services,
        process_options,
        blockstore_root_scan,
        pruned_banks_receiver,
        entry_notifier_service,
    ))
}
2117
/// Deferred blockstore replay: bundles everything needed to process the
/// blockstore into the bank forks so replay can be triggered on demand (see
/// `ProcessBlockStore::process`).
pub struct ProcessBlockStore<'a> {
    id: &'a Pubkey,
    vote_account: &'a Pubkey,
    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
    blockstore: &'a Blockstore,
    // Root observed at blockstore-open time; updated in place when roots are
    // reconciled with the tower or a hard fork
    original_blockstore_root: Slot,
    bank_forks: &'a Arc<RwLock<BankForks>>,
    leader_schedule_cache: &'a LeaderScheduleCache,
    process_options: &'a blockstore_processor::ProcessOptions,
    transaction_status_sender: Option<&'a TransactionStatusSender>,
    block_meta_sender: Option<BlockMetaSender>,
    entry_notification_sender: Option<&'a EntryNotifierSender>,
    // Taken (left as None) once joined after replay completes
    blockstore_root_scan: Option<BlockstoreRootScan>,
    accounts_background_request_sender: AbsRequestSender,
    config: &'a ValidatorConfig,
    // Some(_) once replay has run; doubles as the "already processed" flag
    tower: Option<Tower>,
}
2135
impl<'a> ProcessBlockStore<'a> {
    /// Captures the state needed for deferred blockstore replay; see
    /// [`ProcessBlockStore::process`].
    #[allow(clippy::too_many_arguments)]
    fn new(
        id: &'a Pubkey,
        vote_account: &'a Pubkey,
        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
        blockstore: &'a Blockstore,
        original_blockstore_root: Slot,
        bank_forks: &'a Arc<RwLock<BankForks>>,
        leader_schedule_cache: &'a LeaderScheduleCache,
        process_options: &'a blockstore_processor::ProcessOptions,
        transaction_status_sender: Option<&'a TransactionStatusSender>,
        block_meta_sender: Option<BlockMetaSender>,
        entry_notification_sender: Option<&'a EntryNotifierSender>,
        blockstore_root_scan: BlockstoreRootScan,
        accounts_background_request_sender: AbsRequestSender,
        config: &'a ValidatorConfig,
    ) -> Self {
        Self {
            id,
            vote_account,
            start_progress,
            blockstore,
            original_blockstore_root,
            bank_forks,
            leader_schedule_cache,
            process_options,
            transaction_status_sender,
            block_meta_sender,
            entry_notification_sender,
            blockstore_root_scan: Some(blockstore_root_scan),
            accounts_background_request_sender,
            config,
            tower: None,
        }
    }

    /// Replays the blockstore into the bank forks, restores the tower, and
    /// reconciles blockstore roots with the tower root and any hard-fork
    /// restart slot.
    ///
    /// Idempotent: the work runs only on the first call; `self.tower` being
    /// `Some` marks the blockstore as already processed.
    pub(crate) fn process(&mut self) -> Result<(), String> {
        if self.tower.is_none() {
            let previous_start_process = *self.start_progress.read().unwrap();
            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;

            // Report replay progress from a helper thread until `exit` is
            // flipped below
            let exit = Arc::new(AtomicBool::new(false));
            if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
                let bank_forks = self.bank_forks.clone();
                let exit = exit.clone();
                let start_progress = self.start_progress.clone();

                let _ = Builder::new()
                    .name("solRptLdgrStat".to_string())
                    .spawn(move || {
                        while !exit.load(Ordering::Relaxed) {
                            let slot = bank_forks.read().unwrap().working_bank().slot();
                            *start_progress.write().unwrap() =
                                ValidatorStartProgress::ProcessingLedger { slot, max_slot };
                            sleep(Duration::from_secs(2));
                        }
                    })
                    .unwrap();
            }
            blockstore_processor::process_blockstore_from_root(
                self.blockstore,
                self.bank_forks,
                self.leader_schedule_cache,
                self.process_options,
                self.transaction_status_sender,
                self.block_meta_sender.as_ref(),
                self.entry_notification_sender,
                &self.accounts_background_request_sender,
            )
            .map_err(|err| {
                // Stop the progress-reporting thread before bailing out
                exit.store(true, Ordering::Relaxed);
                format!("Failed to load ledger: {err:?}")
            })?;
            // Replay succeeded; stop the progress-reporting thread
            exit.store(true, Ordering::Relaxed);

            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
                blockstore_root_scan.join();
            }

            self.tower = Some({
                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
                if let Ok(tower) = &restored_tower {
                    // Bring blockstore roots at least up to the restored
                    // tower's root before post-processing it
                    reconcile_blockstore_roots_with_external_source(
                        ExternalRootSource::Tower(tower.root()),
                        self.blockstore,
                        &mut self.original_blockstore_root,
                    )
                    .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
                }

                post_process_restored_tower(
                    restored_tower,
                    self.id,
                    self.vote_account,
                    self.config,
                    &self.bank_forks.read().unwrap(),
                )?
            });

            // A cluster restart via hard fork also dictates a new root
            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
                self.config,
                self.bank_forks.read().unwrap().root(),
            ) {
                reconcile_blockstore_roots_with_external_source(
                    ExternalRootSource::HardFork(hard_fork_restart_slot),
                    self.blockstore,
                    &mut self.original_blockstore_root,
                )
                .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
            }

            *self.start_progress.write().unwrap() = previous_start_process;
        }
        Ok(())
    }

    /// Consumes `self`, running replay if it has not run yet, and returns the
    /// restored/rebuilt tower.
    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
        self.process()?;
        Ok(self.tower.unwrap())
    }
}
2261
/// If `config.warp_slot` is set, warps the root bank forward to that slot,
/// roots the warped bank, and writes a full snapshot of it.
///
/// Errors if the warp slot is not strictly ahead of the working bank or if
/// snapshot creation fails.
fn maybe_warp_slot(
    config: &ValidatorConfig,
    process_blockstore: &mut ProcessBlockStore,
    ledger_path: &Path,
    bank_forks: &RwLock<BankForks>,
    leader_schedule_cache: &LeaderScheduleCache,
    accounts_background_request_sender: &AbsRequestSender,
) -> Result<(), String> {
    if let Some(warp_slot) = config.warp_slot {
        let mut bank_forks = bank_forks.write().unwrap();

        let working_bank = bank_forks.working_bank();

        if warp_slot <= working_bank.slot() {
            return Err(format!(
                "warp slot ({}) cannot be less than the working bank slot ({})",
                warp_slot,
                working_bank.slot()
            ));
        }
        info!("warping to slot {}", warp_slot);

        let root_bank = bank_forks.root_bank();

        // Finalize and flush the root bank's account state before deriving
        // the warped child from it
        root_bank.squash();
        root_bank.force_flush_accounts_cache();

        bank_forks.insert(Bank::warp_from_parent(
            root_bank,
            &Pubkey::default(),
            warp_slot,
            solana_accounts_db::accounts_db::CalcAccountsHashDataSource::Storages,
        ));
        bank_forks
            .set_root(
                warp_slot,
                accounts_background_request_sender,
                Some(warp_slot),
            )
            .map_err(|err| err.to_string())?;
        leader_schedule_cache.set_root(&bank_forks.root_bank());

        // Write a full snapshot of the warped bank
        let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
            ledger_path,
            &bank_forks.root_bank(),
            None,
            &config.snapshot_config.full_snapshot_archives_dir,
            &config.snapshot_config.incremental_snapshot_archives_dir,
            config.snapshot_config.archive_format,
        ) {
            Ok(archive_info) => archive_info,
            Err(e) => return Err(format!("Unable to create snapshot: {e}")),
        };
        info!(
            "created snapshot: {}",
            full_snapshot_archive_info.path().display()
        );

        // Release the write lock before replay, which re-acquires bank_forks
        drop(bank_forks);
        process_blockstore.process()?;
    }
    Ok(())
}
2330
2331fn should_cleanup_blockstore_incorrect_shred_versions(
2334 config: &ValidatorConfig,
2335 blockstore: &Blockstore,
2336 root_slot: Slot,
2337 hard_forks: &HardForks,
2338) -> Result<Option<Slot>, BlockstoreError> {
2339 let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
2341 if maybe_cluster_restart_slot.is_some() {
2342 return Ok(Some(root_slot + 1));
2343 }
2344
2345 let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
2347 return Ok(None);
2348 };
2349
2350 let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
2352 return Ok(None);
2353 };
2354 let blockstore_min_slot = blockstore.lowest_slot();
2355 info!(
2356 "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2357 latest hard fork is {latest_hard_fork}"
2358 );
2359
2360 if latest_hard_fork < blockstore_min_slot {
2361 Ok(None)
2369 } else if latest_hard_fork < blockstore_max_slot {
2370 Ok(Some(latest_hard_fork + 1))
2381 } else {
2382 Ok(None)
2390 }
2391}
2392
2393fn scan_blockstore_for_incorrect_shred_version(
2396 blockstore: &Blockstore,
2397 start_slot: Slot,
2398 expected_shred_version: u16,
2399) -> Result<Option<u16>, BlockstoreError> {
2400 const TIMEOUT: Duration = Duration::from_secs(60);
2401 let timer = Instant::now();
2402 let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2404
2405 info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2406 for (slot, _meta) in slot_meta_iterator {
2407 let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2408 for shred in &shreds {
2409 if shred.version() != expected_shred_version {
2410 return Ok(Some(shred.version()));
2411 }
2412 }
2413 if timer.elapsed() > TIMEOUT {
2414 info!("Didn't find incorrect shreds after 60 seconds, aborting");
2415 break;
2416 }
2417 }
2418 Ok(None)
2419}
2420
2421fn cleanup_blockstore_incorrect_shred_versions(
2424 blockstore: &Blockstore,
2425 config: &ValidatorConfig,
2426 start_slot: Slot,
2427 expected_shred_version: u16,
2428) -> Result<(), BlockstoreError> {
2429 let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2430 blockstore,
2431 start_slot,
2432 expected_shred_version,
2433 )?;
2434 let Some(incorrect_shred_version) = incorrect_shred_version else {
2435 info!("Only shreds with the correct version were found in the blockstore");
2436 return Ok(());
2437 };
2438
2439 let end_slot = blockstore.highest_slot()?.unwrap();
2441
2442 let backup_folder = format!(
2445 "{}_backup_{}_{}_{}",
2446 BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, incorrect_shred_version, start_slot, end_slot
2447 );
2448 match Blockstore::open_with_options(
2449 &blockstore.ledger_path().join(backup_folder),
2450 config.blockstore_options.clone(),
2451 ) {
2452 Ok(backup_blockstore) => {
2453 info!("Backing up slots from {start_slot} to {end_slot}");
2454 let mut timer = Measure::start("blockstore backup");
2455
2456 const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2457 let mut print_timer = Instant::now();
2458 let mut num_slots_copied = 0;
2459 let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2460 for (slot, _meta) in slot_meta_iterator {
2461 let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2462 let _ = backup_blockstore.insert_shreds(shreds, None, true);
2463 num_slots_copied += 1;
2464
2465 if print_timer.elapsed() > PRINT_INTERVAL {
2466 info!("Backed up {num_slots_copied} slots thus far");
2467 print_timer = Instant::now();
2468 }
2469 }
2470
2471 timer.stop();
2472 info!("Backing up slots done. {timer}");
2473 }
2474 Err(err) => {
2475 warn!("Unable to backup shreds with incorrect shred version: {err}");
2476 }
2477 }
2478
2479 info!("Purging slots {start_slot} to {end_slot} from blockstore");
2480 let mut timer = Measure::start("blockstore purge");
2481 blockstore.purge_from_next_slots(start_slot, end_slot);
2482 blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2483 timer.stop();
2484 info!("Purging slots done. {timer}");
2485
2486 Ok(())
2487}
2488
2489fn initialize_rpc_transaction_history_services(
2490 blockstore: Arc<Blockstore>,
2491 exit: Arc<AtomicBool>,
2492 enable_rpc_transaction_history: bool,
2493 enable_extended_tx_metadata_storage: bool,
2494 transaction_notifier: Option<TransactionNotifierArc>,
2495) -> TransactionHistoryServices {
2496 let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2497 let (transaction_status_sender, transaction_status_receiver) = unbounded();
2498 let transaction_status_sender = Some(TransactionStatusSender {
2499 sender: transaction_status_sender,
2500 });
2501 let transaction_status_service = Some(TransactionStatusService::new(
2502 transaction_status_receiver,
2503 max_complete_transaction_status_slot.clone(),
2504 enable_rpc_transaction_history,
2505 transaction_notifier,
2506 blockstore.clone(),
2507 enable_extended_tx_metadata_storage,
2508 exit.clone(),
2509 ));
2510
2511 let max_complete_rewards_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2512 let (block_meta_sender, block_meta_receiver) = unbounded();
2513 let block_meta_sender = Some(block_meta_sender);
2514 let block_meta_service = Some(BlockMetaService::new(
2515 block_meta_receiver,
2516 blockstore,
2517 max_complete_rewards_slot.clone(),
2518 exit,
2519 ));
2520 TransactionHistoryServices {
2521 transaction_status_sender,
2522 transaction_status_service,
2523 max_complete_transaction_status_slot,
2524 max_complete_rewards_slot,
2525 block_meta_sender,
2526 block_meta_service,
2527 }
2528}
2529
/// Errors that can occur while starting up the validator.
#[derive(Error, Debug)]
pub enum ValidatorError {
    #[error("Bad expected bank hash")]
    BadExpectedBankHash,

    #[error("blockstore error: {0}")]
    Blockstore(#[source] BlockstoreError),

    // Fields are (actual, expected)
    #[error("genesis hash mismatch: actual={0}, expected={1}")]
    GenesisHashMismatch(Hash, Hash),

    #[error("Ledger does not have enough data to wait for supermajority")]
    NotEnoughLedgerData,

    #[error("failed to open genesis: {0}")]
    OpenGenesisConfig(#[source] OpenGenesisConfigError),

    // Catch-all for one-off failures that don't warrant a dedicated variant
    #[error("{0}")]
    Other(String),

    #[error(
        "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
    )]
    PohTooSlow { mine: u64, target: u64 },

    #[error("shred version mismatch: actual {actual}, expected {expected}")]
    ShredVersionMismatch { actual: u16, expected: u16 },

    #[error(transparent)]
    TraceError(#[from] TraceError),

    #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
    WenRestartFinished,
}
2564
/// If `config.wait_for_supermajority` is set, blocks until at least
/// `WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT` of the activated stake is
/// visible in gossip at the configured slot.
///
/// Returns `Ok(false)` when no wait was performed (not configured, or the
/// ledger is already past the configured slot) and `Ok(true)` after a
/// successful wait. While waiting, the RPC health-check override is enabled.
fn wait_for_supermajority(
    config: &ValidatorConfig,
    process_blockstore: Option<&mut ProcessBlockStore>,
    bank_forks: &RwLock<BankForks>,
    cluster_info: &ClusterInfo,
    rpc_override_health_check: Arc<AtomicBool>,
    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
) -> Result<bool, ValidatorError> {
    match config.wait_for_supermajority {
        None => Ok(false),
        Some(wait_for_supermajority_slot) => {
            // Ledger replay must finish before the bank state can be checked
            if let Some(process_blockstore) = process_blockstore {
                process_blockstore
                    .process()
                    .map_err(ValidatorError::Other)?;
            }

            let bank = bank_forks.read().unwrap().working_bank();
            match wait_for_supermajority_slot.cmp(&bank.slot()) {
                // Already past the wait slot; nothing to wait for
                std::cmp::Ordering::Less => return Ok(false),
                std::cmp::Ordering::Greater => {
                    error!(
                        "Ledger does not have enough data to wait for supermajority, please \
                         enable snapshot fetch. Has {} needs {}",
                        bank.slot(),
                        wait_for_supermajority_slot
                    );
                    return Err(ValidatorError::NotEnoughLedgerData);
                }
                _ => {}
            }

            if let Some(expected_bank_hash) = config.expected_bank_hash {
                if bank.hash() != expected_bank_hash {
                    error!(
                        "Bank hash({}) does not match expected value: {}",
                        bank.hash(),
                        expected_bank_hash
                    );
                    return Err(ValidatorError::BadExpectedBankHash);
                }
            }

            // Poll gossip once per second; log on every 10th iteration
            for i in 1.. {
                let logging = i % 10 == 1;
                if logging {
                    info!(
                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
                        bank.slot()
                    );
                }

                let gossip_stake_percent =
                    get_stake_percent_in_gossip(&bank, cluster_info, logging);

                *start_progress.write().unwrap() =
                    ValidatorStartProgress::WaitingForSupermajority {
                        slot: wait_for_supermajority_slot,
                        gossip_stake_percent,
                    };

                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
                    info!(
                        "Supermajority reached, {}% active stake detected, starting up now.",
                        gossip_stake_percent,
                    );
                    break;
                }
                // Report healthy to peers while intentionally stalled here
                rpc_override_health_check.store(true, Ordering::Relaxed);
                sleep(Duration::new(1, 0));
            }
            rpc_override_health_check.store(false, Ordering::Relaxed);
            Ok(true)
        }
    }
}
2651
2652fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2654 let mut online_stake = 0;
2655 let mut wrong_shred_stake = 0;
2656 let mut wrong_shred_nodes = vec![];
2657 let mut offline_stake = 0;
2658 let mut offline_nodes = vec![];
2659
2660 let mut total_activated_stake = 0;
2661 let now = timestamp();
2662 let peers: HashMap<_, _> = cluster_info
2666 .all_tvu_peers()
2667 .into_iter()
2668 .filter(|node| {
2669 let age = now.saturating_sub(node.wallclock());
2670 age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2672 })
2673 .map(|node| (*node.pubkey(), node))
2674 .collect();
2675 let my_shred_version = cluster_info.my_shred_version();
2676 let my_id = cluster_info.id();
2677
2678 for (activated_stake, vote_account) in bank.vote_accounts().values() {
2679 let activated_stake = *activated_stake;
2680 total_activated_stake += activated_stake;
2681
2682 if activated_stake == 0 {
2683 continue;
2684 }
2685 let vote_state_node_pubkey = *vote_account.node_pubkey();
2686
2687 if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2688 if peer.shred_version() == my_shred_version {
2689 trace!(
2690 "observed {} in gossip, (activated_stake={})",
2691 vote_state_node_pubkey,
2692 activated_stake
2693 );
2694 online_stake += activated_stake;
2695 } else {
2696 wrong_shred_stake += activated_stake;
2697 wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2698 }
2699 } else if vote_state_node_pubkey == my_id {
2700 online_stake += activated_stake; } else {
2702 offline_stake += activated_stake;
2703 offline_nodes.push((activated_stake, vote_state_node_pubkey));
2704 }
2705 }
2706
2707 let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2708 if log {
2709 info!(
2710 "{:.3}% of active stake visible in gossip",
2711 online_stake_percentage
2712 );
2713
2714 if !wrong_shred_nodes.is_empty() {
2715 info!(
2716 "{:.3}% of active stake has the wrong shred version in gossip",
2717 (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2718 );
2719 wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0)); for (stake, identity) in wrong_shred_nodes {
2721 info!(
2722 " {:.3}% - {}",
2723 (stake as f64 / total_activated_stake as f64) * 100.,
2724 identity
2725 );
2726 }
2727 }
2728
2729 if !offline_nodes.is_empty() {
2730 info!(
2731 "{:.3}% of active stake is not visible in gossip",
2732 (offline_stake as f64 / total_activated_stake as f64) * 100.
2733 );
2734 offline_nodes.sort_by(|b, a| a.0.cmp(&b.0)); for (stake, identity) in offline_nodes {
2736 info!(
2737 " {:.3}% - {}",
2738 (stake as f64 / total_activated_stake as f64) * 100.,
2739 identity
2740 );
2741 }
2742 }
2743 }
2744
2745 online_stake_percentage as u64
2746}
2747
2748fn cleanup_accounts_paths(config: &ValidatorConfig) {
2749 for account_path in &config.account_paths {
2750 move_and_async_delete_path_contents(account_path);
2751 }
2752 if let Some(shrink_paths) = config
2753 .accounts_db_config
2754 .as_ref()
2755 .and_then(|config| config.shrink_paths.as_ref())
2756 {
2757 for shrink_path in shrink_paths {
2758 move_and_async_delete_path_contents(shrink_path);
2759 }
2760 }
2761}
2762
2763pub fn is_snapshot_config_valid(
2764 snapshot_config: &SnapshotConfig,
2765 accounts_hash_interval_slots: Slot,
2766) -> bool {
2767 if !snapshot_config.should_generate_snapshots() {
2769 return true;
2770 }
2771
2772 let full_snapshot_interval_slots = snapshot_config.full_snapshot_archive_interval_slots;
2773 let incremental_snapshot_interval_slots =
2774 snapshot_config.incremental_snapshot_archive_interval_slots;
2775
2776 let is_incremental_config_valid =
2777 if incremental_snapshot_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL {
2778 true
2779 } else {
2780 incremental_snapshot_interval_slots >= accounts_hash_interval_slots
2781 && incremental_snapshot_interval_slots % accounts_hash_interval_slots == 0
2782 && full_snapshot_interval_slots > incremental_snapshot_interval_slots
2783 };
2784
2785 full_snapshot_interval_slots >= accounts_hash_interval_slots
2786 && full_snapshot_interval_slots % accounts_hash_interval_slots == 0
2787 && is_incremental_config_valid
2788}
2789
2790#[cfg(test)]
2791mod tests {
2792 use {
2793 super::*,
2794 crossbeam_channel::{bounded, RecvTimeoutError},
2795 solana_entry::entry,
2796 solana_gossip::contact_info::ContactInfo,
2797 solana_ledger::{
2798 blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2799 get_tmp_ledger_path_auto_delete,
2800 },
2801 solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
2802 solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2803 std::{fs::remove_dir_all, thread, time::Duration},
2804 };
2805
    /// Starts a validator against a fresh local ledger, verifies it reaches
    /// the `Running` state, then shuts it down cleanly.
    #[test]
    fn validator_exit() {
        solana_logger::setup();
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());

        let validator_keypair = Keypair::new();
        let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;
        let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);

        let voting_keypair = Arc::new(Keypair::new());
        // Enable RPC so the validator spins up its RPC services as well
        let config = ValidatorConfig {
            rpc_addrs: Some((
                validator_node.info.rpc().unwrap(),
                validator_node.info.rpc_pubsub().unwrap(),
            )),
            ..ValidatorConfig::default_for_test()
        };
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
        let validator = Validator::new(
            validator_node,
            Arc::new(validator_keypair),
            &validator_ledger_path,
            &voting_keypair.pubkey(),
            Arc::new(RwLock::new(vec![voting_keypair])),
            vec![leader_node.info],
            &config,
            true,
            None,
            start_progress.clone(),
            SocketAddrSpace::Unspecified,
            ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
            Arc::new(RwLock::new(None)),
        )
        .expect("assume successful validator start");
        assert_eq!(
            *start_progress.read().unwrap(),
            ValidatorStartProgress::Running
        );
        validator.close();
        remove_dir_all(validator_ledger_path).unwrap();
    }
2851
    /// Walks `should_cleanup_blockstore_incorrect_shred_versions()` through
    /// the cluster-restart / hard-fork / blockstore-contents decision matrix.
    #[test]
    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
        solana_logger::setup();

        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Blockstore::open(ledger_path.path()).unwrap();

        let mut validator_config = ValidatorConfig::default_for_test();
        let mut hard_forks = HardForks::default();
        let mut root_slot;

        // Cluster restart (wait_for_supermajority == root_slot): cleanup
        // always starts at root_slot + 1
        root_slot = 10;
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1)
        );

        // Not a cluster restart (root is past the configured slot) and no
        // hard forks registered: no cleanup
        root_slot = 15;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // A hard fork exists but the blockstore is still empty: no cleanup
        hard_forks.register(10);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // Populate the blockstore with slots [20, 35)
        let entries = entry::create_ticks(1, 0, Hash::default());
        for i in 20..35 {
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                i, i - 1, true, 1, true, );
            blockstore.insert_shreds(shreds, None, true).unwrap();
        }

        // The hard fork (10) precedes the blockstore's lowest slot (20): no
        // cleanup
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );

        // A hard fork inside the blockstore's slot range: cleanup from the
        // slot after it, with or without a cluster restart in effect
        root_slot = 25;
        hard_forks.register(root_slot);
        validator_config.wait_for_supermajority = Some(root_slot);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );
        validator_config.wait_for_supermajority = None;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(root_slot + 1),
        );

        // Root has advanced past the hard fork, which still falls inside the
        // blockstore's slot range: cleanup from the slot after the hard fork
        root_slot = 30;
        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            Some(latest_hard_fork + 1),
        );

        // Once slots up through the hard fork are purged, the hard fork again
        // precedes the blockstore's lowest slot: no cleanup
        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
        assert_eq!(
            should_cleanup_blockstore_incorrect_shred_versions(
                &validator_config,
                &blockstore,
                root_slot,
                &hard_forks
            )
            .unwrap(),
            None,
        );
    }
2987
2988 #[test]
2989 fn test_cleanup_blockstore_incorrect_shred_versions() {
2990 solana_logger::setup();
2991
2992 let validator_config = ValidatorConfig::default_for_test();
2993 let ledger_path = get_tmp_ledger_path_auto_delete!();
2994 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2995
2996 let entries = entry::create_ticks(1, 0, Hash::default());
2997 for i in 1..10 {
2998 let shreds = blockstore::entries_to_test_shreds(
2999 &entries,
3000 i, i - 1, true, 1, true, );
3006 blockstore.insert_shreds(shreds, None, true).unwrap();
3007 }
3008
3009 cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
3011 assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3013 for i in 5..10 {
3014 assert!(blockstore
3015 .get_data_shreds_for_slot(i, 0)
3016 .unwrap()
3017 .is_empty());
3018 }
3019 }
3020
    /// Starts two validators, signals exit on both before joining either, and
    /// asserts the joint shutdown completes within a timeout.
    #[test]
    fn validator_parallel_exit() {
        let leader_keypair = Keypair::new();
        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
        let genesis_config =
            create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                .genesis_config;

        let mut ledger_paths = vec![];
        let mut validators: Vec<Validator> = (0..2)
            .map(|_| {
                let validator_keypair = Keypair::new();
                let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
                let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
                ledger_paths.push(validator_ledger_path.clone());
                let vote_account_keypair = Keypair::new();
                let config = ValidatorConfig {
                    rpc_addrs: Some((
                        validator_node.info.rpc().unwrap(),
                        validator_node.info.rpc_pubsub().unwrap(),
                    )),
                    ..ValidatorConfig::default_for_test()
                };
                Validator::new(
                    validator_node,
                    Arc::new(validator_keypair),
                    &validator_ledger_path,
                    &vote_account_keypair.pubkey(),
                    Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
                    vec![leader_node.info.clone()],
                    &config,
                    true, None, Arc::new(RwLock::new(ValidatorStartProgress::default())),
                    SocketAddrSpace::Unspecified,
                    ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
                    Arc::new(RwLock::new(None)),
                )
                .expect("assume successful validator start")
            })
            .collect();

        // Signal exit on every validator before joining any, so the
        // shutdowns proceed in parallel
        validators.iter_mut().for_each(|v| v.exit());

        // Join on a helper thread so the test can bound total shutdown time
        let (sender, receiver) = bounded(0);
        let _ = thread::spawn(move || {
            validators.into_iter().for_each(|validator| {
                validator.join();
            });
            sender.send(()).unwrap();
        });

        let timeout = Duration::from_secs(60);
        if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
            panic!("timeout for shutting down validators",);
        }

        for path in ledger_paths {
            remove_dir_all(path).unwrap();
        }
    }
3084
3085 #[test]
3086 fn test_wait_for_supermajority() {
3087 solana_logger::setup();
3088 use solana_sdk::hash::hash;
3089 let node_keypair = Arc::new(Keypair::new());
3090 let cluster_info = ClusterInfo::new(
3091 ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3092 node_keypair,
3093 SocketAddrSpace::Unspecified,
3094 );
3095
3096 let (genesis_config, _mint_keypair) = create_genesis_config(1);
3097 let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3098 let mut config = ValidatorConfig::default_for_test();
3099 let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3100 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3101
3102 assert!(!wait_for_supermajority(
3103 &config,
3104 None,
3105 &bank_forks,
3106 &cluster_info,
3107 rpc_override_health_check.clone(),
3108 &start_progress,
3109 )
3110 .unwrap());
3111
3112 config.wait_for_supermajority = Some(1);
3114 matches!(
3115 wait_for_supermajority(
3116 &config,
3117 None,
3118 &bank_forks,
3119 &cluster_info,
3120 rpc_override_health_check.clone(),
3121 &start_progress,
3122 ),
3123 Err(ValidatorError::NotEnoughLedgerData),
3124 );
3125
3126 let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3128 bank_forks.read().unwrap().root_bank(),
3129 &Pubkey::default(),
3130 1,
3131 ));
3132 config.wait_for_supermajority = Some(0);
3133 assert!(!wait_for_supermajority(
3134 &config,
3135 None,
3136 &bank_forks,
3137 &cluster_info,
3138 rpc_override_health_check.clone(),
3139 &start_progress,
3140 )
3141 .unwrap());
3142
3143 config.wait_for_supermajority = Some(1);
3145 config.expected_bank_hash = Some(hash(&[1]));
3146 matches!(
3147 wait_for_supermajority(
3148 &config,
3149 None,
3150 &bank_forks,
3151 &cluster_info,
3152 rpc_override_health_check,
3153 &start_progress,
3154 ),
3155 Err(ValidatorError::BadExpectedBankHash),
3156 );
3157 }
3158
3159 #[test]
3160 fn test_interval_check() {
3161 fn new_snapshot_config(
3162 full_snapshot_archive_interval_slots: Slot,
3163 incremental_snapshot_archive_interval_slots: Slot,
3164 ) -> SnapshotConfig {
3165 SnapshotConfig {
3166 full_snapshot_archive_interval_slots,
3167 incremental_snapshot_archive_interval_slots,
3168 ..SnapshotConfig::default()
3169 }
3170 }
3171
3172 assert!(is_snapshot_config_valid(
3173 &new_snapshot_config(300, 200),
3174 100
3175 ));
3176
3177 let default_accounts_hash_interval =
3178 snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS;
3179 assert!(is_snapshot_config_valid(
3180 &new_snapshot_config(
3181 snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3182 snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS
3183 ),
3184 default_accounts_hash_interval,
3185 ));
3186 assert!(is_snapshot_config_valid(
3187 &new_snapshot_config(
3188 snapshot_bank_utils::DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3189 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3190 ),
3191 default_accounts_hash_interval
3192 ));
3193 assert!(is_snapshot_config_valid(
3194 &new_snapshot_config(
3195 snapshot_bank_utils::DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
3196 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3197 ),
3198 default_accounts_hash_interval
3199 ));
3200 assert!(is_snapshot_config_valid(
3201 &new_snapshot_config(
3202 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3203 DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
3204 ),
3205 Slot::MAX
3206 ));
3207
3208 assert!(!is_snapshot_config_valid(&new_snapshot_config(0, 100), 100));
3209 assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 0), 100));
3210 assert!(!is_snapshot_config_valid(
3211 &new_snapshot_config(42, 100),
3212 100
3213 ));
3214 assert!(!is_snapshot_config_valid(
3215 &new_snapshot_config(100, 42),
3216 100
3217 ));
3218 assert!(!is_snapshot_config_valid(
3219 &new_snapshot_config(100, 100),
3220 100
3221 ));
3222 assert!(!is_snapshot_config_valid(
3223 &new_snapshot_config(100, 200),
3224 100
3225 ));
3226 assert!(!is_snapshot_config_valid(
3227 &new_snapshot_config(444, 200),
3228 100
3229 ));
3230 assert!(!is_snapshot_config_valid(
3231 &new_snapshot_config(400, 222),
3232 100
3233 ));
3234
3235 assert!(is_snapshot_config_valid(
3236 &SnapshotConfig::new_load_only(),
3237 100
3238 ));
3239 assert!(is_snapshot_config_valid(
3240 &SnapshotConfig {
3241 full_snapshot_archive_interval_slots: 41,
3242 incremental_snapshot_archive_interval_slots: 37,
3243 ..SnapshotConfig::new_load_only()
3244 },
3245 100
3246 ));
3247 assert!(is_snapshot_config_valid(
3248 &SnapshotConfig {
3249 full_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3250 incremental_snapshot_archive_interval_slots: DISABLED_SNAPSHOT_ARCHIVE_INTERVAL,
3251 ..SnapshotConfig::new_load_only()
3252 },
3253 100
3254 ));
3255 }
3256
3257 fn target_tick_duration() -> Duration {
3258 let target_tick_duration_us = solana_sdk::clock::DEFAULT_MS_PER_SLOT * 1000
3266 / solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
3267 assert_eq!(target_tick_duration_us, 6250);
3268 Duration::from_micros(target_tick_duration_us)
3269 }
3270
3271 #[test]
3272 fn test_poh_speed() {
3273 solana_logger::setup();
3274 let poh_config = PohConfig {
3275 target_tick_duration: target_tick_duration(),
3276 hashes_per_tick: Some(100 * solana_sdk::clock::DEFAULT_HASHES_PER_TICK),
3278 ..PohConfig::default()
3279 };
3280 let genesis_config = GenesisConfig {
3281 poh_config,
3282 ..GenesisConfig::default()
3283 };
3284 let bank = Bank::new_for_tests(&genesis_config);
3285 assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3286 }
3287
3288 #[test]
3289 fn test_poh_speed_no_hashes_per_tick() {
3290 solana_logger::setup();
3291 let poh_config = PohConfig {
3292 target_tick_duration: target_tick_duration(),
3293 hashes_per_tick: None,
3294 ..PohConfig::default()
3295 };
3296 let genesis_config = GenesisConfig {
3297 poh_config,
3298 ..GenesisConfig::default()
3299 };
3300 let bank = Bank::new_for_tests(&genesis_config);
3301 check_poh_speed(&bank, Some(10_000)).unwrap();
3302 }
3303}