pub use solana_perf::report_target_features;
use {
    crate::{
        admin_rpc_post_init::{AdminRpcRequestMetadataPostInit, KeyUpdaterType, KeyUpdaters},
        banking_stage::{
            transaction_scheduler::scheduler_controller::SchedulerConfig, BankingStage,
        },
        banking_trace::{self, BankingTracer, TraceError},
        cluster_info_vote_listener::VoteTracker,
        completed_data_sets_service::CompletedDataSetsService,
        consensus::{
            reconcile_blockstore_roots_with_external_source,
            tower_storage::{NullTowerStorage, TowerStorage},
            ExternalRootSource, Tower,
        },
        repair::{
            self,
            quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets},
            repair_handler::RepairHandlerType,
            serve_repair_service::ServeRepairService,
        },
        resource_limits::{
            adjust_nofile_limit, validate_memlock_limit_for_disk_io, ResourceLimitError,
        },
        sample_performance_service::SamplePerformanceService,
        sigverify,
        snapshot_packager_service::SnapshotPackagerService,
        stats_reporter_service::StatsReporterService,
        system_monitor_service::{
            verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
        },
        tpu::{ForwardingClientOption, Tpu, TpuSockets},
        tvu::{Tvu, TvuConfig, TvuSockets},
    },
    agave_snapshots::{
        snapshot_archive_info::SnapshotArchiveInfoGetter as _, snapshot_config::SnapshotConfig,
        snapshot_hash::StartingSnapshotHashes, SnapshotInterval,
    },
    anyhow::{anyhow, Context, Result},
    crossbeam_channel::{bounded, unbounded, Receiver},
    quinn::Endpoint,
    serde::{Deserialize, Serialize},
    solana_account::ReadableAccount,
    solana_accounts_db::{
        accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
        accounts_update_notifier_interface::AccountsUpdateNotifier,
        utils::move_and_async_delete_path_contents,
    },
    solana_client::{
        client_option::ClientOption,
        connection_cache::{ConnectionCache, Protocol},
    },
    solana_clock::Slot,
    solana_cluster_type::ClusterType,
    solana_entry::poh::compute_hash_time,
    solana_epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
    solana_genesis_config::GenesisConfig,
    solana_genesis_utils::{
        open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
    },
    solana_geyser_plugin_manager::{
        geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
    },
    solana_gossip::{
        cluster_info::{
            ClusterInfo, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
        },
        contact_info::ContactInfo,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        gossip_service::GossipService,
        node::{Node, NodeMultihoming},
    },
    solana_hard_forks::HardForks,
    solana_hash::Hash,
    solana_keypair::Keypair,
    solana_ledger::{
        bank_forks_utils,
        blockstore::{
            Blockstore, BlockstoreError, PurgeType, MAX_COMPLETED_SLOTS_IN_CHANNEL,
            MAX_REPLAY_WAKE_UP_SIGNALS,
        },
        blockstore_metric_report_service::BlockstoreMetricReportService,
        blockstore_options::{BlockstoreOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL},
        blockstore_processor::{self, TransactionStatusSender},
        entry_notifier_interface::EntryNotifierArc,
        entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
        leader_schedule::FixedSchedule,
        leader_schedule_cache::LeaderScheduleCache,
        use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
    },
    solana_measure::measure::Measure,
    solana_metrics::{datapoint_info, metrics::metrics_config_sanity_check},
    solana_poh::{
        poh_controller::PohController,
        poh_recorder::PohRecorder,
        poh_service::{self, PohService},
        record_channels::record_channels,
        transaction_recorder::TransactionRecorder,
    },
    solana_pubkey::Pubkey,
    solana_rayon_threadlimit::get_thread_count,
    solana_rpc::{
        max_slots::MaxSlots,
        optimistically_confirmed_bank_tracker::{
            BankNotificationSenderConfig, OptimisticallyConfirmedBank,
            OptimisticallyConfirmedBankTracker,
        },
        rpc::JsonRpcConfig,
        rpc_completed_slots_service::RpcCompletedSlotsService,
        rpc_pubsub_service::{PubSubConfig, PubSubService},
        rpc_service::{JsonRpcService, JsonRpcServiceConfig},
        rpc_subscriptions::RpcSubscriptions,
        transaction_notifier_interface::TransactionNotifierArc,
        transaction_status_service::TransactionStatusService,
    },
    solana_runtime::{
        accounts_background_service::{
            AbsRequestHandlers, AccountsBackgroundService, DroppedSlotsReceiver,
            PendingSnapshotPackages, PrunedBanksRequestHandler, SnapshotRequestHandler,
        },
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        dependency_tracker::DependencyTracker,
        prioritization_fee_cache::PrioritizationFeeCache,
        runtime_config::RuntimeConfig,
        snapshot_bank_utils,
        snapshot_controller::SnapshotController,
        snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
    },
    solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
    solana_shred_version::compute_shred_version,
    solana_signer::Signer,
    solana_streamer::{
        nonblocking::{simple_qos::SimpleQosConfig, swqos::SwQosConfig},
        quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig, SwQosQuicStreamerConfig},
        socket::SocketAddrSpace,
        streamer::StakedNodes,
    },
    solana_time_utils::timestamp,
    solana_tpu_client::tpu_client::{
        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
    },
    solana_turbine::{
        self,
        broadcast_stage::BroadcastStageType,
        xdp::{master_ip_if_bonded, XdpConfig, XdpRetransmitter},
    },
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    solana_validator_exit::Exit,
    solana_vote_program::vote_state::VoteStateV4,
    solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
    std::{
        borrow::Cow,
        collections::{HashMap, HashSet},
        net::{IpAddr, SocketAddr},
        num::{NonZeroU64, NonZeroUsize},
        path::{Path, PathBuf},
        str::FromStr,
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, RwLock,
        },
        thread::{sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    strum::VariantNames,
    strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
    thiserror::Error,
    tokio::runtime::Runtime as TokioRuntime,
    tokio_util::sync::CancellationToken,
};

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

#[derive(
    Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display,
)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
    BlockstoreProcessor,
    #[default]
    UnifiedScheduler,
}

impl BlockVerificationMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for verifying ledger entries"
    }
}

#[derive(
    Clone,
    Debug,
    EnumString,
    EnumVariantNames,
    Default,
    IntoStaticStr,
    Display,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
)]
#[strum(serialize_all = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum BlockProductionMethod {
    CentralScheduler,
    #[default]
    CentralSchedulerGreedy,
}

impl BlockProductionMethod {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "Switch transaction scheduling method for producing ledger entries"
    }
}

#[derive(
    Clone,
    Debug,
    EnumString,
    EnumVariantNames,
    Default,
    IntoStaticStr,
    Display,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
)]
#[strum(serialize_all = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum TransactionStructure {
    Sdk,
    #[default]
    View,
}

impl TransactionStructure {
    pub const fn cli_names() -> &'static [&'static str] {
        Self::VARIANTS
    }

    pub fn cli_message() -> &'static str {
        "DEPRECATED: has no impact on banking stage; will be removed in a future version"
    }
}

#[derive(
    Clone, Debug, EnumVariantNames, IntoStaticStr, Display, Serialize, Deserialize, PartialEq, Eq,
)]
#[strum(serialize_all = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum SchedulerPacing {
    Disabled,
    FillTimeMillis(NonZeroU64),
}

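// Accepted CLI forms (illustrative, per the impl below): "disabled"
// (case-insensitive) maps to `SchedulerPacing::Disabled`; a positive integer
// such as "400" maps to `SchedulerPacing::FillTimeMillis(400)`; zero and
// non-numeric values are rejected with an error message.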
impl FromStr for SchedulerPacing {
    type Err = String;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        if s.eq_ignore_ascii_case("disabled") {
            Ok(SchedulerPacing::Disabled)
        } else {
            match s.parse::<u64>() {
                Ok(v) if v > 0 => Ok(SchedulerPacing::FillTimeMillis(
                    NonZeroU64::new(v).ok_or_else(|| "value must be non-zero".to_string())?,
                )),
                _ => Err("value must be a positive integer or 'disabled'".to_string()),
            }
        }
    }
}

impl SchedulerPacing {
    pub fn fill_time(&self) -> Option<Duration> {
        match self {
            SchedulerPacing::Disabled => None,
            SchedulerPacing::FillTimeMillis(millis) => Some(Duration::from_millis(millis.get())),
        }
    }
}

#[derive(Clone, Debug)]
pub struct GeneratorConfig {
    pub accounts_path: String,
    pub starting_keypairs: Arc<Vec<Keypair>>,
}

pub struct ValidatorConfig {
    pub halt_at_slot: Option<Slot>,
    pub expected_genesis_hash: Option<Hash>,
    pub expected_bank_hash: Option<Hash>,
    pub expected_shred_version: Option<u16>,
    pub voting_disabled: bool,
    pub account_paths: Vec<PathBuf>,
    pub account_snapshot_paths: Vec<PathBuf>,
    pub rpc_config: JsonRpcConfig,
    pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
    pub geyser_plugin_always_enabled: bool,
    pub rpc_addrs: Option<(SocketAddr, SocketAddr)>,
    pub pubsub_config: PubSubConfig,
    pub snapshot_config: SnapshotConfig,
    pub max_ledger_shreds: Option<u64>,
    pub blockstore_options: BlockstoreOptions,
    pub broadcast_stage_type: BroadcastStageType,
    pub turbine_disabled: Arc<AtomicBool>,
    pub fixed_leader_schedule: Option<FixedSchedule>,
    pub wait_for_supermajority: Option<Slot>,
    pub new_hard_forks: Option<Vec<Slot>>,
    pub known_validators: Option<HashSet<Pubkey>>,
    pub repair_validators: Option<HashSet<Pubkey>>,
    pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    pub gossip_validators: Option<HashSet<Pubkey>>,
    pub max_genesis_archive_unpacked_size: u64,
    pub run_verification: bool,
    pub require_tower: bool,
    pub tower_storage: Arc<dyn TowerStorage>,
    pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
    pub contact_debug_interval: u64,
    pub contact_save_interval: u64,
    pub send_transaction_service_config: SendTransactionServiceConfig,
    pub no_poh_speed_test: bool,
    pub no_os_memory_stats_reporting: bool,
    pub no_os_network_stats_reporting: bool,
    pub no_os_cpu_stats_reporting: bool,
    pub no_os_disk_stats_reporting: bool,
    pub enforce_ulimit_nofile: bool,
    pub poh_pinned_cpu_core: usize,
    pub poh_hashes_per_batch: u64,
    pub process_ledger_before_services: bool,
    pub accounts_db_config: AccountsDbConfig,
    pub warp_slot: Option<Slot>,
    pub accounts_db_skip_shrink: bool,
    pub accounts_db_force_initial_clean: bool,
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    pub validator_exit: Arc<RwLock<Exit>>,
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    pub no_wait_for_vote_to_start_leader: bool,
    pub wait_to_vote_slot: Option<Slot>,
    pub runtime_config: RuntimeConfig,
    pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
    pub block_verification_method: BlockVerificationMethod,
    pub block_production_method: BlockProductionMethod,
    pub block_production_num_workers: NonZeroUsize,
    pub block_production_scheduler_config: SchedulerConfig,
    pub enable_block_production_forwarding: bool,
    pub generator_config: Option<GeneratorConfig>,
    pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
    pub wen_restart_proto_path: Option<PathBuf>,
    pub wen_restart_coordinator: Option<Pubkey>,
    pub unified_scheduler_handler_threads: Option<usize>,
    pub ip_echo_server_threads: NonZeroUsize,
    pub rayon_global_threads: NonZeroUsize,
    pub replay_forks_threads: NonZeroUsize,
    pub replay_transactions_threads: NonZeroUsize,
    pub tvu_shred_sigverify_threads: NonZeroUsize,
    pub delay_leader_block_for_pending_fork: bool,
    pub use_tpu_client_next: bool,
    pub retransmit_xdp: Option<XdpConfig>,
    pub repair_handler_type: RepairHandlerType,
}

impl ValidatorConfig {
    pub fn default_for_test() -> Self {
        let max_thread_count =
            NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");

        Self {
            halt_at_slot: None,
            expected_genesis_hash: None,
            expected_bank_hash: None,
            expected_shred_version: None,
            voting_disabled: false,
            max_ledger_shreds: None,
            blockstore_options: BlockstoreOptions::default_for_tests(),
            account_paths: Vec::new(),
            account_snapshot_paths: Vec::new(),
            rpc_config: JsonRpcConfig::default_for_test(),
            on_start_geyser_plugin_config_files: None,
            geyser_plugin_always_enabled: false,
            rpc_addrs: None,
            pubsub_config: PubSubConfig::default(),
            snapshot_config: SnapshotConfig::new_load_only(),
            broadcast_stage_type: BroadcastStageType::Standard,
            turbine_disabled: Arc::<AtomicBool>::default(),
            fixed_leader_schedule: None,
            wait_for_supermajority: None,
            new_hard_forks: None,
            known_validators: None,
            repair_validators: None,
            repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
            gossip_validators: None,
            max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
            run_verification: true,
            require_tower: false,
            tower_storage: Arc::new(NullTowerStorage::default()),
            debug_keys: None,
            contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
            contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
            send_transaction_service_config: SendTransactionServiceConfig::default(),
            no_poh_speed_test: true,
            no_os_memory_stats_reporting: true,
            no_os_network_stats_reporting: true,
            no_os_cpu_stats_reporting: true,
            no_os_disk_stats_reporting: true,
            enforce_ulimit_nofile: false,
            poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
            poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
            process_ledger_before_services: false,
            warp_slot: None,
            accounts_db_skip_shrink: false,
            accounts_db_force_initial_clean: false,
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            validator_exit: Arc::new(RwLock::new(Exit::default())),
            validator_exit_backpressure: HashMap::default(),
            no_wait_for_vote_to_start_leader: true,
            accounts_db_config: ACCOUNTS_DB_CONFIG_FOR_TESTING,
            wait_to_vote_slot: None,
            runtime_config: RuntimeConfig::default(),
            banking_trace_dir_byte_limit: 0,
            block_verification_method: BlockVerificationMethod::default(),
            block_production_method: BlockProductionMethod::default(),
            block_production_num_workers: BankingStage::default_num_workers(),
            block_production_scheduler_config: SchedulerConfig::default(),
            enable_block_production_forwarding: true,
            generator_config: None,
            use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
            wen_restart_proto_path: None,
            wen_restart_coordinator: None,
            unified_scheduler_handler_threads: None,
            ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            rayon_global_threads: max_thread_count,
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: max_thread_count,
            tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                .expect("thread count is non-zero"),
            delay_leader_block_for_pending_fork: false,
            use_tpu_client_next: true,
            retransmit_xdp: None,
            repair_handler_type: RepairHandlerType::default(),
        }
    }

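    /// Enables the pubsub block-subscription endpoint, along with the RPC
    /// transaction history it depends on, on top of the test defaults.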
    pub fn enable_default_rpc_block_subscribe(&mut self) {
        let pubsub_config = PubSubConfig {
            enable_block_subscription: true,
            ..PubSubConfig::default()
        };
        let rpc_config = JsonRpcConfig {
            enable_rpc_transaction_history: true,
            ..JsonRpcConfig::default_for_test()
        };

        self.pubsub_config = pubsub_config;
        self.rpc_config = rpc_config;
    }
}

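/// Coarse-grained startup progress, shared with the admin RPC service so
/// operators can query how far along startup is.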
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum ValidatorStartProgress {
    #[default]
    Initializing,
    SearchingForRpcService,
    DownloadingSnapshot {
        slot: Slot,
        rpc_addr: SocketAddr,
    },
    CleaningBlockStore,
    CleaningAccounts,
    LoadingLedger,
    ProcessingLedger {
        slot: Slot,
        max_slot: Slot,
    },
    StartingServices,
    Halted,
    WaitingForSupermajority {
        slot: Slot,
        gossip_stake_percent: u64,
    },

    Running,
}

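/// Optionally spawns a background thread that scans the blockstore and fixes
/// roots; only active when RPC transaction history and `rpc_scan_and_fix_roots`
/// are both enabled.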
struct BlockstoreRootScan {
    thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
}

impl BlockstoreRootScan {
    fn new(config: &ValidatorConfig, blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
        let thread = if config.rpc_addrs.is_some()
            && config.rpc_config.enable_rpc_transaction_history
            && config.rpc_config.rpc_scan_and_fix_roots
        {
            Some(
                Builder::new()
                    .name("solBStoreRtScan".to_string())
                    .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
                    .unwrap(),
            )
        } else {
            None
        };
        Self { thread }
    }

    fn join(self) {
        if let Some(blockstore_root_scan) = self.thread {
            if let Err(err) = blockstore_root_scan.join() {
                warn!("blockstore_root_scan failed to join: {err:?}");
            }
        }
    }
}

#[derive(Default)]
struct TransactionHistoryServices {
    transaction_status_sender: Option<TransactionStatusSender>,
    transaction_status_service: Option<TransactionStatusService>,
    max_complete_transaction_status_slot: Arc<AtomicU64>,
}

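/// Bundles the TPU-related knobs (QUIC vs UDP, connection pool size, and the
/// per-server QUIC streamer configs) that are handed to `Validator::new`.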
pub struct ValidatorTpuConfig {
    pub use_quic: bool,
    pub vote_use_quic: bool,
    pub tpu_connection_pool_size: usize,
    pub tpu_enable_udp: bool,
    pub tpu_quic_server_config: SwQosQuicStreamerConfig,
    pub tpu_fwd_quic_server_config: SwQosQuicStreamerConfig,
    pub vote_quic_server_config: SimpleQosQuicStreamerConfig,
}

impl ValidatorTpuConfig {
    pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
        let tpu_quic_server_config = SwQosQuicStreamerConfig {
            quic_streamer_config: QuicStreamerConfig {
                max_connections_per_ipaddr_per_min: 32,
                accumulator_channel_size: 100_000,
                ..Default::default()
            },
            qos_config: SwQosConfig::default(),
        };

        let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
            quic_streamer_config: QuicStreamerConfig {
                max_connections_per_ipaddr_per_min: 32,
                max_unstaked_connections: 0,
                accumulator_channel_size: 100_000,
                ..Default::default()
            },
            qos_config: SwQosConfig::default(),
        };

        let vote_quic_server_config = SimpleQosQuicStreamerConfig {
            quic_streamer_config: QuicStreamerConfig {
                max_connections_per_ipaddr_per_min: 32,
                max_unstaked_connections: 0,
                accumulator_channel_size: 100_000,
                ..Default::default()
            },
            qos_config: SimpleQosConfig::default(),
        };

        ValidatorTpuConfig {
            use_quic: DEFAULT_TPU_USE_QUIC,
            vote_use_quic: DEFAULT_VOTE_USE_QUIC,
            tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        }
    }
}

pub struct Validator {
    validator_exit: Arc<RwLock<Exit>>,
    json_rpc_service: Option<JsonRpcService>,
    pubsub_service: Option<PubSubService>,
    rpc_completed_slots_service: Option<JoinHandle<()>>,
    optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
    transaction_status_service: Option<TransactionStatusService>,
    entry_notifier_service: Option<EntryNotifierService>,
    system_monitor_service: Option<SystemMonitorService>,
    sample_performance_service: Option<SamplePerformanceService>,
    stats_reporter_service: StatsReporterService,
    gossip_service: GossipService,
    serve_repair_service: ServeRepairService,
    completed_data_sets_service: Option<CompletedDataSetsService>,
    snapshot_packager_service: Option<SnapshotPackagerService>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    poh_service: PohService,
    tpu: Tpu,
    tvu: Tvu,
    ip_echo_server: Option<solana_net_utils::IpEchoServer>,
    pub cluster_info: Arc<ClusterInfo>,
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub blockstore: Arc<Blockstore>,
    geyser_plugin_service: Option<GeyserPluginService>,
    blockstore_metric_report_service: BlockstoreMetricReportService,
    accounts_background_service: AccountsBackgroundService,
    turbine_quic_endpoint: Option<Endpoint>,
    turbine_quic_endpoint_runtime: Option<TokioRuntime>,
    turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
    repair_quic_endpoints: Option<[Endpoint; 3]>,
    repair_quic_endpoints_runtime: Option<TokioRuntime>,
    repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
    xdp_retransmitter: Option<XdpRetransmitter>,
    _tpu_client_next_runtime: Option<TokioRuntime>,
}

impl Validator {
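    /// Creates the validator and starts all of its services: blockstore and
    /// bank-forks loading, gossip, PoH, TPU, TVU, and (optionally) RPC.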
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        mut node: Node,
        identity_keypair: Arc<Keypair>,
        ledger_path: &Path,
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        cluster_entrypoints: Vec<ContactInfo>,
        config: &ValidatorConfig,
        should_check_duplicate_instance: bool,
        rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
        start_progress: Arc<RwLock<ValidatorStartProgress>>,
        socket_addr_space: SocketAddrSpace,
        tpu_config: ValidatorTpuConfig,
        admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    ) -> Result<Self> {
        #[cfg(debug_assertions)]
        const DEBUG_ASSERTION_STATUS: &str = "enabled";
        #[cfg(not(debug_assertions))]
        const DEBUG_ASSERTION_STATUS: &str = "disabled";
        info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");

        let ValidatorTpuConfig {
            use_quic,
            vote_use_quic,
            tpu_connection_pool_size,
            tpu_enable_udp,
            tpu_quic_server_config,
            tpu_fwd_quic_server_config,
            vote_quic_server_config,
        } = tpu_config;

        let start_time = Instant::now();

        adjust_nofile_limit(config.enforce_ulimit_nofile)?;

        if rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solRayonGlob{i:02}"))
            .num_threads(config.rayon_global_threads.get())
            .build_global()
            .is_err()
        {
            warn!("Rayon global thread pool already initialized");
        }

        let id = identity_keypair.pubkey();
        assert_eq!(&id, node.info.pubkey());

        info!("identity pubkey: {id}");
        info!("vote account pubkey: {vote_account}");

        if !config.no_os_network_stats_reporting {
            verify_net_stats_access().map_err(|e| {
                ValidatorError::Other(format!("Failed to access network stats: {e:?}"))
            })?;
        }

        let mut bank_notification_senders = Vec::new();

        let exit = Arc::new(AtomicBool::new(false));

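        // The geyser plugin service is created before the blockstore is
        // loaded so its notifiers are available during ledger load below;
        // `geyser_plugin_always_enabled` with no config files yields an
        // empty plugin list.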
        let geyser_plugin_config_files = config
            .on_start_geyser_plugin_config_files
            .as_ref()
            .map(Cow::Borrowed)
            .or_else(|| {
                config
                    .geyser_plugin_always_enabled
                    .then_some(Cow::Owned(vec![]))
            });
        let geyser_plugin_service =
            if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
                let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
                bank_notification_senders.push(confirmed_bank_sender);
                let rpc_to_plugin_manager_receiver_and_exit =
                    rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
                Some(
                    GeyserPluginService::new_with_receiver(
                        confirmed_bank_receiver,
                        config.geyser_plugin_always_enabled,
                        geyser_plugin_config_files.as_ref(),
                        rpc_to_plugin_manager_receiver_and_exit,
                    )
                    .map_err(|err| {
                        ValidatorError::Other(format!("Failed to load the Geyser plugin: {err:?}"))
                    })?,
                )
            } else {
                None
            };

        if config.voting_disabled {
            warn!("voting disabled");
            authorized_voter_keypairs.write().unwrap().clear();
        } else {
            for authorized_voter_keypair in authorized_voter_keypairs.read().unwrap().iter() {
                warn!("authorized voter: {}", authorized_voter_keypair.pubkey());
            }
        }

        for cluster_entrypoint in &cluster_entrypoints {
            info!("entrypoint: {cluster_entrypoint:?}");
        }

        if solana_perf::perf_libs::api().is_some() {
            info!("Initializing sigverify, this could take a while...");
        } else {
            info!("Initializing sigverify...");
        }
        sigverify::init();
        info!("Initializing sigverify done.");

        validate_memlock_limit_for_disk_io(config.accounts_db_config.memlock_budget_size)?;

        if !ledger_path.is_dir() {
            return Err(anyhow!(
                "ledger directory does not exist or is not accessible: {ledger_path:?}"
            ));
        }
        let genesis_config = load_genesis(config, ledger_path)?;
        metrics_config_sanity_check(genesis_config.cluster_type)?;

        info!("Cleaning accounts paths..");
        *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
        let mut timer = Measure::start("clean_accounts_paths");
        cleanup_accounts_paths(config);
        timer.stop();
        info!("Cleaning accounts paths done. {timer}");

        snapshot_utils::purge_incomplete_bank_snapshots(&config.snapshot_config.bank_snapshots_dir);
        snapshot_utils::purge_old_bank_snapshots_at_startup(
            &config.snapshot_config.bank_snapshots_dir,
        );

        info!("Cleaning orphaned account snapshot directories..");
        let mut timer = Measure::start("clean_orphaned_account_snapshot_dirs");
        clean_orphaned_account_snapshot_dirs(
            &config.snapshot_config.bank_snapshots_dir,
            &config.account_snapshot_paths,
        )
        .context("failed to clean orphaned account snapshot directories")?;
        timer.stop();
        info!("Cleaning orphaned account snapshot directories done. {timer}");

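        // Wire both shutdown mechanisms into `validator_exit`: the legacy
        // `exit` flag polled by older services and the `CancellationToken`
        // used by the newer async ones.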
        let cancel = CancellationToken::new();
        {
            let exit = exit.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
            let cancel = cancel.clone();
            config
                .validator_exit
                .write()
                .unwrap()
                .register_exit(Box::new(move || cancel.cancel()));
        }

        let (
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            block_metadata_notifier,
            slot_status_notifier,
        ) = if let Some(service) = &geyser_plugin_service {
            (
                service.get_accounts_update_notifier(),
                service.get_transaction_notifier(),
                service.get_entry_notifier(),
                service.get_block_metadata_notifier(),
                service.get_slot_status_notifier(),
            )
        } else {
            (None, None, None, None, None)
        };

        info!(
            "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \
             entry_notifier: {}",
            accounts_update_notifier.is_some(),
            transaction_notifier.is_some(),
            entry_notifier.is_some()
        );

        let system_monitor_service = Some(SystemMonitorService::new(
            exit.clone(),
            SystemMonitorStatsReportConfig {
                report_os_memory_stats: !config.no_os_memory_stats_reporting,
                report_os_network_stats: !config.no_os_network_stats_reporting,
                report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
                report_os_disk_stats: !config.no_os_disk_stats_reporting,
            },
        ));

        let dependency_tracker = Arc::new(DependencyTracker::default());

        let (
            bank_forks,
            blockstore,
            original_blockstore_root,
            ledger_signal_receiver,
            leader_schedule_cache,
            starting_snapshot_hashes,
            TransactionHistoryServices {
                transaction_status_sender,
                transaction_status_service,
                max_complete_transaction_status_slot,
            },
            blockstore_process_options,
            blockstore_root_scan,
            pruned_banks_receiver,
            entry_notifier_service,
        ) = load_blockstore(
            config,
            ledger_path,
            &genesis_config,
            exit.clone(),
            &start_progress,
            accounts_update_notifier,
            transaction_notifier,
            entry_notifier,
            config
                .rpc_addrs
                .is_some()
                .then(|| dependency_tracker.clone()),
        )
        .map_err(ValidatorError::Other)?;

        if !config.no_poh_speed_test {
            check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
        }

        let (root_slot, hard_forks) = {
            let root_bank = bank_forks.read().unwrap().root_bank();
            (root_bank.slot(), root_bank.hard_forks())
        };
        let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
        info!("shred version: {shred_version}, hard forks: {hard_forks:?}");

        if let Some(expected_shred_version) = config.expected_shred_version {
            if expected_shred_version != shred_version {
                return Err(ValidatorError::ShredVersionMismatch {
                    actual: shred_version,
                    expected: expected_shred_version,
                }
                .into());
            }
        }

        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
            config,
            &blockstore,
            root_slot,
            &hard_forks,
        )? {
            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
            cleanup_blockstore_incorrect_shred_versions(
                &blockstore,
                config,
                start_slot,
                shred_version,
            )?;
        } else {
            info!("Skipping the blockstore check for shreds with incorrect version");
        }

        node.info.set_shred_version(shred_version);
        node.info.set_wallclock(timestamp());
        Self::print_node_info(&node);

        let mut cluster_info = ClusterInfo::new(
            node.info.clone(),
            identity_keypair.clone(),
            socket_addr_space,
        );
        cluster_info.set_contact_debug_interval(config.contact_debug_interval);
        if let Some(known_validators) = &config.known_validators {
            cluster_info
                .set_trim_keep_pubkeys(known_validators.iter().copied())
                .expect("set_trim_keep_pubkeys should succeed as ClusterInfo was just created");
        }
        cluster_info.set_entrypoints(cluster_entrypoints);
        cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
        cluster_info.set_bind_ip_addrs(node.bind_ip_addrs.clone());
        let cluster_info = Arc::new(cluster_info);
        let node_multihoming = Arc::new(NodeMultihoming::from(&node));

        assert!(is_snapshot_config_valid(&config.snapshot_config));

        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            config.snapshot_config.clone(),
            bank_forks.read().unwrap().root(),
        ));

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let snapshot_packager_service = if snapshot_controller
            .snapshot_config()
            .should_generate_snapshots()
        {
            let exit_backpressure = config
                .validator_exit_backpressure
                .get(SnapshotPackagerService::NAME)
                .cloned();
            let enable_gossip_push = true;
            let snapshot_packager_service = SnapshotPackagerService::new(
                pending_snapshot_packages.clone(),
                starting_snapshot_hashes,
                exit.clone(),
                exit_backpressure,
                cluster_info.clone(),
                snapshot_controller.clone(),
                enable_gossip_push,
            );
            Some(snapshot_packager_service)
        } else {
            None
        };

        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller: snapshot_controller.clone(),
            snapshot_request_receiver,
            pending_snapshot_packages,
        };
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let accounts_background_service = AccountsBackgroundService::new(
            bank_forks.clone(),
            exit.clone(),
            AbsRequestHandlers {
                snapshot_request_handler,
                pruned_banks_request_handler,
            },
        );
        info!(
            "Using: block-verification-method: {}, block-production-method: {}",
            config.block_verification_method, config.block_production_method,
        );

        let (replay_vote_sender, replay_vote_receiver) = unbounded();

        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

        let leader_schedule_cache = Arc::new(leader_schedule_cache);
        let (poh_recorder, entry_receiver) = {
            let bank = &bank_forks.read().unwrap().working_bank();
            PohRecorder::new_with_clear_signal(
                bank.tick_height(),
                bank.last_blockhash(),
                bank.clone(),
                None,
                bank.ticks_per_slot(),
                config.delay_leader_block_for_pending_fork,
                blockstore.clone(),
                blockstore.get_new_shred_signal(0),
                &leader_schedule_cache,
                &genesis_config.poh_config,
                exit.clone(),
            )
        };
        let (record_sender, record_receiver) = record_channels(transaction_status_sender.is_some());
        let transaction_recorder = TransactionRecorder::new(record_sender);
        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
        let (poh_controller, poh_service_message_receiver) = PohController::new();

        let (banking_tracer, tracer_thread) =
            BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some((
                &blockstore.banking_trace_path(),
                exit.clone(),
                config.banking_trace_dir_byte_limit,
            )))?;
        if banking_tracer.is_enabled() {
            info!(
                "Enabled banking trace (dir_byte_limit: {})",
                config.banking_trace_dir_byte_limit
            );
        } else {
            info!("Disabled banking trace");
        }
        let banking_tracer_channels = banking_tracer.create_channels(false);

        match &config.block_verification_method {
            BlockVerificationMethod::BlockstoreProcessor => {
                info!("no scheduler pool is installed for block verification...");
                if let Some(count) = config.unified_scheduler_handler_threads {
                    warn!(
                        "--unified-scheduler-handler-threads={count} is ignored because unified \
                         scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                let scheduler_pool = DefaultSchedulerPool::new_dyn(
                    config.unified_scheduler_handler_threads,
                    config.runtime_config.log_messages_bytes_limit,
                    transaction_status_sender.clone(),
                    Some(replay_vote_sender.clone()),
                    prioritization_fee_cache.clone(),
                );
                bank_forks
                    .write()
                    .unwrap()
                    .install_scheduler_pool(scheduler_pool);
            }
        }

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender());
        let mut process_blockstore = ProcessBlockStore::new(
            &id,
            vote_account,
            &start_progress,
            &blockstore,
            original_blockstore_root,
            &bank_forks,
            &leader_schedule_cache,
            &blockstore_process_options,
            transaction_status_sender.as_ref(),
            entry_notification_sender,
            blockstore_root_scan,
            &snapshot_controller,
            config,
        );

        maybe_warp_slot(
            config,
            &mut process_blockstore,
            ledger_path,
            &bank_forks,
            &leader_schedule_cache,
            &snapshot_controller,
        )
        .map_err(ValidatorError::Other)?;

        if config.process_ledger_before_services {
            process_blockstore
                .process()
                .map_err(ValidatorError::Other)?;
        }
        *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

        let sample_performance_service =
            if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                Some(SamplePerformanceService::new(
                    &bank_forks,
                    blockstore.clone(),
                    exit.clone(),
                ))
            } else {
                None
            };

        let mut block_commitment_cache = BlockCommitmentCache::default();
        let bank_forks_guard = bank_forks.read().unwrap();
        block_commitment_cache.initialize_slots(
            bank_forks_guard.working_bank().slot(),
            bank_forks_guard.root(),
        );
        drop(bank_forks_guard);
        let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let max_slots = Arc::new(MaxSlots::default());

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

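        // A connection cache is only needed for the legacy TPU client; it is
        // QUIC- or UDP-backed depending on `use_quic`. With tpu-client-next,
        // no cache is created at all.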
        let mut tpu_transactions_forwards_client_sockets =
            Some(node.sockets.tpu_transaction_forwarding_clients);
        let connection_cache = match (config.use_tpu_client_next, use_quic) {
            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_tpu_quic",
                tpu_connection_pool_size,
                Some({
                    let socketbox: Box<[_; 1]> = tpu_transactions_forwards_client_sockets
                        .take()
                        .unwrap()
                        .try_into()
                        .expect("Multihoming support for connection cache is not available");
                    let [sock] = *socketbox;
                    sock
                }),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu(Protocol::UDP)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            ))),
            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
                "connection_cache_tpu_udp",
                tpu_connection_pool_size,
            ))),
            (true, _) => None,
        };

        let vote_connection_cache = if vote_use_quic {
            let vote_connection_cache = ConnectionCache::new_with_client_options(
                "connection_cache_vote_quic",
                tpu_connection_pool_size,
                Some(node.sockets.quic_vote_client),
                Some((
                    &identity_keypair,
                    node.info
                        .tpu_vote(Protocol::QUIC)
                        .ok_or_else(|| {
                            ValidatorError::Other(String::from("Invalid QUIC address for TPU Vote"))
                        })?
                        .ip(),
                )),
                Some((&staked_nodes, &identity_keypair.pubkey())),
            );
            Arc::new(vote_connection_cache)
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_vote_udp",
                tpu_connection_pool_size,
            ))
        };

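        // Build a dedicated tokio runtime for tpu-client-next only when not
        // already running inside a runtime (e.g. in tests); otherwise the
        // current runtime's handle is reused below.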
        let current_runtime_handle = tokio::runtime::Handle::try_current();
        let tpu_client_next_runtime =
            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .worker_threads(2)
                    .thread_name("solTpuClientRt")
                    .build()
                    .unwrap()
            });

        let rpc_override_health_check =
            Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
        let (
            json_rpc_service,
            rpc_subscriptions,
            pubsub_service,
            completed_data_sets_sender,
            completed_data_sets_service,
            rpc_completed_slots_service,
            optimistically_confirmed_bank_tracker,
            bank_notification_sender,
        ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
            assert_eq!(
                node.info.rpc().map(|addr| socket_addr_space.check(&addr)),
                node.info
                    .rpc_pubsub()
                    .map(|addr| socket_addr_space.check(&addr))
            );
            let (bank_notification_sender, bank_notification_receiver) = unbounded();
            let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
                Some(Arc::new(RwLock::new(bank_notification_senders)))
            } else {
                None
            };

            let client_option = if config.use_tpu_client_next {
                let runtime_handle = tpu_client_next_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
                ClientOption::TpuClientNext(
                    Arc::as_ref(&identity_keypair),
                    node.sockets.rpc_sts_client,
                    runtime_handle.clone(),
                    cancel.clone(),
                )
            } else {
                let Some(connection_cache) = &connection_cache else {
                    panic!("ConnectionCache should exist by construction.");
                };
                ClientOption::ConnectionCache(connection_cache.clone())
            };
            let rpc_svc_config = JsonRpcServiceConfig {
                rpc_addr,
                rpc_config: config.rpc_config.clone(),
                snapshot_config: Some(snapshot_controller.snapshot_config().clone()),
                bank_forks: bank_forks.clone(),
                block_commitment_cache: block_commitment_cache.clone(),
                blockstore: blockstore.clone(),
                cluster_info: cluster_info.clone(),
                poh_recorder: Some(poh_recorder.clone()),
                genesis_hash: genesis_config.hash(),
                ledger_path: ledger_path.to_path_buf(),
                validator_exit: config.validator_exit.clone(),
                exit: exit.clone(),
                override_health_check: rpc_override_health_check.clone(),
                optimistically_confirmed_bank: optimistically_confirmed_bank.clone(),
                send_transaction_service_config: config.send_transaction_service_config.clone(),
                max_slots: max_slots.clone(),
                leader_schedule_cache: leader_schedule_cache.clone(),
                max_complete_transaction_status_slot: max_complete_transaction_status_slot.clone(),
                prioritization_fee_cache: prioritization_fee_cache.clone(),
                client_option,
            };
            let json_rpc_service =
                JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
            let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
                exit.clone(),
                max_complete_transaction_status_slot,
                blockstore.clone(),
                bank_forks.clone(),
                block_commitment_cache.clone(),
                optimistically_confirmed_bank.clone(),
                &config.pubsub_config,
                None,
            ));
            let pubsub_service = if !config.rpc_config.full_api {
                None
            } else {
                let (trigger, pubsub_service) = PubSubService::new(
                    config.pubsub_config.clone(),
                    &rpc_subscriptions,
                    rpc_pubsub_addr,
                );
                config
                    .validator_exit
                    .write()
                    .unwrap()
                    .register_exit(Box::new(move || trigger.cancel()));

                Some(pubsub_service)
            };

            let (completed_data_sets_sender, completed_data_sets_service) =
                if !config.rpc_config.full_api {
                    (None, None)
                } else {
                    let (completed_data_sets_sender, completed_data_sets_receiver) =
                        bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
                    let completed_data_sets_service = CompletedDataSetsService::new(
                        completed_data_sets_receiver,
                        blockstore.clone(),
                        rpc_subscriptions.clone(),
                        exit.clone(),
                        max_slots.clone(),
                    );
                    (
                        Some(completed_data_sets_sender),
                        Some(completed_data_sets_service),
                    )
                };

            let rpc_completed_slots_service =
                if config.rpc_config.full_api || geyser_plugin_service.is_some() {
                    let (completed_slots_sender, completed_slots_receiver) =
                        bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
                    blockstore.add_completed_slots_signal(completed_slots_sender);

                    Some(RpcCompletedSlotsService::spawn(
                        completed_slots_receiver,
                        rpc_subscriptions.clone(),
                        slot_status_notifier.clone(),
                        exit.clone(),
                    ))
                } else {
                    None
                };

            let dependency_tracker = transaction_status_sender
                .is_some()
                .then_some(dependency_tracker);
            let optimistically_confirmed_bank_tracker =
                Some(OptimisticallyConfirmedBankTracker::new(
                    bank_notification_receiver,
                    exit.clone(),
                    bank_forks.clone(),
                    optimistically_confirmed_bank,
                    rpc_subscriptions.clone(),
                    confirmed_bank_subscribers,
                    prioritization_fee_cache.clone(),
                    dependency_tracker.clone(),
                ));
            let bank_notification_sender_config = Some(BankNotificationSenderConfig {
                sender: bank_notification_sender,
                should_send_parents: geyser_plugin_service.is_some(),
                dependency_tracker,
            });
            (
                Some(json_rpc_service),
                Some(rpc_subscriptions),
                pubsub_service,
                completed_data_sets_sender,
                completed_data_sets_service,
                rpc_completed_slots_service,
                optimistically_confirmed_bank_tracker,
                bank_notification_sender_config,
            )
        } else {
            (None, None, None, None, None, None, None, None)
        };

        if config.halt_at_slot.is_some() {
            block_commitment_cache
                .write()
                .unwrap()
                .set_highest_super_majority_root(bank_forks.read().unwrap().root());

            warn!("Validator halted");
            *start_progress.write().unwrap() = ValidatorStartProgress::Halted;
            std::thread::park();
        }
        let ip_echo_server = match node.sockets.ip_echo {
            None => None,
            Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
                tcp_listener,
                config.ip_echo_server_threads,
                Some(node.info.shred_version()),
            )),
        };

        let (stats_reporter_sender, stats_reporter_receiver) = unbounded();

        let stats_reporter_service =
            StatsReporterService::new(stats_reporter_receiver, exit.clone());

        let gossip_service = GossipService::new(
            &cluster_info,
            Some(bank_forks.clone()),
            node.sockets.gossip.clone(),
            config.gossip_validators.clone(),
            should_check_duplicate_instance,
            Some(stats_reporter_sender.clone()),
            exit.clone(),
        );
        let serve_repair = config.repair_handler_type.create_serve_repair(
            blockstore.clone(),
            cluster_info.clone(),
            bank_forks.read().unwrap().sharable_banks(),
            config.repair_whitelist.clone(),
        );
        let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded();
        let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded();
        let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) =
            unbounded();

        let waited_for_supermajority = wait_for_supermajority(
            config,
            Some(&mut process_blockstore),
            &bank_forks,
            &cluster_info,
            rpc_override_health_check,
            &start_progress,
        )?;

        let blockstore_metric_report_service =
            BlockstoreMetricReportService::new(blockstore.clone(), exit.clone());

        let wait_for_vote_to_start_leader =
            !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;

        let poh_service = PohService::new(
            poh_recorder.clone(),
            &genesis_config.poh_config,
            exit.clone(),
            bank_forks.read().unwrap().root_bank().ticks_per_slot(),
            config.poh_pinned_cpu_core,
            config.poh_hashes_per_batch,
            record_receiver,
            poh_service_message_receiver,
        );
        assert_eq!(
            blockstore.get_new_shred_signals_len(),
            1,
            "New shred signal for the TVU should be the same as the clear bank signal."
        );

        let vote_tracker = Arc::<VoteTracker>::default();

        let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
        let (verified_vote_sender, verified_vote_receiver) = unbounded();
        let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
        let (duplicate_confirmed_slot_sender, duplicate_confirmed_slots_receiver) = unbounded();

        let entry_notification_sender = entry_notifier_service
            .as_ref()
            .map(|service| service.sender_cloned());

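        // Turbine and repair QUIC endpoints are not started on MainnetBeta;
        // dummy senders keep the downstream plumbing uniform in that case.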
        let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
            && genesis_config.cluster_type != ClusterType::MainnetBeta)
            .then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .thread_name("solTurbineQuic")
                    .build()
                    .unwrap()
            });
        let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
        let (
            turbine_quic_endpoint,
            turbine_quic_endpoint_sender,
            turbine_quic_endpoint_join_handle,
        ) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
            let (sender, _receiver) = tokio::sync::mpsc::channel(1);
            (None, sender, None)
        } else {
            solana_turbine::quic_endpoint::new_quic_endpoint(
                turbine_quic_endpoint_runtime
                    .as_ref()
                    .map(TokioRuntime::handle)
                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
                &identity_keypair,
                node.sockets.tvu_quic,
                turbine_quic_endpoint_sender,
                bank_forks.clone(),
            )
            .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
            .unwrap()
        };

        let repair_quic_endpoints_runtime = (current_runtime_handle.is_err()
            && genesis_config.cluster_type != ClusterType::MainnetBeta)
            .then(|| {
                tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .thread_name("solRepairQuic")
                    .build()
                    .unwrap()
            });
        let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) =
            if genesis_config.cluster_type == ClusterType::MainnetBeta {
                (None, RepairQuicAsyncSenders::new_dummy(), None)
            } else {
                let repair_quic_sockets = RepairQuicSockets {
                    repair_server_quic_socket: node.sockets.serve_repair_quic,
                    repair_client_quic_socket: node.sockets.repair_quic,
                    ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
                };
                let repair_quic_senders = RepairQuicSenders {
                    repair_request_quic_sender: repair_request_quic_sender.clone(),
                    repair_response_quic_sender,
                    ancestor_hashes_response_quic_sender,
                };
                repair::quic_endpoint::new_quic_endpoints(
                    repair_quic_endpoints_runtime
                        .as_ref()
                        .map(TokioRuntime::handle)
                        .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
                    &identity_keypair,
                    repair_quic_sockets,
                    repair_quic_senders,
                    bank_forks.clone(),
                )
                .map(|(endpoints, senders, join_handle)| {
                    (Some(endpoints), senders, Some(join_handle))
                })
                .unwrap()
            };
        let serve_repair_service = ServeRepairService::new(
            serve_repair,
            repair_request_quic_sender,
            repair_request_quic_receiver,
            repair_quic_async_senders.repair_response_quic_sender,
            node.sockets.serve_repair,
            socket_addr_space,
            stats_reporter_sender,
            exit.clone(),
        );

        let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
        let wen_restart_repair_slots = if in_wen_restart {
            Some(Arc::new(RwLock::new(Vec::new())))
        } else {
            None
        };
        let tower = match process_blockstore.process_to_create_tower() {
            Ok(tower) => {
                info!("Tower state: {tower:?}");
                tower
            }
            Err(e) => {
                warn!("Unable to retrieve tower: {e:?}; creating default tower...");
1559 Tower::default()
1560 }
1561 };
1562 let last_vote = tower.last_vote();
1563
1564 let outstanding_repair_requests =
1565 Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
1566 let root_bank = bank_forks.read().unwrap().root_bank();
1567 let cluster_slots = Arc::new({
1568 crate::cluster_slots_service::cluster_slots::ClusterSlots::new(
1569 &root_bank,
1570 &cluster_info,
1571 )
1572 });
1573
1574 let connection_cache_for_warmup =
1576 if json_rpc_service.is_some() && connection_cache.is_some() {
1577 connection_cache.as_ref()
1578 } else {
1579 None
1580 };
1581 let (xdp_retransmitter, xdp_sender) =
1582 if let Some(xdp_config) = config.retransmit_xdp.clone() {
1583 let src_port = node.sockets.retransmit_sockets[0]
1584 .local_addr()
1585 .expect("failed to get local address")
1586 .port();
1587 let src_ip = match node.bind_ip_addrs.active() {
1588 IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
1589 IpAddr::V4(_unspecified) => xdp_config
1590 .interface
1591 .as_ref()
1592 .and_then(|iface| master_ip_if_bonded(iface)),
1593 _ => panic!("IPv6 not supported"),
1594 };
1595 let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port, src_ip)
1596 .expect("failed to create xdp retransmitter");
1597 (Some(rtx), Some(sender))
1598 } else {
1599 (None, None)
1600 };
1601
1602 let alpenglow_socket = if genesis_config.cluster_type == ClusterType::Testnet
1604 || genesis_config.cluster_type == ClusterType::Development
1605 {
1606 node.sockets.alpenglow
1607 } else {
1608 None
1609 };
1610
1611 let tvu = Tvu::new(
1612 vote_account,
1613 authorized_voter_keypairs,
1614 &bank_forks,
1615 &cluster_info,
1616 TvuSockets {
1617 repair: node.sockets.repair.try_clone().unwrap(),
1618 retransmit: node.sockets.retransmit_sockets,
1619 fetch: node.sockets.tvu,
1620 ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
1621 alpenglow: alpenglow_socket,
1622 },
1623 blockstore.clone(),
1624 ledger_signal_receiver,
1625 rpc_subscriptions.clone(),
1626 &poh_recorder,
1627 poh_controller,
1628 tower,
1629 config.tower_storage.clone(),
1630 &leader_schedule_cache,
1631 exit.clone(),
1632 block_commitment_cache,
1633 config.turbine_disabled.clone(),
1634 transaction_status_sender.clone(),
1635 entry_notification_sender.clone(),
1636 vote_tracker.clone(),
1637 retransmit_slots_sender,
1638 gossip_verified_vote_hash_receiver,
1639 verified_vote_receiver,
1640 replay_vote_sender.clone(),
1641 completed_data_sets_sender,
1642 bank_notification_sender.clone(),
1643 duplicate_confirmed_slots_receiver,
1644 TvuConfig {
1645 max_ledger_shreds: config.max_ledger_shreds,
1646 shred_version: node.info.shred_version(),
1647 repair_validators: config.repair_validators.clone(),
1648 repair_whitelist: config.repair_whitelist.clone(),
1649 wait_for_vote_to_start_leader,
1650 replay_forks_threads: config.replay_forks_threads,
1651 replay_transactions_threads: config.replay_transactions_threads,
1652 shred_sigverify_threads: config.tvu_shred_sigverify_threads,
1653 xdp_sender: xdp_sender.clone(),
1654 },
1655 &max_slots,
1656 block_metadata_notifier,
1657 config.wait_to_vote_slot,
1658 Some(snapshot_controller.clone()),
1659 config.runtime_config.log_messages_bytes_limit,
1660 connection_cache_for_warmup,
1661 &prioritization_fee_cache,
1662 banking_tracer.clone(),
1663 turbine_quic_endpoint_sender.clone(),
1664 turbine_quic_endpoint_receiver,
1665 repair_response_quic_receiver,
1666 repair_quic_async_senders.repair_request_quic_sender,
1667 repair_quic_async_senders.ancestor_hashes_request_quic_sender,
1668 ancestor_hashes_response_quic_receiver,
1669 outstanding_repair_requests.clone(),
1670 cluster_slots.clone(),
1671 wen_restart_repair_slots.clone(),
1672 slot_status_notifier,
1673 vote_connection_cache,
1674 )
1675 .map_err(ValidatorError::Other)?;
1676
1677 if in_wen_restart {
1678 info!("Waiting for wen_restart to finish");
1679 wait_for_wen_restart(WenRestartConfig {
1680 wen_restart_path: config.wen_restart_proto_path.clone().unwrap(),
1681 wen_restart_coordinator: config.wen_restart_coordinator.unwrap(),
1682 last_vote,
1683 blockstore: blockstore.clone(),
1684 cluster_info: cluster_info.clone(),
1685 bank_forks: bank_forks.clone(),
1686 wen_restart_repair_slots: wen_restart_repair_slots.clone(),
1687 wait_for_supermajority_threshold_percent:
1688 WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
1689 snapshot_controller: Some(snapshot_controller.clone()),
1690 abs_status: accounts_background_service.status().clone(),
1691 genesis_config_hash: genesis_config.hash(),
1692 exit: exit.clone(),
1693 })?;
1694 return Err(ValidatorError::WenRestartFinished.into());
1695 }
1696
1697 let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
1698 let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
1699 ForwardingClientOption::ConnectionCache(connection_cache.clone())
1700 } else {
1701 let runtime_handle = tpu_client_next_runtime
1702 .as_ref()
1703 .map(TokioRuntime::handle)
1704 .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1705 ForwardingClientOption::TpuClientNext((
1706 Arc::as_ref(&identity_keypair),
1707 tpu_transactions_forwards_client_sockets.take().unwrap(),
1708 runtime_handle.clone(),
1709 cancel.clone(),
1710 node_multihoming.clone(),
1711 ))
1712 };
1713 let tpu = Tpu::new_with_client(
1714 &cluster_info,
1715 &poh_recorder,
1716 transaction_recorder,
1717 entry_receiver,
1718 retransmit_slots_receiver,
1719 TpuSockets {
1720 transactions: node.sockets.tpu,
1721 transaction_forwards: node.sockets.tpu_forwards,
1722 vote: node.sockets.tpu_vote,
1723 broadcast: node.sockets.broadcast,
1724 transactions_quic: node.sockets.tpu_quic,
1725 transactions_forwards_quic: node.sockets.tpu_forwards_quic,
1726 vote_quic: node.sockets.tpu_vote_quic,
1727 vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
1728 vortexor_receivers: node.sockets.vortexor_receivers,
1729 },
1730 rpc_subscriptions.clone(),
1731 transaction_status_sender,
1732 entry_notification_sender,
1733 blockstore.clone(),
1734 &config.broadcast_stage_type,
1735 xdp_sender,
1736 exit,
1737 node.info.shred_version(),
1738 vote_tracker,
1739 bank_forks.clone(),
1740 verified_vote_sender,
1741 gossip_verified_vote_hash_sender,
1742 replay_vote_receiver,
1743 replay_vote_sender,
1744 bank_notification_sender,
1745 duplicate_confirmed_slot_sender,
1746 forwarding_tpu_client,
1747 turbine_quic_endpoint_sender,
1748 &identity_keypair,
1749 config.runtime_config.log_messages_bytes_limit,
1750 &staked_nodes,
1751 config.staked_nodes_overrides.clone(),
1752 banking_tracer_channels,
1753 tracer_thread,
1754 tpu_enable_udp,
1755 tpu_quic_server_config,
1756 tpu_fwd_quic_server_config,
1757 vote_quic_server_config,
1758 &prioritization_fee_cache,
1759 config.block_production_method.clone(),
1760 config.block_production_num_workers,
1761 config.block_production_scheduler_config.clone(),
1762 config.enable_block_production_forwarding,
1763 config.generator_config.clone(),
1764 key_notifiers.clone(),
1765 cancel,
1766 );
1767
1768 datapoint_info!(
1769 "validator-new",
1770 ("id", id.to_string(), String),
1771 ("version", solana_version::version!(), String),
1772 ("cluster_type", genesis_config.cluster_type as u32, i64),
1773 ("elapsed_ms", start_time.elapsed().as_millis() as i64, i64),
1774 ("waited_for_supermajority", waited_for_supermajority, bool),
1775 ("shred_version", shred_version as i64, i64),
1776 );
1777
1778 *start_progress.write().unwrap() = ValidatorStartProgress::Running;
1779 if config.use_tpu_client_next {
1780 if let Some(json_rpc_service) = &json_rpc_service {
1781 key_notifiers.write().unwrap().add(
1782 KeyUpdaterType::RpcService,
1783 json_rpc_service.get_client_key_updater(),
1784 );
1785 }
1786 }
1789
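// Publish the handles the admin RPC service needs now that startup is complete.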
1790 *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
1791 bank_forks: bank_forks.clone(),
1792 cluster_info: cluster_info.clone(),
1793 vote_account: *vote_account,
1794 repair_whitelist: config.repair_whitelist.clone(),
1795 notifies: key_notifiers,
1796 repair_socket: Arc::new(node.sockets.repair),
1797 outstanding_repair_requests,
1798 cluster_slots,
1799 node: Some(node_multihoming),
1800 banking_stage: tpu.banking_stage(),
1801 });
1802
1803 Ok(Self {
1804 stats_reporter_service,
1805 gossip_service,
1806 serve_repair_service,
1807 json_rpc_service,
1808 pubsub_service,
1809 rpc_completed_slots_service,
1810 optimistically_confirmed_bank_tracker,
1811 transaction_status_service,
1812 entry_notifier_service,
1813 system_monitor_service,
1814 sample_performance_service,
1815 snapshot_packager_service,
1816 completed_data_sets_service,
1817 tpu,
1818 tvu,
1819 poh_service,
1820 poh_recorder,
1821 ip_echo_server,
1822 validator_exit: config.validator_exit.clone(),
1823 cluster_info,
1824 bank_forks,
1825 blockstore,
1826 geyser_plugin_service,
1827 blockstore_metric_report_service,
1828 accounts_background_service,
1829 turbine_quic_endpoint,
1830 turbine_quic_endpoint_runtime,
1831 turbine_quic_endpoint_join_handle,
1832 repair_quic_endpoints,
1833 repair_quic_endpoints_runtime,
1834 repair_quic_endpoints_join_handle,
1835 xdp_retransmitter,
1836 _tpu_client_next_runtime: tpu_client_next_runtime,
1837 })
1838 }
1839
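// Signals all services to begin shutting down; non-blocking (pair with join()).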
1840 pub fn exit(&mut self) {
1842 self.validator_exit.write().unwrap().exit();
1843
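// Drop the blockstore's signal senders so threads waiting on them can wind down.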
1844 self.blockstore.drop_signal();
1846 }
1847
1848 pub fn close(mut self) {
1849 self.exit();
1850 self.join();
1851 }
1852
1853 fn print_node_info(node: &Node) {
1854 info!("{:?}", node.info);
1855 info!(
1856 "local gossip address: {}",
1857 node.sockets.gossip[0].local_addr().unwrap()
1858 );
1859 info!(
1860 "local broadcast address: {}",
1861 node.sockets
1862 .broadcast
1863 .first()
1864 .unwrap()
1865 .local_addr()
1866 .unwrap()
1867 );
1868 info!(
1869 "local repair address: {}",
1870 node.sockets.repair.local_addr().unwrap()
1871 );
1872 info!(
1873 "local retransmit address: {}",
1874 node.sockets.retransmit_sockets[0].local_addr().unwrap()
1875 );
1876 }
1877
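// Blocks until every spawned service has shut down. Shared handles (bank forks,
// cluster info) are dropped first so service threads can observe the exit.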
1878 pub fn join(self) {
1879 drop(self.bank_forks);
1880 drop(self.cluster_info);
1881
1882 self.poh_service.join().expect("poh_service");
1883 drop(self.poh_recorder);
1884
1885 if let Some(json_rpc_service) = self.json_rpc_service {
1886 json_rpc_service.join().expect("rpc_service");
1887 }
1888
1889 if let Some(pubsub_service) = self.pubsub_service {
1890 pubsub_service.join().expect("pubsub_service");
1891 }
1892
1893 if let Some(rpc_completed_slots_service) = self.rpc_completed_slots_service {
1894 rpc_completed_slots_service
1895 .join()
1896 .expect("rpc_completed_slots_service");
1897 }
1898
1899 if let Some(optimistically_confirmed_bank_tracker) =
1900 self.optimistically_confirmed_bank_tracker
1901 {
1902 optimistically_confirmed_bank_tracker
1903 .join()
1904 .expect("optimistically_confirmed_bank_tracker");
1905 }
1906
1907 if let Some(transaction_status_service) = self.transaction_status_service {
1908 transaction_status_service
1909 .join()
1910 .expect("transaction_status_service");
1911 }
1912
1913 if let Some(system_monitor_service) = self.system_monitor_service {
1914 system_monitor_service
1915 .join()
1916 .expect("system_monitor_service");
1917 }
1918
1919 if let Some(sample_performance_service) = self.sample_performance_service {
1920 sample_performance_service
1921 .join()
1922 .expect("sample_performance_service");
1923 }
1924
1925 if let Some(entry_notifier_service) = self.entry_notifier_service {
1926 entry_notifier_service
1927 .join()
1928 .expect("entry_notifier_service");
1929 }
1930
1931 if let Some(s) = self.snapshot_packager_service {
1932 s.join().expect("snapshot_packager_service");
1933 }
1934
1935 self.gossip_service.join().expect("gossip_service");
1936 self.repair_quic_endpoints
1937 .iter()
1938 .flatten()
1939 .for_each(repair::quic_endpoint::close_quic_endpoint);
1940 self.serve_repair_service
1941 .join()
1942 .expect("serve_repair_service");
1943 if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle {
1944 self.repair_quic_endpoints_runtime
1945 .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle))
1946 .transpose()
1947 .unwrap();
1948 }
1949 self.stats_reporter_service
1950 .join()
1951 .expect("stats_reporter_service");
1952 self.blockstore_metric_report_service
1953 .join()
1954 .expect("ledger_metric_report_service");
1955 self.accounts_background_service
1956 .join()
1957 .expect("accounts_background_service");
1958 if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
1959 solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
1960 }
1961 if let Some(xdp_retransmitter) = self.xdp_retransmitter {
1962 xdp_retransmitter.join().expect("xdp_retransmitter");
1963 }
1964 self.tpu.join().expect("tpu");
1965 self.tvu.join().expect("tvu");
1966 if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
1967 self.turbine_quic_endpoint_runtime
1968 .map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
1969 .transpose()
1970 .unwrap();
1971 }
1972 if let Some(completed_data_sets_service) = self.completed_data_sets_service {
1973 completed_data_sets_service
1974 .join()
1975 .expect("completed_data_sets_service");
1976 }
1977 if let Some(ip_echo_server) = self.ip_echo_server {
1978 ip_echo_server.shutdown_background();
1979 }
1980
1981 if let Some(geyser_plugin_service) = self.geyser_plugin_service {
1982 geyser_plugin_service.join().expect("geyser_plugin_service");
1983 }
1984 }
1985}
1986
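/// Returns true if the vote account exists in the bank and its vote state
/// records at least one vote.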
1987fn active_vote_account_exists_in_bank(bank: &Bank, vote_account: &Pubkey) -> bool {
1988 if let Some(account) = &bank.get_account(vote_account) {
1989 if let Ok(vote_state) = VoteStateV4::deserialize(account.data(), vote_account) {
1990 return !vote_state.votes.is_empty();
1991 }
1992 }
1993 false
1994}
1995
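/// Measures this machine's PoH hash rate and errors if it falls below the rate
/// the cluster requires. As a worked example with default parameters: 12_500
/// hashes/tick * 64 ticks/slot = 800_000 hashes over a 400ms slot, so the node
/// must sustain at least 2_000_000 hashes per second.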
1996fn check_poh_speed(bank: &Bank, maybe_hash_samples: Option<u64>) -> Result<(), ValidatorError> {
1997 let Some(hashes_per_tick) = bank.hashes_per_tick() else {
1998 warn!("Unable to read hashes per tick from Bank, skipping PoH speed check");
1999 return Ok(());
2000 };
2001
2002 let ticks_per_slot = bank.ticks_per_slot();
2003 let hashes_per_slot = hashes_per_tick * ticks_per_slot;
2004 let hash_samples = maybe_hash_samples.unwrap_or(hashes_per_slot);
2005
2006 let hash_time = compute_hash_time(hash_samples);
2007 let my_hashes_per_second = (hash_samples as f64 / hash_time.as_secs_f64()) as u64;
2008
2009 let target_slot_duration = Duration::from_nanos(bank.ns_per_slot as u64);
2010 let target_hashes_per_second =
2011 (hashes_per_slot as f64 / target_slot_duration.as_secs_f64()) as u64;
2012
2013 info!(
2014 "PoH speed check: computed hashes per second {my_hashes_per_second}, target hashes per \
2015 second {target_hashes_per_second}"
2016 );
2017 if my_hashes_per_second < target_hashes_per_second {
2018 return Err(ValidatorError::PohTooSlow {
2019 mine: my_hashes_per_second,
2020 target: target_hashes_per_second,
2021 });
2022 }
2023
2024 Ok(())
2025}
2026
2027fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
2028 if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
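// A cluster restart via hard fork is indicated by --wait-for-supermajority
// pointing exactly at the current root slot.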
2030 if wait_slot_for_supermajority == root_slot {
2031 return Some(wait_slot_for_supermajority);
2032 }
2033 }
2034
2035 None
2036}
2037
2038fn post_process_restored_tower(
2039 restored_tower: crate::consensus::Result<Tower>,
2040 validator_identity: &Pubkey,
2041 vote_account: &Pubkey,
2042 config: &ValidatorConfig,
2043 bank_forks: &BankForks,
2044) -> Result<Tower, String> {
2045 let mut should_require_tower = config.require_tower;
2046
2047 let restored_tower = restored_tower.and_then(|tower| {
2048 let root_bank = bank_forks.root_bank();
2049 let slot_history = root_bank.get_slot_history();
2050 let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
2052
2053 if let Some(hard_fork_restart_slot) =
2054 maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
2055 {
2056 let message =
2060 format!("Hard fork is detected; discarding tower restoration result: {tower:?}");
2061 datapoint_error!("tower_error", ("error", message, String),);
2062 error!("{message}");
2063
2064 should_require_tower = false;
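// The tower will be rebuilt from the root bank after the restart, so a missing
// tower is acceptable from here on.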
2067 return Err(crate::consensus::TowerError::HardFork(
2068 hard_fork_restart_slot,
2069 ));
2070 }
2071
2072 if let Some(warp_slot) = config.warp_slot {
2073 should_require_tower = false;
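// A warp slot likewise invalidates the saved tower; reuse the hard-fork error
// path so it gets rebuilt.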
2076 return Err(crate::consensus::TowerError::HardFork(warp_slot));
2077 }
2078
2079 tower
2080 });
2081
2082 let restored_tower = match restored_tower {
2083 Ok(tower) => tower,
2084 Err(err) => {
2085 let voting_has_been_active =
2086 active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
2087 if !err.is_file_missing() {
2088 datapoint_error!(
2089 "tower_error",
2090 ("error", format!("Unable to restore tower: {err}"), String),
2091 );
2092 }
2093 if should_require_tower && voting_has_been_active {
2094 return Err(format!(
2095 "Requested mandatory tower restore failed: {err}. And there is an existing \
2096 vote_account containing actual votes. Aborting due to possible conflicting \
2097 duplicate votes"
2098 ));
2099 }
2100 if err.is_file_missing() && !voting_has_been_active {
2101 info!(
2103 "Ignoring expected failed tower restore because this is the initial validator \
2104 start with the vote account..."
2105 );
2106 } else {
2107 error!(
2108 "Rebuilding a new tower from the latest vote account due to failed tower \
2109 restore: {err}"
2110 );
2111 }
2112
2113 Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
2114 }
2115 };
2116
2117 Ok(restored_tower)
2118}
2119
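/// Opens the genesis config from the ledger directory, sanity-checks its leader
/// schedule parameters, and verifies the genesis hash against any expected value.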
2120fn load_genesis(
2121 config: &ValidatorConfig,
2122 ledger_path: &Path,
2123) -> Result<GenesisConfig, ValidatorError> {
2124 let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size)
2125 .map_err(ValidatorError::OpenGenesisConfig)?;
2126
2127 let leader_schedule_slot_offset = genesis_config.epoch_schedule.leader_schedule_slot_offset;
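// The epoch offset at which leader schedules are computed must stay within the
// supported maximum (asserted below).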
2130 let slots_per_epoch = genesis_config.epoch_schedule.slots_per_epoch;
2131 let leader_epoch_offset = leader_schedule_slot_offset.div_ceil(slots_per_epoch);
2132 assert!(leader_epoch_offset <= MAX_LEADER_SCHEDULE_EPOCH_OFFSET);
2133
2134 let genesis_hash = genesis_config.hash();
2135 info!("genesis hash: {genesis_hash}");
2136
2137 if let Some(expected_genesis_hash) = config.expected_genesis_hash {
2138 if genesis_hash != expected_genesis_hash {
2139 return Err(ValidatorError::GenesisHashMismatch(
2140 genesis_hash,
2141 expected_genesis_hash,
2142 ));
2143 }
2144 }
2145
2146 Ok(genesis_config)
2147}
2148
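/// Opens the blockstore, loads bank forks from the latest snapshot (or genesis)
/// plus the ledger, and starts the optional transaction-history and
/// entry-notifier services.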
2149#[allow(clippy::type_complexity)]
2150fn load_blockstore(
2151 config: &ValidatorConfig,
2152 ledger_path: &Path,
2153 genesis_config: &GenesisConfig,
2154 exit: Arc<AtomicBool>,
2155 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2156 accounts_update_notifier: Option<AccountsUpdateNotifier>,
2157 transaction_notifier: Option<TransactionNotifierArc>,
2158 entry_notifier: Option<EntryNotifierArc>,
2159 dependency_tracker: Option<Arc<DependencyTracker>>,
2160) -> Result<
2161 (
2162 Arc<RwLock<BankForks>>,
2163 Arc<Blockstore>,
2164 Slot,
2165 Receiver<bool>,
2166 LeaderScheduleCache,
2167 Option<StartingSnapshotHashes>,
2168 TransactionHistoryServices,
2169 blockstore_processor::ProcessOptions,
2170 BlockstoreRootScan,
2171 DroppedSlotsReceiver,
2172 Option<EntryNotifierService>,
2173 ),
2174 String,
2175> {
2176 info!("loading ledger from {ledger_path:?}...");
2177 *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2178
2179 let blockstore = Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
2180 .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;
2181
2182 let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
2183 blockstore.add_new_shred_signal(ledger_signal_sender);
2184
2185 let original_blockstore_root = blockstore.max_root();
2188
2189 let blockstore = Arc::new(blockstore);
2190 let blockstore_root_scan = BlockstoreRootScan::new(config, blockstore.clone(), exit.clone());
2191 let halt_at_slot = config
2192 .halt_at_slot
2193 .or_else(|| blockstore.highest_slot().unwrap_or(None));
2194
2195 let process_options = blockstore_processor::ProcessOptions {
2196 run_verification: config.run_verification,
2197 halt_at_slot,
2198 new_hard_forks: config.new_hard_forks.clone(),
2199 debug_keys: config.debug_keys.clone(),
2200 accounts_db_config: config.accounts_db_config.clone(),
2201 accounts_db_skip_shrink: config.accounts_db_skip_shrink,
2202 accounts_db_force_initial_clean: config.accounts_db_force_initial_clean,
2203 runtime_config: config.runtime_config.clone(),
2204 use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
2205 ..blockstore_processor::ProcessOptions::default()
2206 };
2207
2208 let enable_rpc_transaction_history =
2209 config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
2210 let is_plugin_transaction_history_required = transaction_notifier.is_some();
2211 let transaction_history_services =
2212 if enable_rpc_transaction_history || is_plugin_transaction_history_required {
2213 initialize_rpc_transaction_history_services(
2214 blockstore.clone(),
2215 exit.clone(),
2216 enable_rpc_transaction_history,
2217 config.rpc_config.enable_extended_tx_metadata_storage,
2218 transaction_notifier,
2219 dependency_tracker,
2220 )
2221 } else {
2222 TransactionHistoryServices::default()
2223 };
2224
2225 let entry_notifier_service = entry_notifier
2226 .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
2227
2228 let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
2229 bank_forks_utils::load_bank_forks(
2230 genesis_config,
2231 &blockstore,
2232 config.account_paths.clone(),
2233 &config.snapshot_config,
2234 &process_options,
2235 transaction_history_services
2236 .transaction_status_sender
2237 .as_ref(),
2238 entry_notifier_service
2239 .as_ref()
2240 .map(|service| service.sender()),
2241 accounts_update_notifier,
2242 exit,
2243 )
2244 .map_err(|err| err.to_string())?;
2245
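// Install the bank-drop callback on the banks in BankForks so that dropped
// banks flow through `pruned_banks_receiver`, keeping bank drops synchronized
// with accounts background activity (flush, clean, shrink).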
2246 let pruned_banks_receiver =
2252 AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
2253
2254 leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
2255
2256 Ok((
2257 bank_forks,
2258 blockstore,
2259 original_blockstore_root,
2260 ledger_signal_receiver,
2261 leader_schedule_cache,
2262 starting_snapshot_hashes,
2263 transaction_history_services,
2264 process_options,
2265 blockstore_root_scan,
2266 pruned_banks_receiver,
2267 entry_notifier_service,
2268 ))
2269}
2270
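/// Deferred ledger replay: bundles everything needed to process the blockstore
/// so replay can be triggered later (e.g. after supermajority checks) and the
/// restored tower extracted afterwards.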
2271pub struct ProcessBlockStore<'a> {
2272 id: &'a Pubkey,
2273 vote_account: &'a Pubkey,
2274 start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2275 blockstore: &'a Blockstore,
2276 original_blockstore_root: Slot,
2277 bank_forks: &'a Arc<RwLock<BankForks>>,
2278 leader_schedule_cache: &'a LeaderScheduleCache,
2279 process_options: &'a blockstore_processor::ProcessOptions,
2280 transaction_status_sender: Option<&'a TransactionStatusSender>,
2281 entry_notification_sender: Option<&'a EntryNotifierSender>,
2282 blockstore_root_scan: Option<BlockstoreRootScan>,
2283 snapshot_controller: &'a SnapshotController,
2284 config: &'a ValidatorConfig,
2285 tower: Option<Tower>,
2286}
2287
2288impl<'a> ProcessBlockStore<'a> {
2289 #[allow(clippy::too_many_arguments)]
2290 fn new(
2291 id: &'a Pubkey,
2292 vote_account: &'a Pubkey,
2293 start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
2294 blockstore: &'a Blockstore,
2295 original_blockstore_root: Slot,
2296 bank_forks: &'a Arc<RwLock<BankForks>>,
2297 leader_schedule_cache: &'a LeaderScheduleCache,
2298 process_options: &'a blockstore_processor::ProcessOptions,
2299 transaction_status_sender: Option<&'a TransactionStatusSender>,
2300 entry_notification_sender: Option<&'a EntryNotifierSender>,
2301 blockstore_root_scan: BlockstoreRootScan,
2302 snapshot_controller: &'a SnapshotController,
2303 config: &'a ValidatorConfig,
2304 ) -> Self {
2305 Self {
2306 id,
2307 vote_account,
2308 start_progress,
2309 blockstore,
2310 original_blockstore_root,
2311 bank_forks,
2312 leader_schedule_cache,
2313 process_options,
2314 transaction_status_sender,
2315 entry_notification_sender,
2316 blockstore_root_scan: Some(blockstore_root_scan),
2317 snapshot_controller,
2318 config,
2319 tower: None,
2320 }
2321 }
2322
2323 pub(crate) fn process(&mut self) -> Result<(), String> {
2324 if self.tower.is_none() {
2325 let previous_start_process = *self.start_progress.read().unwrap();
2326 *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
2327
2328 let exit = Arc::new(AtomicBool::new(false));
2329 if let Ok(Some(max_slot)) = self.blockstore.highest_slot() {
2330 let bank_forks = self.bank_forks.clone();
2331 let exit = exit.clone();
2332 let start_progress = self.start_progress.clone();
2333
2334 let _ = Builder::new()
2335 .name("solRptLdgrStat".to_string())
2336 .spawn(move || {
2337 while !exit.load(Ordering::Relaxed) {
2338 let slot = bank_forks.read().unwrap().working_bank().slot();
2339 *start_progress.write().unwrap() =
2340 ValidatorStartProgress::ProcessingLedger { slot, max_slot };
2341 sleep(Duration::from_secs(2));
2342 }
2343 })
2344 .unwrap();
2345 }
2346 blockstore_processor::process_blockstore_from_root(
2347 self.blockstore,
2348 self.bank_forks,
2349 self.leader_schedule_cache,
2350 self.process_options,
2351 self.transaction_status_sender,
2352 self.entry_notification_sender,
2353 Some(self.snapshot_controller),
2354 )
2355 .map_err(|err| {
2356 exit.store(true, Ordering::Relaxed);
2357 format!("Failed to load ledger: {err:?}")
2358 })?;
2359 exit.store(true, Ordering::Relaxed);
2360
2361 if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
2362 blockstore_root_scan.join();
2363 }
2364
2365 self.tower = Some({
2366 let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
2367 if let Ok(tower) = &restored_tower {
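// Reconciliation attempt 1 of 2: align blockstore roots with the restored
// tower's root.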
2368 reconcile_blockstore_roots_with_external_source(
2370 ExternalRootSource::Tower(tower.root()),
2371 self.blockstore,
2372 &mut self.original_blockstore_root,
2373 )
2374 .map_err(|err| format!("Failed to reconcile blockstore with tower: {err:?}"))?;
2375 }
2376
2377 post_process_restored_tower(
2378 restored_tower,
2379 self.id,
2380 self.vote_account,
2381 self.config,
2382 &self.bank_forks.read().unwrap(),
2383 )?
2384 });
2385
2386 if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
2387 self.config,
2388 self.bank_forks.read().unwrap().root(),
2389 ) {
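// Reconciliation attempt 2 of 2: align blockstore roots with the hard fork slot.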
2390 reconcile_blockstore_roots_with_external_source(
2393 ExternalRootSource::HardFork(hard_fork_restart_slot),
2394 self.blockstore,
2395 &mut self.original_blockstore_root,
2396 )
2397 .map_err(|err| format!("Failed to reconcile blockstore with hard fork: {err:?}"))?;
2398 }
2399
2400 *self.start_progress.write().unwrap() = previous_start_process;
2401 }
2402 Ok(())
2403 }
2404
2405 pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
2406 self.process()?;
2407 Ok(self.tower.unwrap())
2408 }
2409}
2410
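/// If a warp slot is configured, advances the root bank to that slot, writes a
/// full snapshot there, and then reprocesses the blockstore.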
2411fn maybe_warp_slot(
2412 config: &ValidatorConfig,
2413 process_blockstore: &mut ProcessBlockStore,
2414 ledger_path: &Path,
2415 bank_forks: &RwLock<BankForks>,
2416 leader_schedule_cache: &LeaderScheduleCache,
2417 snapshot_controller: &SnapshotController,
2418) -> Result<(), String> {
2419 if let Some(warp_slot) = config.warp_slot {
2420 let mut bank_forks = bank_forks.write().unwrap();
2421
2422 let working_bank = bank_forks.working_bank();
2423
2424 if warp_slot <= working_bank.slot() {
2425 return Err(format!(
2426 "warp slot ({}) cannot be less than the working bank slot ({})",
2427 warp_slot,
2428 working_bank.slot()
2429 ));
2430 }
2431 info!("warping to slot {warp_slot}");
2432
2433 let root_bank = bank_forks.root_bank();
2434
2435 root_bank.squash();
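// Warping below recalculates the accounts hash from storages, which requires
// the accounts cache to be flushed with the parent slot rooted.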
2439 root_bank.force_flush_accounts_cache();
2440
2441 bank_forks.insert(Bank::warp_from_parent(
2442 root_bank,
2443 &Pubkey::default(),
2444 warp_slot,
2445 ));
2446 bank_forks.set_root(warp_slot, Some(snapshot_controller), Some(warp_slot));
2447 leader_schedule_cache.set_root(&bank_forks.root_bank());
2448
2449 let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
2450 ledger_path,
2451 &bank_forks.root_bank(),
2452 None,
2453 &config.snapshot_config.full_snapshot_archives_dir,
2454 &config.snapshot_config.incremental_snapshot_archives_dir,
2455 config.snapshot_config.archive_format,
2456 ) {
2457 Ok(archive_info) => archive_info,
2458 Err(e) => return Err(format!("Unable to create snapshot: {e}")),
2459 };
2460 info!(
2461 "created snapshot: {}",
2462 full_snapshot_archive_info.path().display()
2463 );
2464
2465 drop(bank_forks);
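// Re-run ledger processing so replay state reflects the newly warped root.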
2466 process_blockstore.process()?;
2469 }
2470 Ok(())
2471}
2472
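/// Returns the slot from which the blockstore should be checked for shreds with
/// an incorrect shred version, or None if no cleanup is needed.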
2473fn should_cleanup_blockstore_incorrect_shred_versions(
2476 config: &ValidatorConfig,
2477 blockstore: &Blockstore,
2478 root_slot: Slot,
2479 hard_forks: &HardForks,
2480) -> Result<Option<Slot>, BlockstoreError> {
2481 let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
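// A cluster restart via hard fork always warrants cleanup of slots newer than
// the restart slot.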
2483 if maybe_cluster_restart_slot.is_some() {
2484 return Ok(Some(root_slot + 1));
2485 }
2486
2487 let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
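// No hard forks registered; nothing to clean up.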
2489 return Ok(None);
2490 };
2491
2492 let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
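// The blockstore is empty; nothing to clean up.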
2494 return Ok(None);
2495 };
2496 let blockstore_min_slot = blockstore.lowest_slot();
2497 info!(
2498 "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
2499 latest hard fork is {latest_hard_fork}"
2500 );
2501
2502 if latest_hard_fork < blockstore_min_slot {
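// latest_hard_fork < blockstore_min_slot <= blockstore_max_slot: everything in
// the blockstore is newer than the hard fork, so shred versions should already
// be correct.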
2503 Ok(None)
2511 } else if latest_hard_fork < blockstore_max_slot {
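// blockstore_min_slot <= latest_hard_fork < blockstore_max_slot: slots after
// the hard fork may hold shreds created with the old shred version.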
2512 Ok(Some(latest_hard_fork + 1))
2523 } else {
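// blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork: nothing in
// the blockstore is newer than the hard fork.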
2524 Ok(None)
2532 }
2533}
2534
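/// Scans data shreds from `start_slot` onward (for up to 60 seconds) and
/// returns the first shred version found that differs from the expected one.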
2535fn scan_blockstore_for_incorrect_shred_version(
2538 blockstore: &Blockstore,
2539 start_slot: Slot,
2540 expected_shred_version: u16,
2541) -> Result<Option<u16>, BlockstoreError> {
2542 const TIMEOUT: Duration = Duration::from_secs(60);
2543 let timer = Instant::now();
2544 let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2546
2547 info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
2548 for (slot, _meta) in slot_meta_iterator {
2549 let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2550 for shred in &shreds {
2551 if shred.version() != expected_shred_version {
2552 return Ok(Some(shred.version()));
2553 }
2554 }
2555 if timer.elapsed() > TIMEOUT {
2556 info!("Didn't find incorrect shreds after 60 seconds, aborting");
2557 break;
2558 }
2559 }
2560 Ok(None)
2561}
2562
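/// If shreds with an incorrect version exist at or after `start_slot`, backs up
/// the affected slots into a separate directory and purges them from the
/// blockstore.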
2563fn cleanup_blockstore_incorrect_shred_versions(
2566 blockstore: &Blockstore,
2567 config: &ValidatorConfig,
2568 start_slot: Slot,
2569 expected_shred_version: u16,
2570) -> Result<(), BlockstoreError> {
2571 let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
2572 blockstore,
2573 start_slot,
2574 expected_shred_version,
2575 )?;
2576 let Some(incorrect_shred_version) = incorrect_shred_version else {
2577 info!("Only shreds with the correct version were found in the blockstore");
2578 return Ok(());
2579 };
2580
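// unwrap() is safe: an incorrect shred was found, so the blockstore is non-empty.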
2581 let end_slot = blockstore.highest_slot()?.unwrap();
2583
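// Best effort: copy the slots about to be purged into a backup blockstore for
// later inspection; failure to back up is non-fatal.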
2584 let backup_folder = format!(
2587 "{BLOCKSTORE_DIRECTORY_ROCKS_LEVEL}_backup_{incorrect_shred_version}_{start_slot}_{end_slot}"
2588 );
2589 match Blockstore::open_with_options(
2590 &blockstore.ledger_path().join(backup_folder),
2591 config.blockstore_options.clone(),
2592 ) {
2593 Ok(backup_blockstore) => {
2594 info!("Backing up slots from {start_slot} to {end_slot}");
2595 let mut timer = Measure::start("blockstore backup");
2596
2597 const PRINT_INTERVAL: Duration = Duration::from_secs(5);
2598 let mut print_timer = Instant::now();
2599 let mut num_slots_copied = 0;
2600 let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
2601 for (slot, _meta) in slot_meta_iterator {
2602 let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
2603 let shreds = shreds.into_iter().map(Cow::Owned);
2604 let _ = backup_blockstore.insert_cow_shreds(shreds, None, true);
2605 num_slots_copied += 1;
2606
2607 if print_timer.elapsed() > PRINT_INTERVAL {
2608 info!("Backed up {num_slots_copied} slots thus far");
2609 print_timer = Instant::now();
2610 }
2611 }
2612
2613 timer.stop();
2614 info!("Backing up slots done. {timer}");
2615 }
2616 Err(err) => {
2617 warn!("Unable to backup shreds with incorrect shred version: {err}");
2618 }
2619 }
2620
2621 info!("Purging slots {start_slot} to {end_slot} from blockstore");
2622 let mut timer = Measure::start("blockstore purge");
2623 blockstore.purge_from_next_slots(start_slot, end_slot);
2624 blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
2625 timer.stop();
2626 info!("Purging slots done. {timer}");
2627
2628 Ok(())
2629}
2630
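/// Creates the transaction-status channel and service that persist RPC
/// transaction history, track the highest slot with complete status, and feed
/// any Geyser transaction notifier.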
2631fn initialize_rpc_transaction_history_services(
2632 blockstore: Arc<Blockstore>,
2633 exit: Arc<AtomicBool>,
2634 enable_rpc_transaction_history: bool,
2635 enable_extended_tx_metadata_storage: bool,
2636 transaction_notifier: Option<TransactionNotifierArc>,
2637 dependency_tracker: Option<Arc<DependencyTracker>>,
2638) -> TransactionHistoryServices {
2639 let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
2640 let (transaction_status_sender, transaction_status_receiver) = unbounded();
2641 let transaction_status_sender = Some(TransactionStatusSender {
2642 sender: transaction_status_sender,
2643 dependency_tracker: dependency_tracker.clone(),
2644 });
2645 let transaction_status_service = Some(TransactionStatusService::new(
2646 transaction_status_receiver,
2647 max_complete_transaction_status_slot.clone(),
2648 enable_rpc_transaction_history,
2649 transaction_notifier,
2650 blockstore.clone(),
2651 enable_extended_tx_metadata_storage,
2652 dependency_tracker,
2653 exit.clone(),
2654 ));
2655
2656 TransactionHistoryServices {
2657 transaction_status_sender,
2658 transaction_status_service,
2659 max_complete_transaction_status_slot,
2660 }
2661}
2662
2663#[derive(Error, Debug)]
2664pub enum ValidatorError {
2665 #[error("bank hash mismatch: actual={0}, expected={1}")]
2666 BankHashMismatch(Hash, Hash),
2667
2668 #[error("blockstore error: {0}")]
2669 Blockstore(#[source] BlockstoreError),
2670
2671 #[error("genesis hash mismatch: actual={0}, expected={1}")]
2672 GenesisHashMismatch(Hash, Hash),
2673
2674 #[error(
2675 "ledger does not have enough data to wait for supermajority: current slot={0}, needed \
2676 slot={1}"
2677 )]
2678 NotEnoughLedgerData(Slot, Slot),
2679
2680 #[error("failed to open genesis: {0}")]
2681 OpenGenesisConfig(#[source] OpenGenesisConfigError),
2682
2683 #[error("{0}")]
2684 Other(String),
2685
2686 #[error(
2687 "PoH hashes/second rate is slower than the cluster target: mine {mine}, cluster {target}"
2688 )]
2689 PohTooSlow { mine: u64, target: u64 },
2690
2691 #[error(transparent)]
2692 ResourceLimitError(#[from] ResourceLimitError),
2693
2694 #[error("shred version mismatch: actual {actual}, expected {expected}")]
2695 ShredVersionMismatch { actual: u16, expected: u16 },
2696
2697 #[error(transparent)]
2698 TraceError(#[from] TraceError),
2699
2700 #[error("Wen Restart finished, please continue with --wait-for-supermajority")]
2701 WenRestartFinished,
2702}
2703
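// Waits until at least WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT of the
// activated stake is visible in gossip at the configured slot. Returns Ok(true)
// if the node waited, Ok(false) if no waiting was required, and an error if the
// ledger lacks the slot or the bank hash does not match the expected one.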
2704fn wait_for_supermajority(
2711 config: &ValidatorConfig,
2712 process_blockstore: Option<&mut ProcessBlockStore>,
2713 bank_forks: &RwLock<BankForks>,
2714 cluster_info: &ClusterInfo,
2715 rpc_override_health_check: Arc<AtomicBool>,
2716 start_progress: &Arc<RwLock<ValidatorStartProgress>>,
2717) -> Result<bool, ValidatorError> {
2718 match config.wait_for_supermajority {
2719 None => Ok(false),
2720 Some(wait_for_supermajority_slot) => {
2721 if let Some(process_blockstore) = process_blockstore {
2722 process_blockstore
2723 .process()
2724 .map_err(ValidatorError::Other)?;
2725 }
2726
2727 let bank = bank_forks.read().unwrap().working_bank();
2728 match wait_for_supermajority_slot.cmp(&bank.slot()) {
2729 std::cmp::Ordering::Less => return Ok(false),
2730 std::cmp::Ordering::Greater => {
2731 return Err(ValidatorError::NotEnoughLedgerData(
2732 bank.slot(),
2733 wait_for_supermajority_slot,
2734 ));
2735 }
2736 _ => {}
2737 }
2738
2739 if let Some(expected_bank_hash) = config.expected_bank_hash {
2740 if bank.hash() != expected_bank_hash {
2741 return Err(ValidatorError::BankHashMismatch(
2742 bank.hash(),
2743 expected_bank_hash,
2744 ));
2745 }
2746 }
2747
2748 for i in 1.. {
2749 let logging = i % 10 == 1;
2750 if logging {
2751 info!(
2752 "Waiting for {}% of activated stake at slot {} to be in gossip...",
2753 WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
2754 bank.slot()
2755 );
2756 }
2757
2758 let gossip_stake_percent =
2759 get_stake_percent_in_gossip(&bank, cluster_info, logging);
2760
2761 *start_progress.write().unwrap() =
2762 ValidatorStartProgress::WaitingForSupermajority {
2763 slot: wait_for_supermajority_slot,
2764 gossip_stake_percent,
2765 };
2766
2767 if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
2768 info!(
2769 "Supermajority reached, {gossip_stake_percent}% active stake detected, \
2770 starting up now.",
2771 );
2772 break;
2773 }
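// Feign RPC health while waiting here so load balancers don't drop the node
// from rotation during a manual cluster restart.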
2774 rpc_override_health_check.store(true, Ordering::Relaxed);
2778 sleep(Duration::new(1, 0));
2779 }
2780 rpc_override_health_check.store(false, Ordering::Relaxed);
2781 Ok(true)
2782 }
2783 }
2784}
2785
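// Returns the percentage of activated stake (per the provided bank) whose
// nodes are visible in gossip.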
2786fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
2788 let mut online_stake = 0;
2789 let mut wrong_shred_stake = 0;
2790 let mut wrong_shred_nodes = vec![];
2791 let mut offline_stake = 0;
2792 let mut offline_nodes = vec![];
2793
2794 let mut total_activated_stake = 0;
2795 let now = timestamp();
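// Contact infos are persisted across restarts and staked entries don't expire
// for an epoch, so only entries refreshed within the CRDS pull timeout count
// as live.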
2796 let peers: HashMap<_, _> = cluster_info
2800 .tvu_peers(ContactInfo::clone)
2801 .into_iter()
2802 .filter(|node| {
2803 let age = now.saturating_sub(node.wallclock());
2804 age < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
2806 })
2807 .map(|node| (*node.pubkey(), node))
2808 .collect();
2809 let my_shred_version = cluster_info.my_shred_version();
2810 let my_id = cluster_info.id();
2811
2812 for (activated_stake, vote_account) in bank.vote_accounts().values() {
2813 let activated_stake = *activated_stake;
2814 total_activated_stake += activated_stake;
2815
2816 if activated_stake == 0 {
2817 continue;
2818 }
2819 let vote_state_node_pubkey = *vote_account.node_pubkey();
2820
2821 if let Some(peer) = peers.get(&vote_state_node_pubkey) {
2822 if peer.shred_version() == my_shred_version {
2823 trace!(
2824 "observed {vote_state_node_pubkey} in gossip, \
2825 (activated_stake={activated_stake})"
2826 );
2827 online_stake += activated_stake;
2828 } else {
2829 wrong_shred_stake += activated_stake;
2830 wrong_shred_nodes.push((activated_stake, vote_state_node_pubkey));
2831 }
2832 } else if vote_state_node_pubkey == my_id {
2833 online_stake += activated_stake;
2834 } else {
2835 offline_stake += activated_stake;
2836 offline_nodes.push((activated_stake, vote_state_node_pubkey));
2837 }
2838 }
2839
2840 let online_stake_percentage = (online_stake as f64 / total_activated_stake as f64) * 100.;
2841 if log {
2842 info!("{online_stake_percentage:.3}% of active stake visible in gossip");
2843
2844 if !wrong_shred_nodes.is_empty() {
2845 info!(
2846 "{:.3}% of active stake has the wrong shred version in gossip",
2847 (wrong_shred_stake as f64 / total_activated_stake as f64) * 100.,
2848 );
2849 wrong_shred_nodes.sort_by(|b, a| a.0.cmp(&b.0));
2850 for (stake, identity) in wrong_shred_nodes {
2851 info!(
2852 " {:.3}% - {}",
2853 (stake as f64 / total_activated_stake as f64) * 100.,
2854 identity
2855 );
2856 }
2857 }
2858
2859 if !offline_nodes.is_empty() {
2860 info!(
2861 "{:.3}% of active stake is not visible in gossip",
2862 (offline_stake as f64 / total_activated_stake as f64) * 100.
2863 );
2864 offline_nodes.sort_by(|b, a| a.0.cmp(&b.0));
2865 for (stake, identity) in offline_nodes {
2866 info!(
2867 " {:.3}% - {}",
2868 (stake as f64 / total_activated_stake as f64) * 100.,
2869 identity
2870 );
2871 }
2872 }
2873 datapoint_info!(
2874 "wfsm_gossip",
2875 ("online_stake", online_stake, i64),
2876 ("offline_stake", offline_stake, i64),
2877 ("total_activated_stake", total_activated_stake, i64),
2878 );
2879 }
2880
2881 online_stake_percentage as u64
2882}
2883
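/// Moves aside and asynchronously deletes the contents of every accounts (and
/// shrink) path so the validator starts from clean account storage.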
2884fn cleanup_accounts_paths(config: &ValidatorConfig) {
2885 for account_path in &config.account_paths {
2886 move_and_async_delete_path_contents(account_path);
2887 }
2888 if let Some(shrink_paths) = &config.accounts_db_config.shrink_paths {
2889 for shrink_path in shrink_paths {
2890 move_and_async_delete_path_contents(shrink_path);
2891 }
2892 }
2893}
2894
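/// Returns true if the snapshot config is valid. When snapshot generation is
/// enabled, the full snapshot interval must be fixed and strictly greater than
/// the incremental interval: e.g. full=400/incremental=200 is valid, while
/// full=100/incremental=200 is not.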
2895pub fn is_snapshot_config_valid(snapshot_config: &SnapshotConfig) -> bool {
2896 if !snapshot_config.should_generate_snapshots() {
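// If snapshots aren't being generated, the intervals don't matter.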
2898 return true;
2899 }
2900
2901 let SnapshotInterval::Slots(full_snapshot_interval_slots) =
2902 snapshot_config.full_snapshot_archive_interval
2903 else {
2904 return false;
2906 };
2907
2908 match snapshot_config.incremental_snapshot_archive_interval {
2909 SnapshotInterval::Disabled => true,
2910 SnapshotInterval::Slots(incremental_snapshot_interval_slots) => {
2911 full_snapshot_interval_slots > incremental_snapshot_interval_slots
2912 }
2913 }
2914}
2915
2916#[cfg(test)]
2917mod tests {
2918 use {
2919 super::*,
2920 crossbeam_channel::{bounded, RecvTimeoutError},
2921 solana_entry::entry,
2922 solana_genesis_config::create_genesis_config,
2923 solana_gossip::contact_info::ContactInfo,
2924 solana_ledger::{
2925 blockstore, create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader,
2926 get_tmp_ledger_path_auto_delete,
2927 },
2928 solana_poh_config::PohConfig,
2929 solana_sha256_hasher::hash,
2930 solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
2931 std::{fs::remove_dir_all, num::NonZeroU64, thread, time::Duration},
2932 };
2933
2934 #[test]
2935 fn validator_exit() {
2936 agave_logger::setup();
2937 let leader_keypair = Keypair::new();
2938 let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
2939
2940 let validator_keypair = Keypair::new();
2941 let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
2942 let genesis_config =
2943 create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
2944 .genesis_config;
2945 let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
2946
2947 let voting_keypair = Arc::new(Keypair::new());
2948 let config = ValidatorConfig {
2949 rpc_addrs: Some((
2950 validator_node.info.rpc().unwrap(),
2951 validator_node.info.rpc_pubsub().unwrap(),
2952 )),
2953 ..ValidatorConfig::default_for_test()
2954 };
2955 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
2956 let validator = Validator::new(
2957 validator_node,
2958 Arc::new(validator_keypair),
2959 &validator_ledger_path,
2960 &voting_keypair.pubkey(),
2961 Arc::new(RwLock::new(vec![voting_keypair])),
2962 vec![leader_node.info],
2963 &config,
2964 true,
2965 None,
2966 start_progress.clone(),
2967 SocketAddrSpace::Unspecified,
2968 ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
2969 Arc::new(RwLock::new(None)),
2970 )
2971 .expect("assume successful validator start");
2972 assert_eq!(
2973 *start_progress.read().unwrap(),
2974 ValidatorStartProgress::Running
2975 );
2976 validator.close();
2977 remove_dir_all(validator_ledger_path).unwrap();
2978 }
2979
2980 #[test]
2981 fn test_should_cleanup_blockstore_incorrect_shred_versions() {
2982 agave_logger::setup();
2983
2984 let ledger_path = get_tmp_ledger_path_auto_delete!();
2985 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
2986
2987 let mut validator_config = ValidatorConfig::default_for_test();
2988 let mut hard_forks = HardForks::default();
2989 let mut root_slot;
2990
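// A restart (--wait-for-supermajority at the root slot) requires cleanup from root + 1.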
2991 root_slot = 10;
2993 validator_config.wait_for_supermajority = Some(root_slot);
2994 assert_eq!(
2995 should_cleanup_blockstore_incorrect_shred_versions(
2996 &validator_config,
2997 &blockstore,
2998 root_slot,
2999 &hard_forks
3000 )
3001 .unwrap(),
3002 Some(root_slot + 1)
3003 );
3004
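// No cluster restart at this root and no hard forks: no cleanup.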
3005 root_slot = 15;
3008 assert_eq!(
3009 should_cleanup_blockstore_incorrect_shred_versions(
3010 &validator_config,
3011 &blockstore,
3012 root_slot,
3013 &hard_forks
3014 )
3015 .unwrap(),
3016 None,
3017 );
3018
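// A hard fork is registered, but the blockstore is still empty: no cleanup.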
3019 hard_forks.register(10);
3022 assert_eq!(
3023 should_cleanup_blockstore_incorrect_shred_versions(
3024 &validator_config,
3025 &blockstore,
3026 root_slot,
3027 &hard_forks
3028 )
3029 .unwrap(),
3030 None,
3031 );
3032
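// Populate slots [20, 35), all newer than the hard fork at slot 10.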
3033 let entries = entry::create_ticks(1, 0, Hash::default());
3035 for i in 20..35 {
3036 let shreds = blockstore::entries_to_test_shreds(
3037 &entries,
3038 i,
3039 i - 1,
3040 true,
3041 1,
3042 );
3043 blockstore.insert_shreds(shreds, None, true).unwrap();
3044 }
3045
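// Every blockstore slot is newer than the latest hard fork: no cleanup.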
3046 assert_eq!(
3048 should_cleanup_blockstore_incorrect_shred_versions(
3049 &validator_config,
3050 &blockstore,
3051 root_slot,
3052 &hard_forks
3053 )
3054 .unwrap(),
3055 None,
3056 );
3057
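// A new hard fork at the root requires cleanup, with or without --wait-for-supermajority.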
3058 root_slot = 25;
3061 hard_forks.register(root_slot);
3062 validator_config.wait_for_supermajority = Some(root_slot);
3063 assert_eq!(
3064 should_cleanup_blockstore_incorrect_shred_versions(
3065 &validator_config,
3066 &blockstore,
3067 root_slot,
3068 &hard_forks
3069 )
3070 .unwrap(),
3071 Some(root_slot + 1),
3072 );
3073 validator_config.wait_for_supermajority = None;
3074 assert_eq!(
3075 should_cleanup_blockstore_incorrect_shred_versions(
3076 &validator_config,
3077 &blockstore,
3078 root_slot,
3079 &hard_forks
3080 )
3081 .unwrap(),
3082 Some(root_slot + 1),
3083 );
3084
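// The hard fork now sits inside the blockstore's slot range: clean up past it.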
3085 root_slot = 30;
3088 let latest_hard_fork = hard_forks.iter().last().unwrap().0;
3089 assert_eq!(
3090 should_cleanup_blockstore_incorrect_shred_versions(
3091 &validator_config,
3092 &blockstore,
3093 root_slot,
3094 &hard_forks
3095 )
3096 .unwrap(),
3097 Some(latest_hard_fork + 1),
3098 );
3099
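// After purging through the hard fork, all remaining slots are newer: no cleanup.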
3100 blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
3103 assert_eq!(
3104 should_cleanup_blockstore_incorrect_shred_versions(
3105 &validator_config,
3106 &blockstore,
3107 root_slot,
3108 &hard_forks
3109 )
3110 .unwrap(),
3111 None,
3112 );
3113 }
3114
3115 #[test]
3116 fn test_cleanup_blockstore_incorrect_shred_versions() {
3117 agave_logger::setup();
3118
3119 let validator_config = ValidatorConfig::default_for_test();
3120 let ledger_path = get_tmp_ledger_path_auto_delete!();
3121 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
3122
3123 let entries = entry::create_ticks(1, 0, Hash::default());
3124 for i in 1..10 {
3125 let shreds = blockstore::entries_to_test_shreds(
3126 &entries,
3127 i,
3128 i - 1,
3129 true,
3130 1,
3131 );
3132 blockstore.insert_shreds(shreds, None, true).unwrap();
3133 }
3134
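// The shreds above carry version 1; expecting version 2 should back up and
// purge slots 5..10.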
3135 cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
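// Slot 4 should no longer chain to the purged slot 5.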
3137 assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
3139 for i in 5..10 {
3140 assert!(blockstore
3141 .get_data_shreds_for_slot(i, 0)
3142 .unwrap()
3143 .is_empty());
3144 }
3145 }
3146
3147 #[test]
3148 fn validator_parallel_exit() {
3149 let leader_keypair = Keypair::new();
3150 let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
3151 let genesis_config =
3152 create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
3153 .genesis_config;
3154
3155 let mut ledger_paths = vec![];
3156 let mut validators: Vec<Validator> = (0..2)
3157 .map(|_| {
3158 let validator_keypair = Keypair::new();
3159 let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
3160 let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
3161 ledger_paths.push(validator_ledger_path.clone());
3162 let vote_account_keypair = Keypair::new();
3163 let config = ValidatorConfig {
3164 rpc_addrs: Some((
3165 validator_node.info.rpc().unwrap(),
3166 validator_node.info.rpc_pubsub().unwrap(),
3167 )),
3168 ..ValidatorConfig::default_for_test()
3169 };
3170 Validator::new(
3171 validator_node,
3172 Arc::new(validator_keypair),
3173 &validator_ledger_path,
3174 &vote_account_keypair.pubkey(),
3175 Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
3176 vec![leader_node.info.clone()],
3177 &config,
3178 true,
3179 None,
3180 Arc::new(RwLock::new(ValidatorStartProgress::default())),
3181 SocketAddrSpace::Unspecified,
3182 ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
3183 Arc::new(RwLock::new(None)),
3184 )
3185 .expect("assume successful validator start")
3186 })
3187 .collect();
3188
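// Signal every validator to exit in parallel before joining them sequentially.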
3189 validators.iter_mut().for_each(|v| v.exit());
3191
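// Join on a separate thread so the test can bound total shutdown time.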
3192 let (sender, receiver) = bounded(0);
3194 let _ = thread::spawn(move || {
3195 validators.into_iter().for_each(|validator| {
3196 validator.join();
3197 });
3198 sender.send(()).unwrap();
3199 });
3200
3201 let timeout = Duration::from_secs(60);
3202 if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
3203 panic!("timeout for shutting down validators");
3204 }
3205
3206 for path in ledger_paths {
3207 remove_dir_all(path).unwrap();
3208 }
3209 }
3210
3211 #[test]
3212 fn test_wait_for_supermajority() {
3213 agave_logger::setup();
3214 let node_keypair = Arc::new(Keypair::new());
3215 let cluster_info = ClusterInfo::new(
3216 ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
3217 node_keypair,
3218 SocketAddrSpace::Unspecified,
3219 );
3220
3221 let (genesis_config, _mint_keypair) = create_genesis_config(1);
3222 let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
3223 let mut config = ValidatorConfig::default_for_test();
3224 let rpc_override_health_check = Arc::new(AtomicBool::new(false));
3225 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
3226
3227 assert!(!wait_for_supermajority(
3228 &config,
3229 None,
3230 &bank_forks,
3231 &cluster_info,
3232 rpc_override_health_check.clone(),
3233 &start_progress,
3234 )
3235 .unwrap());
3236
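// The wait slot is beyond the ledger: NotEnoughLedgerData.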
3237 config.wait_for_supermajority = Some(1);
3239 assert!(matches!(
3240 wait_for_supermajority(
3241 &config,
3242 None,
3243 &bank_forks,
3244 &cluster_info,
3245 rpc_override_health_check.clone(),
3246 &start_progress,
3247 ),
3248 Err(ValidatorError::NotEnoughLedgerData(_, _)),
3249 ));
3250
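// Advance the bank past the wait slot: no waiting is required.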
3251 let bank_forks = BankForks::new_rw_arc(Bank::new_from_parent(
3253 bank_forks.read().unwrap().root_bank(),
3254 &Pubkey::default(),
3255 1,
3256 ));
3257 config.wait_for_supermajority = Some(0);
3258 assert!(!wait_for_supermajority(
3259 &config,
3260 None,
3261 &bank_forks,
3262 &cluster_info,
3263 rpc_override_health_check.clone(),
3264 &start_progress,
3265 )
3266 .unwrap());
3267
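// The slot matches, but the expected bank hash does not: BankHashMismatch.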
3268 config.wait_for_supermajority = Some(1);
3270 config.expected_bank_hash = Some(hash(&[1]));
3271 assert!(matches!(
3272 wait_for_supermajority(
3273 &config,
3274 None,
3275 &bank_forks,
3276 &cluster_info,
3277 rpc_override_health_check,
3278 &start_progress,
3279 ),
3280 Err(ValidatorError::BankHashMismatch(_, _)),
3281 ));
3282 }
3283
3284 #[test]
3285 fn test_is_snapshot_config_valid() {
3286 fn new_snapshot_config(
3287 full_snapshot_archive_interval_slots: Slot,
3288 incremental_snapshot_archive_interval_slots: Slot,
3289 ) -> SnapshotConfig {
3290 SnapshotConfig {
3291 full_snapshot_archive_interval: SnapshotInterval::Slots(
3292 NonZeroU64::new(full_snapshot_archive_interval_slots).unwrap(),
3293 ),
3294 incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3295 NonZeroU64::new(incremental_snapshot_archive_interval_slots).unwrap(),
3296 ),
3297 ..SnapshotConfig::default()
3298 }
3299 }
3300
3301 assert!(is_snapshot_config_valid(&SnapshotConfig::default()));
3303
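// Disabling incremental snapshots alone is valid.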
3304 assert!(is_snapshot_config_valid(&SnapshotConfig {
3306 incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3307 ..SnapshotConfig::default()
3308 }));
3309
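// Generating snapshots without a full snapshot interval is invalid.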
3310 assert!(!is_snapshot_config_valid(&SnapshotConfig {
3312 full_snapshot_archive_interval: SnapshotInterval::Disabled,
3313 ..SnapshotConfig::default()
3314 }));
3315
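// Valid: the full interval strictly exceeds the incremental interval.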
3316 assert!(is_snapshot_config_valid(&new_snapshot_config(400, 200)));
3318 assert!(is_snapshot_config_valid(&new_snapshot_config(100, 42)));
3319 assert!(is_snapshot_config_valid(&new_snapshot_config(444, 200)));
3320 assert!(is_snapshot_config_valid(&new_snapshot_config(400, 222)));
3321
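// Invalid: the full interval is equal to or smaller than the incremental one.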
3322 assert!(!is_snapshot_config_valid(&new_snapshot_config(42, 100)));
3324 assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 100)));
3325 assert!(!is_snapshot_config_valid(&new_snapshot_config(100, 200)));
3326
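// With generation disabled or load-only configs, the intervals are unconstrained.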
3327 assert!(is_snapshot_config_valid(&SnapshotConfig::new_disabled()));
3329 assert!(is_snapshot_config_valid(&SnapshotConfig::new_load_only()));
3330 assert!(is_snapshot_config_valid(&SnapshotConfig {
3331 full_snapshot_archive_interval: SnapshotInterval::Slots(NonZeroU64::new(37).unwrap()),
3332 incremental_snapshot_archive_interval: SnapshotInterval::Slots(
3333 NonZeroU64::new(41).unwrap()
3334 ),
3335 ..SnapshotConfig::new_load_only()
3336 }));
3337 assert!(is_snapshot_config_valid(&SnapshotConfig {
3338 full_snapshot_archive_interval: SnapshotInterval::Disabled,
3339 incremental_snapshot_archive_interval: SnapshotInterval::Disabled,
3340 ..SnapshotConfig::new_load_only()
3341 }));
3342 }
3343
3344 fn target_tick_duration() -> Duration {
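// DEFAULT_MS_PER_SLOT / DEFAULT_TICKS_PER_SLOT = 400 / 64 = 6.25ms per tick,
// which truncates in integer math, so compute in microseconds (6250us) instead.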
3345 let target_tick_duration_us =
3353 solana_clock::DEFAULT_MS_PER_SLOT * 1000 / solana_clock::DEFAULT_TICKS_PER_SLOT;
3354 assert_eq!(target_tick_duration_us, 6250);
3355 Duration::from_micros(target_tick_duration_us)
3356 }
3357
3358 #[test]
3359 fn test_poh_speed() {
3360 agave_logger::setup();
3361 let poh_config = PohConfig {
3362 target_tick_duration: target_tick_duration(),
3363 hashes_per_tick: Some(100 * solana_clock::DEFAULT_HASHES_PER_TICK),
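// 100x the default hash rate target guarantees the speed check fails.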
3365 ..PohConfig::default()
3366 };
3367 let genesis_config = GenesisConfig {
3368 poh_config,
3369 ..GenesisConfig::default()
3370 };
3371 let bank = Bank::new_for_tests(&genesis_config);
3372 assert!(check_poh_speed(&bank, Some(10_000)).is_err());
3373 }
3374
3375 #[test]
3376 fn test_poh_speed_no_hashes_per_tick() {
3377 agave_logger::setup();
3378 let poh_config = PohConfig {
3379 target_tick_duration: target_tick_duration(),
3380 hashes_per_tick: None,
3381 ..PohConfig::default()
3382 };
3383 let genesis_config = GenesisConfig {
3384 poh_config,
3385 ..GenesisConfig::default()
3386 };
3387 let bank = Bank::new_for_tests(&genesis_config);
3388 check_poh_speed(&bank, Some(10_000)).unwrap();
3389 }
3390}